1 ///
2 module mosquittod.wrap;
3 
4 import std.algorithm : map;
5 import std.exception;
6 import std.array : array, Appender;
7 import std.format : formattedWrite;
8 import std..string;
9 
10 import mosquittod.api;
11 import mosquittod.types;
12 
13 ///
14 class MosquittoException : Exception
15 {
16     this(string msg, string file=__FILE__, size_t line=__LINE__)
17     { super(msg, file, line); }
18 }
19 
20 ///
21 class MosquittoCallException : MosquittoException
22 {
23     MOSQ_ERR err;
24     this(MOSQ_ERR err, string func)
25     {
26         this.err = err;
27         super(format("fail '%s': %d (%s)", func, err, err));
28     }
29 }
30 
31 private void mosqCheck(alias fnc, Args...)(Args args)
32 {
33     if (auto r = cast(MOSQ_ERR)fnc(args))
34         throw new MosquittoCallException(r, __traits(identifier, fnc));
35 }
36 
37 ///
38 class MosquittoClient
39 {
40 protected:
41     mosquitto* mosq;
42 
43     static struct Callback
44     {
45         string pattern;
46         int qos;
47         void delegate(const(char)[], const(void)[]) func;
48     }
49 
50     Callback[] slist;
51 
52     bool _connected;
53 
54     Appender!(char[])[] buffers;
55 
56     char* toStringzBuf(string str, size_t n=0)
57     {
58         if (str == "") return null;
59         buffers[n].clear();
60         formattedWrite(buffers[n], "%s\0", str);
61         return buffers[n].data.ptr;
62     }
63 
64     extern(C) static
65     {
66         void onConnectCallback(mosquitto* mosq, void* cptr, int res)
67         {
68             auto cli = enforce!MosquittoException(
69                             cast(MosquittoClient)cptr, "null cli");
70             enum Res
71             {
72                 success = 0,
73                 unacceptable_protocol_version = 1,
74                 identifier_rejected = 2,
75                 broker_unavailable = 3
76             }
77             enforce!MosquittoException(res == 0,
78                         format("connection error: %s", cast(Res)res));
79             cli._connected = true;
80             cli.subscribeList();
81             if (cli.onConnect !is null) cli.onConnect();
82         }
83 
84         void onDisconnectCallback(mosquitto* mosq, void* cptr, int res)
85         {
86             auto cli = enforce!MosquittoException(
87                             cast(MosquittoClient)cptr, "null cli");
88             cli._connected = false;
89         }
90 
91         void onMessageCallback(mosquitto* mosq, void* cptr,
92                                 const mosquitto_message* msg)
93         {
94             auto cli = enforce!MosquittoException(
95                             cast(MosquittoClient)cptr, "null cli");
96             cli.onMessage(msg.topic, msg.payload[0..msg.payloadlen]);
97         }
98     }
99 
100     void subscribeList()
101     {
102         foreach (cb; slist)
103             mosqCheck!mosquitto_subscribe(mosq, null,
104                             toStringzBuf(cb.pattern), cb.qos);
105     }
106 
107     void onMessage(const(char)* topicZ, const(void[]) payload)
108     {
109         foreach (cb; slist)
110         {
111             bool res;
112             auto patt = toStringzBuf(cb.pattern);
113             mosqCheck!mosquitto_topic_matches_sub(patt, topicZ, &res);
114             if (res) cb.func(topicZ.fromStringz(), payload);
115         }
116     }
117 
118 public:
119     ///
120     struct Settings
121     {
122         string host = "127.0.0.1";
123         ushort port = 1883;
124         string clientId;
125         bool cleanSession = true;
126         int keepalive = 5;
127     }
128 
129     ///
130     Settings settings;
131 
132     void delegate() onConnect;
133 
134     ///
135     this(Settings s=Settings.init)
136     {
137         import core.stdc.errno;
138 
139         initMosquittoLib();
140 
141         buffers = new Appender!(char[])[](1); // for now need only 1 buffer
142         foreach (buf; buffers) buf.reserve(1024);
143 
144         settings = s;
145 
146         mosq = enforce!MosquittoException(
147                     mosquitto_new(toStringzBuf(s.clientId),
148                                   s.cleanSession, cast(void*)this),
149                     format("error while create mosquitto: %d", errno));
150 
151         mosquitto_connect_callback_set(mosq, &onConnectCallback);
152         mosquitto_message_callback_set(mosq, &onMessageCallback);
153     }
154 
155     ~this() { if (_connected) disconnect(); }
156 
157     ///
158     bool connected() const @property { return _connected; }
159 
160     ///
161     void loop(int timeoutMSecs=0)
162     { mosqCheck!mosquitto_loop(mosq, timeoutMSecs, 1); }
163 
164     ///
165     void connect()
166     {
167         mosqCheck!mosquitto_connect(mosq, toStringzBuf(settings.host),
168                                     settings.port, settings.keepalive);
169     }
170 
171     ///
172     void reconnect() { mosqCheck!mosquitto_reconnect(mosq); }
173 
174     ///
175     void disconnect() { mosqCheck!mosquitto_disconnect(mosq); }
176 
177     /// publish 
178     int publish(string topic, int qos, const(void)[] data, bool retain=false)
179     {
180         int mid;
181         mosqCheck!mosquitto_publish(mosq, &mid, toStringzBuf(topic),
182                             cast(int)data.length, data.ptr, qos, retain);
183         return mid;
184     }
185 
186     /// ditto with qos=0
187     int publish(string topic, const(void)[] data, bool retain=false)
188     { return publish(topic, 0, data, retain); }
189 
190     /// you need copy message data in callback if requires
191     void subscribe(string pattern, int qos, void delegate(const(char)[],
192                     const(void)[]) cb)
193     {
194         slist ~= Callback(pattern, qos, cb);
195         if (connected) mosqCheck!mosquitto_subscribe(mosq, null,
196                                     toStringzBuf(pattern), qos);
197     }
198 
199     /// ditto
200     void subscribe(string pattern, int qos, void delegate(const(void)[]) cb)
201     {
202         slist ~= Callback(pattern, qos,
203                     (const(char)[], const(void)[] data){ cb(data); });
204         if (connected) mosqCheck!mosquitto_subscribe(mosq, null,
205                                     toStringzBuf(pattern), qos);
206     }
207 }