TTK
Loading...
Searching...
No Matches
WebSocketIO.cpp
Go to the documentation of this file.
1#include <WebSocketIO.h>
2
3#include <exception>
4
6 this->setDebugMsgPrefix("WebSocketIO");
7
8#ifdef TTK_ENABLE_WEBSOCKETPP
9 this->server.set_error_channels(websocketpp::log::elevel::none);
10 this->server.set_access_channels(websocketpp::log::alevel::none);
11
12 this->server.set_reuse_addr(true);
13
14 // Initialize Asio
15 this->server.init_asio();
16
17 // Set the default message handler to the echo handler
18 this->server.set_message_handler(bind(&WebSocketIO::on_message, this,
19 websocketpp::lib::placeholders::_1,
20 websocketpp::lib::placeholders::_2));
21 this->server.set_open_handler(
22 bind(&WebSocketIO::on_open, this, websocketpp::lib::placeholders::_1));
23 this->server.set_close_handler(
24 bind(&WebSocketIO::on_close, this, websocketpp::lib::placeholders::_1));
25
26#else
27 this->printErr("WebSocketIO requires websocketpp header only library!");
28#endif
29}
30
32 try {
33 this->stopServer();
34 } catch(const std::exception &e) {
35 this->printErr(e.what());
36 }
37}
38
39#ifdef TTK_ENABLE_WEBSOCKETPP
40
42 return this->server.is_listening();
43}
44
46 return this->portNumber;
47}
48
49int ttk::WebSocketIO::startServer(int PortNumber) {
50 ttk::Timer t;
51
52 this->portNumber = PortNumber;
53
54 this->printMsg("Starting Server at Port: " + std::to_string(this->portNumber),
56
57 this->server.reset();
58 this->server.listen(this->portNumber);
59
60 // Queues a connection accept operation
61 this->server.start_accept();
62
63 // Start the Asio io_service run loop
64 this->serverThread = thread([this]() {
65 try {
66 {
67 std::lock_guard<std::mutex> guard(this->mutex);
68 this->serverThreadRunning = true;
69 }
70 this->server.run();
71 {
72 std::lock_guard<std::mutex> guard(this->mutex);
73 this->serverThreadRunning = false;
74 }
75 } catch(websocketpp::exception const &e) {
76 this->printErr("Unable to start server: " + std::string(e.what()));
77 }
78 });
79 this->serverThread.detach();
80
81 this->printMsg("Starting Server at Port: " + std::to_string(this->portNumber),
82 1, t.getElapsedTime());
83
84 return 1;
85}
86
88 ttk::Timer t;
89
90 this->printMsg("Stopping Server", 1, 0, ttk::debug::LineMode::REPLACE);
91
92 if(this->server.is_listening()) {
93
94 // Stopping the Websocket listener and closing outstanding connections.
95 this->server.stop_listening(this->ec);
96 if(this->ec) {
97 this->printErr(this->ec.message());
98 return 0;
99 }
100
101 // Close all existing websocket connections.
102 {
103 this->printMsg("Closing Connections", 0, 0, ttk::debug::LineMode::REPLACE,
105 // initiate closing
106 {
107 std::lock_guard<std::mutex> guard(this->mutex);
108 for(con_list::iterator it = this->connections.begin();
109 it != this->connections.end(); ++it) {
110 this->server.close(*it, websocketpp::close::status::normal,
111 "Terminating connection ...", this->ec);
112 if(this->ec) {
113 this->printErr(this->ec.message());
114 return 0;
115 }
116 }
117 this->connections.clear();
118 }
119
120 // wait until all closed
121 size_t nC = 1;
122 while(nC > 0) {
123 std::lock_guard<std::mutex> guard(this->mutex);
124 nC = this->connections.size();
125 }
126
127 this->printMsg("Closing Connections", 1, 0, ttk::debug::LineMode::NEW,
129 }
130 // Stop the endpoint.
131 {
132 this->printMsg("Terminating Server Thread", 0, 0,
135
136 this->server.stop();
137 // wait until thread terminated
138 bool con = true;
139 while(con) {
140 std::lock_guard<std::mutex> guard(this->mutex);
141 con = this->serverThreadRunning;
142 }
143
144 this->printMsg("Terminating Server Thread", 1, 0,
146 }
147 }
148 this->printMsg("Stopping Server", 1, t.getElapsedTime());
149 return 1;
150}
151
152int ttk::WebSocketIO::sendString(const std::string &msg) const {
153 if(this->connections.size() > 0) {
154 this->server.send(
155 *this->connections.begin(), msg, websocketpp::frame::opcode::text);
156 return 1;
157 }
158 return 0;
159}
160
161int ttk::WebSocketIO::sendBinary(const size_t &sizeInBytes,
162 const void *data) const {
163 if(this->connections.size() > 0) {
164 this->server.send(*this->connections.begin(), data, sizeInBytes,
165 websocketpp::frame::opcode::binary);
166 return 1;
167 }
168 return 0;
169}
170
171int ttk::WebSocketIO::sendMessage(const Message &msg) const {
172 if(msg.binaryPayload != nullptr)
173 return this->sendBinary(msg.binaryPayloadSize, msg.binaryPayload);
174 else
175 return this->sendString(msg.stringPayload);
176}
177
178int ttk::WebSocketIO::queueMessage(const std::string &msg) {
179 this->messageQueue.emplace_back(msg);
180 return 1;
181}
182int ttk::WebSocketIO::queueMessage(const size_t &sizeInBytes,
183 const void *data) {
184 this->messageQueue.emplace_back(sizeInBytes, data);
185 return 1;
186}
187int ttk::WebSocketIO::queueMessage(const Message &msg) {
188 this->messageQueue.emplace_back(msg);
189 return 1;
190}
192 this->messageQueue = std::list<Message>();
193 return 1;
194}
195
197 const size_t nRemainingMessages = this->messageQueue.size();
198 if(nRemainingMessages < 1) {
199 this->printWrn("Empty message queue.");
200 return 0;
201 }
202
203 if(nRemainingMessages > 1) {
204 float progress = ((float)(this->nMessages - nRemainingMessages))
205 / ((float)(this->nMessages - 1));
206 this->printMsg(
207 "Sending " + std::to_string(this->nMessages) + " queued messages",
208 progress, this->msgTimer.getElapsedTime(), ttk::debug::LineMode::REPLACE);
209 } else {
210 this->printMsg(
211 "Sending " + std::to_string(this->nMessages) + " queued messages", 1,
212 this->msgTimer.getElapsedTime());
213 }
214
215 auto msg = this->messageQueue.front();
216 this->messageQueue.pop_front();
217 return this->sendMessage(msg);
218}
219
221 this->nMessages = this->messageQueue.size() + 2;
222 msgTimer.reStart();
223 this->printMsg(
224 "Sending " + std::to_string(this->nMessages) + " queued messages", 0, 0,
226 if(this->nMessages < 1) {
227 this->printWrn("Empty message list.");
228 return 0;
229 }
230
231 this->messageQueue.emplace_front(
232 ttk::WebSocketIO::Message("ttk_WSIO_BeginMessageSequence"));
233 this->messageQueue.emplace_back(
234 ttk::WebSocketIO::Message("ttk_WSIO_EndMessageSequence"));
235
236 return this->sendNextQueuedMessage();
237}
238
239int ttk::WebSocketIO::processEvent(const std::string &eventName,
240 const std::string &eventData) {
241 this->printMsg("processEventBase:" + eventName + " -> " + eventData,
243
244 if(eventName.compare("on_message") == 0) {
245 if(eventData.compare("ttk_WSIO_RequestNextMessage") == 0)
246 return this->sendNextQueuedMessage();
247 }
248
249 return 1;
250}
251
252int ttk::WebSocketIO::on_open(const websocketpp::connection_hdl &hdl) {
253 std::lock_guard<std::mutex> lock(this->mutex);
254
255 ttk::Timer t;
256 this->printMsg(
257 "Establishing Connection", 0, 0, ttk::debug::LineMode::REPLACE);
258
259 if(!this->connections.empty()) {
260 this->printErr("One client is already connected.");
261 this->server.close(hdl, websocketpp::close::status::normal,
262 "Terminating connection ...", ec);
263 return 0;
264 }
265
266 this->connections.insert(hdl);
267 this->printMsg("Establishing Connection", 1, t.getElapsedTime());
268
269 this->processEvent("on_open");
270
271 return 1;
272}
273
274int ttk::WebSocketIO::on_close(const websocketpp::connection_hdl &hdl) {
275 std::lock_guard<std::mutex> lock(this->mutex);
276
277 ttk::Timer t;
278 this->printMsg("Closing Connection", 0, 0, ttk::debug::LineMode::REPLACE);
279 this->connections.erase(hdl);
280 this->printMsg("Closing Connection", 1, t.getElapsedTime());
281
282 return 1;
283}
284
285int ttk::WebSocketIO::on_message(
286 const websocketpp::connection_hdl &ttkNotUsed(hdl),
287 const WSServer::message_ptr &msg) {
288
289 const auto &eventData = msg->get_payload();
290 if(eventData.rfind("ttk_WSIO_", 9) != 0)
291 this->printMsg("Custom Message Received", 1, 0);
292 this->processEvent("on_message", eventData);
293 return 1;
294}
295
296#else
297
299 return 0;
300}
302 return -1;
303}
305 return 0;
306}
308 return 0;
309}
310int ttk::WebSocketIO::sendString(const std::string &ttkNotUsed(msg)) const {
311 return 0;
312}
313int ttk::WebSocketIO::sendBinary(const size_t &ttkNotUsed(sizeInBytes),
314 const void *ttkNotUsed(data)) const {
315 return 0;
316}
318 return 0;
319}
320int ttk::WebSocketIO::queueMessage(const std::string &) {
321 return 0;
322}
323int ttk::WebSocketIO::queueMessage(const size_t &, const void *) {
324 return 0;
325}
327 return 0;
328}
330 return 0;
331}
336 return 0;
337}
338int ttk::WebSocketIO::processEvent(const std::string &ttkNotUsed(eventName),
339 const std::string &ttkNotUsed(eventData)) {
340 return 0;
341}
342
343#endif
#define ttkNotUsed(x)
Mark function/method parameters that are not used in the function body at all.
Definition BaseClass.h:47
void setDebugMsgPrefix(const std::string &prefix)
Definition Debug.h:364
int printErr(const std::string &msg, const debug::LineMode &lineMode=debug::LineMode::NEW, std::ostream &stream=std::cerr) const
Definition Debug.h:149
double getElapsedTime()
Definition Timer.h:15
~WebSocketIO() override
int sendString(const std::string &msg) const
int sendMessage(const Message &msg) const
int queueMessage(const std::string &msg)
int sendBinary(const size_t &sizeInBytes, const void *data) const
virtual int processEvent(const std::string &eventName, const std::string &eventData="")
int startServer(int PortNumber)
printMsg(debug::output::BOLD+" | | | | | . \\ | | (__| | / __/| |_| / __/|__ _|"+debug::output::ENDCOLOR, debug::Priority::PERFORMANCE, debug::LineMode::NEW, stream)