/// Object-oriented wrapper around the libmosquitto C API bindings.
module mosquittod.wrap;

import std.algorithm : map;
import std.exception;
import std.array : array, Appender;
import std.format : formattedWrite;
import std.string;

import mosquittod.api;
import mosquittod.types;

/// Base class of every exception thrown by this wrapper.
class MosquittoException : Exception
{
    this(string msg, string file=__FILE__, size_t line=__LINE__)
    { super(msg, file, line); }
}

/// Thrown when a libmosquitto call returns a non-zero `MOSQ_ERR` code.
class MosquittoCallException : MosquittoException
{
    /// Error code returned by the failed call.
    MOSQ_ERR err;

    /++ Params:
            err  = code returned by the C function
            func = name of the C function that failed (used in the message)
     +/
    this(MOSQ_ERR err, string func)
    {
        this.err = err;
        // %d prints the numeric code, %s the enum member name.
        super(format("fail '%s': %d (%s)", func, err, err));
    }
}

/++ Calls `fnc(args)` and throws `MosquittoCallException` if the result,
    interpreted as `MOSQ_ERR`, is non-zero.
 +/
private void mosqCheck(alias fnc, Args...)(Args args)
{
    if (auto r = cast(MOSQ_ERR)fnc(args))
        throw new MosquittoCallException(r, __traits(identifier, fnc));
}

/// MQTT client built on top of a `mosquitto*` handle.
class MosquittoClient
{
protected:
    /// Native client handle created in the constructor.
    mosquitto* mosq;

    /// One registered subscription: topic pattern, QoS and user handler.
    static struct Callback
    {
        string pattern;
        int qos;
        void delegate(const(char)[], const(void)[]) func;
    }

    /// All subscriptions; replayed to the broker on every (re)connect.
    Callback[] slist;

    /// Maintained by the connect/disconnect callbacks.
    bool _connected;

    /// Scratch buffers used to build zero-terminated strings for the C API.
    Appender!(char[])[] buffers;

    /++ Copies `str` plus a terminating `\0` into scratch buffer `n`.

        Returns: pointer to the zero-terminated copy, or `null` when `str`
        is empty. The pointer is only valid until the next call that uses
        the same buffer index.
     +/
    char* toStringzBuf(string str, size_t n=0)
    {
        if (str == "") return null;
        buffers[n].clear();
        formattedWrite(buffers[n], "%s\0", str);
        return buffers[n].data.ptr;
    }

    // NOTE(review): these callbacks are invoked by libmosquitto; exceptions
    // thrown from them unwind through C frames -- confirm this is acceptable.
    extern(C) static
    {
        /// Connect callback: marks the client connected, re-subscribes every
        /// stored pattern and then invokes the user `onConnect` hook.
        void onConnectCallback(mosquitto* mosq, void* cptr, int res)
        {
            auto cli = enforce!MosquittoException(
                            cast(MosquittoClient)cptr, "null cli");
            // Names for the result codes, used only to format the error text.
            enum Res
            {
                success = 0,
                unacceptable_protocol_version = 1,
                identifier_rejected = 2,
                broker_unavailable = 3
            }
            enforce!MosquittoException(res == 0,
                    format("connection error: %s", cast(Res)res));
            cli._connected = true;
            cli.subscribeList();
            if (cli.onConnect !is null) cli.onConnect();
        }

        /// Disconnect callback: clears the connected flag.
        void onDisconnectCallback(mosquitto* mosq, void* cptr, int res)
        {
            auto cli = enforce!MosquittoException(
                            cast(MosquittoClient)cptr, "null cli");
            cli._connected = false;
        }

        /// Message callback: forwards topic and payload to `onMessage`.
        void onMessageCallback(mosquitto* mosq, void* cptr,
                                const mosquitto_message* msg)
        {
            auto cli = enforce!MosquittoException(
                            cast(MosquittoClient)cptr, "null cli");
            cli.onMessage(msg.topic, msg.payload[0..msg.payloadlen]);
        }
    }

    /// Sends a subscribe request for every stored pattern.
    void subscribeList()
    {
        foreach (cb; slist)
            mosqCheck!mosquitto_subscribe(mosq, null,
                    toStringzBuf(cb.pattern), cb.qos);
    }

    /// Invokes every stored handler whose pattern matches `topicZ`.
    void onMessage(const(char)* topicZ, const(void[]) payload)
    {
        foreach (cb; slist)
        {
            bool res;
            // Rebuilt on every iteration: a handler may reuse the scratch
            // buffer (e.g. by publishing) before the next pattern is checked.
            auto patt = toStringzBuf(cb.pattern);
            mosqCheck!mosquitto_topic_matches_sub(patt, topicZ, &res);
            if (res) cb.func(topicZ.fromStringz(), payload);
        }
    }

public:
    /// Connection parameters.
    struct Settings
    {
        string host = "127.0.0.1";  /// broker host passed to `mosquitto_connect`
        ushort port = 1883;         /// broker port
        string clientId;            /// empty => `null` id is passed to `mosquitto_new`
        bool cleanSession = true;   /// clean-session flag passed to `mosquitto_new`
        int keepalive = 5;          /// keepalive argument of `mosquitto_connect`
    }

    /// Settings captured at construction time.
    Settings settings;

    /// Optional hook called after a successful connect and re-subscribe.
    void delegate() onConnect;

    /++ Creates the native client and installs the connect, disconnect and
        message callbacks.

        Throws: `MosquittoException` if `mosquitto_new` returns `null`.
     +/
    this(Settings s=Settings.init)
    {
        import core.stdc.errno;

        initMosquittoLib();

        buffers = new Appender!(char[])[](1); // for now need only 1 buffer
        // `ref` is required: Appender is a struct, so iterating by value
        // would reserve capacity on a temporary copy and discard it.
        foreach (ref buf; buffers) buf.reserve(1024);

        settings = s;

        mosq = enforce!MosquittoException(
                mosquitto_new(toStringzBuf(s.clientId),
                              s.cleanSession, cast(void*)this),
                format("error while create mosquitto: %d", errno));

        mosquitto_connect_callback_set(mosq, &onConnectCallback);
        // Without this registration `_connected` would never be reset.
        mosquitto_disconnect_callback_set(mosq, &onDisconnectCallback);
        mosquitto_message_callback_set(mosq, &onMessageCallback);
    }

    /// Disconnects if still connected and releases the native handle.
    ~this()
    {
        if (mosq is null) return; // construction failed, nothing to release

        // Call the C API directly and ignore its result: `mosqCheck` would
        // allocate and throw, which is not permitted while the GC finalizes.
        if (_connected) mosquitto_disconnect(mosq);
        mosquitto_destroy(mosq);
        mosq = null;
    }

    /// `true` between the connect and disconnect callbacks.
    bool connected() const @property { return _connected; }

    /++ Runs one iteration of the libmosquitto network loop.

        Params: timeoutMSecs = timeout forwarded to `mosquitto_loop`
        Throws: `MosquittoCallException` on a non-zero result.
     +/
    void loop(int timeoutMSecs=0)
    { mosqCheck!mosquitto_loop(mosq, timeoutMSecs, 1); }

    /// Connects to `settings.host`:`settings.port`.
    /// Throws: `MosquittoCallException` on a non-zero result.
    void connect()
    {
        mosqCheck!mosquitto_connect(mosq, toStringzBuf(settings.host),
                settings.port, settings.keepalive);
    }

    /// Throws: `MosquittoCallException` on a non-zero result.
    void reconnect() { mosqCheck!mosquitto_reconnect(mosq); }

    /// Throws: `MosquittoCallException` on a non-zero result.
    void disconnect() { mosqCheck!mosquitto_disconnect(mosq); }

    /++ Publishes `data` on `topic`.

        Returns: the message id assigned by libmosquitto.
        Throws: `MosquittoException` if the payload length does not fit in
                an `int`; `MosquittoCallException` on a non-zero result.
     +/
    int publish(string topic, int qos, const(void)[] data, bool retain=false)
    {
        // The C API takes the length as int; reject instead of truncating.
        enforce!MosquittoException(data.length <= int.max,
                "payload is too large");
        int mid;
        mosqCheck!mosquitto_publish(mosq, &mid, toStringzBuf(topic),
                cast(int)data.length, data.ptr, qos, retain);
        return mid;
    }

    /// ditto with qos=0
    int publish(string topic, const(void)[] data, bool retain=false)
    { return publish(topic, 0, data, retain); }

    /++ Registers `cb` for `pattern`; subscribes immediately when already
        connected, otherwise the pattern is sent on the next connect.

        The topic and payload slices passed to `cb` are only borrowed:
        copy them inside the callback if they must be kept.
     +/
    void subscribe(string pattern, int qos, void delegate(const(char)[],
                                                    const(void)[]) cb)
    {
        slist ~= Callback(pattern, qos, cb);
        if (connected) mosqCheck!mosquitto_subscribe(mosq, null,
                                    toStringzBuf(pattern), qos);
    }

    /// ditto, for handlers that only need the payload
    void subscribe(string pattern, int qos, void delegate(const(void)[]) cb)
    {
        // Adapt to the two-argument form and reuse its bookkeeping.
        subscribe(pattern, qos,
            (const(char)[] topic, const(void)[] data){ cb(data); });
    }
}