Hello.
I am looking for a fast and reliable protocol for streaming big amount of small text data in real time (text strings might be between 20 and maybe 500 bytes long).
I do not care about order and when they arrive, i care just to get them to my server (100% delivery). I will have like almost 100 endpoints sending to one big server, so it will have heavy traffic.
I don't look for TCP
as it is not connection-less and it might be heavy for the network, has large header.
I was thinking about using UDP
but I saw that I lose some packets, sometimes even 1%+, which is not acceptable. I already did some testing.
I have also implemented KCP
but i don't know if this is the right solution. I am still doing tests.
The implementation has to be compatible with C
and Node.JS
as i will run my app in different environments.
What do you recommend?
Thank you.
libhv udp server on threads
Hello. I am trying to build a simple UDP multi-threaded echo server in C
using libhv
(library is using non-blocking events).
Basically i am trying to combine https://github.com/ithewei/libhv/blob/master/examples/udp_echo_server.c with the threading of https://github.com/ithewei/libhv/blob/master/examples/multi-thread/one-acceptor-multi-workers.c
I do this because i will have high amount of traffic and multiple threads will be needed to handle that kind of stream.
SOURCE CODE:
#include "hloop.h"
#include "hsocket.h"
#include "hthread.h"
// Listen address; port default is overwritten from argv[1] in main().
static const char* host = "127.0.0.1";
static int port = 8000;
// Number of worker event loops / worker threads to spawn.
static int thread_num = 4;
// Loop that owns the UDP socket; run on the main thread by accept_thread().
static hloop_t* accept_loop = NULL;
// Array of thread_num worker loops, allocated in main().
static hloop_t** worker_loops = NULL;
// Round-robin selection of the next worker loop.
//
// Fixes vs. original: the pre-increment skipped worker_loops[0] on the first
// call, and the trailing "% thread_num" was redundant after the wrap check.
// NOTE(review): s_cur_index is a plain static int, so this function is only
// safe when called from a single thread (here: the accept/IO thread).
static hloop_t* get_next_loop() {
    static int s_cur_index = 0;
    hloop_t* loop = worker_loops[s_cur_index];
    s_cur_index = (s_cur_index + 1) % thread_num;
    return loop;
}
// not used for UDP
/*
static void on_close(hio_t* io) {
printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
}
*/
// not used for UDP
/*
static void on_recv(hio_t* io, void* buf, int readbytes) {
// echo
hio_write(io, buf, readbytes);
}
*/
// not used for UDP
/*
static void new_conn_event(hevent_t* ev) {
hloop_t* loop = ev->loop;
hio_t* io = (hio_t*)hevent_userdata(ev);
hio_attach(loop, io);
char localaddrstr[SOCKADDR_STRLEN] = {0};
char peeraddrstr[SOCKADDR_STRLEN] = {0};
printf("tid=%ld connfd=%d [%s] <= [%s]\n",
(long)hv_gettid(),
(int)hio_fd(io),
SOCKADDR_STR(hio_localaddr(io), localaddrstr),
SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
hio_setcb_close(io, on_close);
hio_setcb_read(io, on_recv);
hio_read(io);
}
*/
// not used for UDP
/*
static void on_accept(hio_t* io) {
hio_detach(io);
hloop_t* worker_loop = get_next_loop();
hevent_t ev;
memset(&ev, 0, sizeof(ev));
ev.loop = worker_loop;
ev.cb = new_conn_event;
ev.userdata = io;
hloop_post_event(worker_loop, &ev);
}
*/
// Datagram read callback: log sender/receiver, dump the payload, echo it back.
static void on_recvfrom(hio_t* io, void* buf, int readbytes) {
    pthread_t self = pthread_self();
    printf("on_recvfrom fd=%d readbytes=%d\n", hio_fd(io), readbytes);

    char local_str[SOCKADDR_STRLEN] = {0};
    char peer_str[SOCKADDR_STRLEN] = {0};
    printf("[tid=%ld][fd=%d][%s] <=> [%s] #[%lu]\n",
           (long)hv_gettid(),
           (int)hio_fd(io),
           SOCKADDR_STR(hio_localaddr(io), local_str),
           SOCKADDR_STR(hio_peeraddr(io), peer_str),
           (unsigned long)self);

    // Dump the datagram, then echo it back to the sender.
    const char* msg = (const char*)buf;
    printf("< %.*s", readbytes, msg);
    printf("> %.*s", readbytes, msg);
    hio_write(io, buf, readbytes);
#if TEST_KCP
    // KCP test mode: a datagram starting with "CLOSE" tears down the session.
    if (strncmp(msg, "CLOSE", 5) == 0) {
        hio_close_rudp(io, hio_peeraddr(io));
    }
#endif
}
static HTHREAD_ROUTINE(worker_thread) {
hloop_t* loop = (hloop_t*)userdata;
hloop_run(loop);
return 0;
}
// UDP server entry point (runs on the main thread, see main()).
//
// NOTE(review): a UDP server is a single connection-less socket — there is no
// accept() and no per-connection fd to hand off to workers, so every datagram
// is read by this one loop/thread.  That is exactly why the output always
// shows the same tid.  To spread load over the workers, bind one
// SO_REUSEPORT UDP socket per worker loop instead of a single socket here.
static HTHREAD_ROUTINE(accept_thread) {
    hloop_t* loop = (hloop_t*)userdata;
    hio_t* io = hloop_create_udp_server(loop, host, port);
    if (io == NULL) {
        return 0;
    }
    hio_setcb_read(io, on_recvfrom);
    hio_read(io);
    // Blocks until the loop is stopped.  BUG FIX: the loop was created with
    // HLOOP_FLAG_AUTO_FREE, which frees it automatically when hloop_run()
    // returns — the original's extra hloop_free(&loop) was a double free.
    hloop_run(loop);
    return 0;
}
// Usage: cmd port
// Spawns thread_num worker loops, then runs the UDP server loop on the main
// thread (accept_thread is called directly, not on a new thread).
int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: cmd port\n");
        return -10;
    }
    port = atoi(argv[1]);
    worker_loops = (hloop_t**)malloc(sizeof(hloop_t*) * thread_num);
    if (worker_loops == NULL) {
        // BUG FIX: the original dereferenced an unchecked malloc() result.
        printf("malloc failed\n");
        return -1;
    }
    for (int i = 0; i < thread_num; ++i) {
        // AUTO_FREE: each loop frees itself when its hloop_run() returns.
        worker_loops[i] = hloop_new(HLOOP_FLAG_AUTO_FREE);
        hthread_create(worker_thread, worker_loops[i]);
    }
    accept_loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
    // Runs the UDP loop here on the main thread; returns only when it stops.
    accept_thread(accept_loop);
    return 0;
}
If I do a simple test, it opens 4 threads, but I also observe that it only uses one thread:
netcat test1:
nc 127.0.0.1 8000 -u
msg1
msg1
msg2
msg2
^C
netcat test2:
nc 127.0.0.1 8000 -u
msg1 connect2
msg1 connect2
msg2 connect2
msg2 connect2
^C
Output:
on_recvfrom fd=8 readbytes=5
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:55234] #[139856893998912]
< msg1
> msg1
on_recvfrom fd=8 readbytes=5
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:55234] #[139856893998912]
< msg2
> msg2
on_recvfrom fd=8 readbytes=14
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:43275] #[139856893998912]
< msg1 connect2
> msg1 connect2
on_recvfrom fd=8 readbytes=14
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:43275] #[139856893998912]
< msg2 connect2
> msg2 connect2
As you can see tid=27641
remains the same even after reconnection.
What should i do to solve this?
Thank you.
Threading pool program with fixed thread_max
I am trying to make a simple HTTP/HTTPS service discovery in C.
For this app i use pthread
as there is no other better cross platform options for this, but i get some strange errors.
PROBLEM1 : thread_submit has to be dynamically modified at run time. Right now it only accepts an lvalue
and it can only be modified from source code only.
int thread_max = 50; // how many checking threads to spawn [suggesting number between 4 and 8]
// NOTE(review): a #define is textual substitution done at compile time, so
// thread_submit can never be changed while the program runs (this is
// PROBLEM1).  To size it at run time, make it a variable and allocate
// taskQueue with malloc() instead of declaring a fixed-size array.
#define thread_submit 50 // how many thread submits, usually has to be lower or equal to thread_max
// One unit of work: a host/port to probe, plus an id for logging.
typedef struct Task {
    int id;
    char* ip;   // NOTE(review): ownership not visible in this excerpt — confirm who frees it
    int port;
} Task;
// Fixed-capacity work queue shared between threads; taskCount is the number of
// queued entries.  Presumably guarded by mutexQueue — every access must hold it.
Task taskQueue[thread_submit];
int taskCount = 0;
After a while (first for iteration) the program stops.
PROBLEM2 : after first iteration, program waits
int main()
{
    // Scan a small port range; each iteration spawns one producer (scan) and
    // one consumer (read) thread and waits for both before moving on.
    for(int this_port = 80; this_port <= 88; this_port++)
    {
    ...
    pthread_t thread_scan;
    pthread_t thread_read;
    pthread_create(&thread_scan, &attr, &run_fake_scan, NULL);
    pthread_create(&thread_read, &attr, &read_results, NULL);
    // Wait for both workers before starting the next port.
    pthread_join(thread_scan, NULL);
    pthread_join(thread_read, NULL);
    // NOTE(review): if attr is initialized once outside this loop, destroying
    // it here means the second iteration's pthread_create() uses a destroyed
    // attribute object — undefined behavior and a plausible cause of the hang
    // (PROBLEM2).  Either init/destroy attr inside the loop as a pair, or
    // destroy it once after the loop.  Also audit the worker code for any
    // path that returns/continues while still holding mutexQueue — a lock
    // leaked on an error path would make pthread_mutex_lock() wait forever.
    pthread_attr_destroy(&attr);
    ...
    }
}
Output:
read_results -> Thread created no. 44
read_results -> Thread created no. 45
startThread -> i am here 1
read_results -> Thread created no. 46
startThread -> i am here 1
startThread -> i am here 1
read_results -> Thread created no. 47
read_results -> Thread created no. 48
read_results -> Thread created no. 49
startThread -> i am here 1
startThread -> i am here 1
It is stopping/waiting on pthread_mutex_lock(&mutexQueue);
line, as if is infinitely waiting for mutex to get unlocked and never does that.
Full source code : https://pastebin.com/a6rtTBcX
Run code : https://www.onlinegdb.com/edit/EAou_Yzol
What am i doing wrong?
Thank you.
websockets and pymongo exception
Hello everyone. I am building a websocket client app that receives all the data and inserts it into a MongoDB database. The program runs, but after a while, some exceptions are being raised.
import websocket
import _thread
import time
import json
import asyncio
import os
import pwd
import socket
import traceback as tb
import pymongo
from datetime import datetime
import threading
import ssl
# NOTE(review): `mongo` is not defined anywhere in this excerpt — presumably a
# pymongo.MongoClient(...) created above; confirm it exists and that a single
# client instance is shared (pymongo clients are documented as thread-safe but
# not fork-safe).
httpthreat = mongo['httpthreat']
collection1 = httpthreat['collection1']
websocket_link = "wss://somewebservice.com/service"
def get_data(HOST, PORT, TIMEOUT=5, RETRY=3):
    """Connect to HOST:PORT and return the received payload as a str.

    Retries up to RETRY times, sleeping TIMEOUT seconds between failed
    attempts.  Returns None when every attempt fails.
    """
    for itry in range(1, RETRY + 1):
        try:
            # BUG FIX: the original never closed the socket, leaking one file
            # descriptor per attempt — under sustained traffic that exhausts
            # fds and produces "[Errno 9] Bad file descriptor"-style failures.
            # The `with` block closes it on every path, including exceptions.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(TIMEOUT)
                sock.connect((HOST, PORT))
                # processing some data
                # processing some data
                # processing some data
                # NOTE(review): `data` is produced by the elided code above —
                # confirm it is always assigned before this return.
                return str(data, 'utf-8')
        except Exception as e:
            traceback_str = ''.join(tb.format_exception(None, e, e.__traceback__))
            print("get_data exception")
            time.sleep(TIMEOUT)
            print(traceback_str)
    return None
def process_data(IP, PORT):
    """Fetch data from IP:PORT and insert one document into MongoDB."""
    try:
        payload = get_data(IP, int(PORT))
        collection1.insert_one({"ip": IP, "port": PORT, "data": payload,
                                "date": datetime.now()})  # <==== this line throws exceptions
        # i even tried this, but no use:
        '''
        for i in range(5):
            try:
                collection1.insert_one({"ip":IP,"port":PORT,"data":data,"date":datetime.now()}) # <==== this line throws exceptions anyway
                break
            except pymongo.errors.AutoReconnect:
                print("pymongo.errors.AutoReconnect exception ["+str(i)+"]")
                time.sleep(pow(2, i))
        '''
    except Exception as exc:
        trace = ''.join(tb.format_exception(None, exc, exc.__traceback__))
        print("process_data exception")
        print(trace)
def on_message(ws, message):
    """websocket-client message callback: spawn a worker for http records."""
    parsed = json.loads(message)
    if parsed['protocol'] != "http":
        return
    try:
        worker = threading.Thread(target=process_data,
                                  args=(parsed['ip'], parsed['port']))
        worker.start()
        # Deliberately not joined: joining here would serialize message
        # handling and slow the consumer down.
    except Exception as exc:
        print("on_message exception")
        print(''.join(tb.format_exception(None, exc, exc.__traceback__)))
def on_error(ws, error):
    """websocket-client error callback: just log the error."""
    print(error)
def on_close(ws, close_status_code, close_msg):
    """websocket-client close callback: log, then pause briefly so the
    supervisor loop does not restart in a tight spin."""
    print("### closed ###")
    time.sleep(1)
# NOTE(review): a second WebSocketApp used to be constructed right here at
# module level.  It was broken in three independent ways and has been removed:
#   1. it referenced on_open before that function was defined, so importing
#      the module raised NameError;
#   2. threading.Thread(target=ws.run_forever(...)) CALLS run_forever
#      immediately and passes its return value (None) as the thread target,
#      instead of passing a callable;
#   3. the Thread object was never start()ed (and was immediately discarded).
# The __main__ block below is the single place that runs the websocket client.
def on_open(ws):
    """websocket-client open callback: log the successful connection."""
    print("Opened connection")
if __name__ == "__main__":
    # Supervisor loop: run the websocket client and restart it after any
    # fatal error or disconnect.
    while True:
        try:
            print("Starting")
            websocket.enableTrace(True)
            ws = websocket.WebSocketApp(websocket_link,
                                        on_open=on_open,
                                        on_message=on_message,
                                        on_error=on_error,
                                        on_close=on_close)
            # BUG FIX: threading.Thread(target=ws.run_forever(...)) invoked
            # run_forever immediately (its return value became the target) and
            # the Thread was never start()ed — the "thread" was an illusion.
            # run_forever() blocks until the connection dies, which is exactly
            # what this restart loop needs, so call it directly.
            # (The original's `ws.daemon = True` set a meaningless attribute
            # on WebSocketApp and has been dropped.)
            ws.run_forever(ping_interval=70, ping_timeout=10,
                           sslopt={"cert_reqs": ssl.CERT_NONE})
        except Exception as e:
            traceback_str = ''.join(tb.format_exception(None, e, e.__traceback__))
            print("main exception")
            print(traceback_str)
        print("Restarting ...")
        time.sleep(1)
I am using the websocket
library and python3
Errors that i am getting :
Error1 (occurrence interval 10 minutes):
ping/pong timed out
send: b'\x88\x82c\xa0\xc1\xc0`H'
### closed ###
[Errno 9] Bad file descriptor
### closed ###
--- request header ---
GET /ws/service HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: somewebservice.net
Origin: http://somewebservice.net
Sec-WebSocket-Key: Rz2+MPdkcq0zgQqgQN/c7w==
Sec-WebSocket-Version: 13
-----------------------
--- response header ---
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: xRBxqrDNUMz7eNe1hwCxe11Pkxk=
-----------------------
Opened connection
I don't understand why this occurs at this interval, if i load the socket in browser, this never happen
Error2 (occurrence interval about 10 minutes):
send: b'\x88\x82\xe4\xadP\xfd\xe7E'
error from callback <function on_close at 0x7fe2e5b4b9d8>: [Errno 9] Bad file descriptor
File "/usr/lib/python3/dist-packages/websocket/_app.py", line 335, in _callback
callback(self, *args)
File "script.py", line 171, in on_close
threading.Thread(target=ws.run_forever(ping_interval=70, ping_timeout=10,sslopt={"cert_reqs": ssl.CERT_NONE}))
File "/usr/lib/python3/dist-packages/websocket/_app.py", line 302, in run_forever
teardown()
File "/usr/lib/python3/dist-packages/websocket/_app.py", line 226, in teardown
self.sock.close()
File "/usr/lib/python3/dist-packages/websocket/_core.py", line 420, in close
self.shutdown()
File "/usr/lib/python3/dist-packages/websocket/_core.py", line 432, in shutdown
self.sock.close()
File "/usr/lib/python3.7/socket.py", line 420, in close
self._real_close()
File "/usr/lib/python3.7/ssl.py", line 1108, in _real_close
super()._real_close()
File "/usr/lib/python3.7/socket.py", line 414, in _real_close
_ss.close(self)
--- request header ---
GET /ws/service HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: something.net
When this happens, it triggers restarting of the websocket.WebSocketApp
I am also getting a lot of 'pymongo' errors
Error3:
Traceback (most recent call last):
File "script.py", line 100, in process_data
something.insert_one({"ip":IP,"port":PORT,"date":datetime.now()})
File "/usr/local/lib/python3.7/dist-packages/pymongo/collection.py", line 613, in insert_one
comment=comment,
File "/usr/local/lib/python3.7/dist-packages/pymongo/collection.py", line 547, in _insert_one
self.__database.client._retryable_write(acknowledged, _insert_command, session)
File "/usr/local/lib/python3.7/dist-packages/pymongo/mongo_client.py", line 1399, in _retryable_write
return self._retry_with_session(retryable, func, s, None)
File "/usr/local/lib/python3.7/dist-packages/pymongo/mongo_client.py", line 1286, in _retry_with_session
return self._retry_internal(retryable, func, session, bulk)
File "/usr/local/lib/python3.7/dist-packages/pymongo/mongo_client.py", line 1320, in _retry_internal
return func(session, sock_info, retryable)
File "/usr/local/lib/python3.7/dist-packages/pymongo/collection.py", line 542, in _insert_command
retryable_write=retryable_write,
File "/usr/local/lib/python3.7/dist-packages/pymongo/pool.py", line 770, in command
self._raise_connection_failure(error)
File "/usr/local/lib/python3.7/dist-packages/pymongo/pool.py", line 764, in command
exhaust_allowed=exhaust_allowed,
File "/usr/local/lib/python3.7/dist-packages/pymongo/network.py", line 150, in command
reply = receive_message(sock_info, request_id)
File "/usr/local/lib/python3.7/dist-packages/pymongo/network.py", line 213, in receive_message
raise ProtocolError("Got response id %r but expected %r" % (response_to, request_id))
pymongo.errors.ProtocolError: Got response id 1852141647 but expected 402724286
After a while i get a lot of pymongo.errors.AutoReconnect
then program does not write to the database anymore.
I don't understand how come a program can run a while then start to throw exception like crazy, especially when you connect it to a MongoDB database with pymongo. I have read about pymongo and it is not fork safe, but thread safe yes.
If i load the websocket in browser, it is simply working with no problems at all.
Any hints please?
Storing ipv4 and ipv6 in MongoDB
Hello
I am building a database (i prefer MongoDB) that i will store over 100 mil ipv4 and ipv6 records for logging purposes. Data sample :
1.1.1.1 -> 0x1010101
1.2.3.4 -> 0x1020304
34.53.63.25 -> 0x22353f19
255.255.255.255 -> 0xffffffff
0001:0001:0001:0001:0001:0001:0001:0001 -> 0x10001000100010001000100010001
1111:1111:1111:1111:1111:1111:1111:1111 -> 0x11111111111111111111111111111111
2345:0425:2CA1:0000:0000:0567:5673:23b5 -> 0x234504252ca1000000000567567323b5
2345:0425:2CA1::0567:5673:23b5 -> 0x234504252ca1000000000567567323b5
ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff -> 0xffffffffffffffffffffffffffffffff
I will have a lot of queries retrieving data by IP. I don't care about space, queries must be as fast as possible.
I was thinking about storing in binary, or having the IP split into 4 columns (for ipv4) and 8 columns (for ipv6).
What is the most efficient way in terms of speed to achieve that?
MongoDB ipv4/ipv6 hex index
I am building a ipv4/ipv6 geo ip MongoDB database and i will have millions (100+) of ips
The structure of the database will be
[
{ _id: 58fdbf5c0ef8a50b4cdd9a8e , ip: '34.53.63.25', ip_hex: '0x22353f19' , type: "ipv4", data : [
{
country : "CA",
region : "region1"
city : "city1",
blacklisted : "no"
on_date : ISODate("2022-05-05T00:00:00Z")
},
{
country : "CA",
region : "region1"
city : "city1",
blacklisted : "yes"
on_date : ISODate("2022-06-05T00:00:00Z")
},
{
country : "US",
region : "region2"
city : "city2",
blacklisted : "no"
on_date : ISODate("2022-05-05T00:00:00Z")
},
...
]},
{ _id: 58fdbf5c0ef8a50b4cdd9a8e , ip: '1.2.3.4', ip_hex: '0x1020304', type: "ipv4", data : [
{
country : "CA",
region : "region1"
city : "city1",
blacklisted : "no"
on_date : ISODate("2022-06-05T00:00:00Z")
},
]},
{ _id: 58fdbf5c0ef8a50b4cdd9a8e , ip: '2345:0425:2CA1:0000:0000:0567:5673:23b5', ip_hex: '0x234504252ca1000000000567567323b5', type: "ipv6", data : [
{
country : "FR",
region : "region1"
city : "city1",
blacklisted : "no"
on_date : ISODate("2022-06-05T00:00:00Z")
},
{
country : "FR",
region : "region1"
city : "city1",
blacklisted : "yes"
on_date : ISODate("2022-07-05T00:00:00Z")
},
...
]},
]
I am converting all IP string data to HEX :
1.1.1.1 -> 0x1010101
1.2.3.4 -> 0x1020304
34.53.63.25 -> 0x22353f19
255.255.255.255 -> 0xffffffff
0001:0001:0001:0001:0001:0001:0001:0001 -> 0x10001000100010001000100010001
1111:1111:1111:1111:1111:1111:1111:1111 -> 0x11111111111111111111111111111111
2345:0425:2CA1:0000:0000:0567:5673:23b5 -> 0x234504252ca1000000000567567323b5
2345:0425:2CA1::0567:5673:23b5 -> 0x234504252ca1000000000567567323b5
ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff -> 0xffffffffffffffffffffffffffffffff
There will be a lot of searches by IP and it will be added/deleted/updated new data to every IP each day.
I will search ranges of ips, sort, update, delete.
What index is recommended on "ip_hex" column? I was thinking about a B-tree search on HEX not STR.
I want to have an efficient database. What other optimizations should i take into consideration?
Thank you.
Best way to manage a big database
The web application will have a database that consists of millions (over 20mil ... maybe 100mil or more) of hosts (ipv4,ipv6 and domains) and additional data to each host like blacklist, uptime, history, geo data (country, region, city, isp). I could, of course, use other RMDBS system like MySQL, Oracle, PostgreSQL ... but i want speed and I will have a lot of data. I stumbled upon at NoSQL and MongoDB and want to know what are the best way to develop and manage a big database.
I was thinking about using b-tree search on hosts but i will have to sort them and have 4 columns for ipv4 and 8 columns on ipv6. On domains i don't know how i would manage to do that ... i might just use full text search, maybe just a sort will help me.
Database will be constantly updated and used so a clever lock per row must be enabled. Main searches will be by host, geo code, isp, tags and hostId.
hosts table
hostId , host (must be either text or multiple columns) , tags (array) , blacklists (array) , uptime , geoCountry , geoRegion , geoCity, geoIsp , dateInserted
There is also a possibility that the database might have a lot of offline hosts and multiple records per host, so another history table must be created, and whenever a host goes offline or a change occurs (blacklist, geo ip data), it must go into history.
hosts_history table
hostHistoryId , host (must be either text or multiple columns), tags (array) , blacklists (array) , uptime , geoCountry , geoRegion , geoCity, geoIsp , dateInserted
What would be the best approach, and how can I optimize this project for the best possible speed?
fstream pointer in vector of struct
Hello.
I am writing a program in C++ that is reading some files with fstream
then writes with simple function and after that closes them.
I created a struct
and I keep there filetag, filepointer and lines. The problem is that from main I can't write to or close the file ... but I can access the filetag and lines variables. Here is my code:
// One output log file: a lookup tag, the stream to write to, and a line count.
struct Fileoutput
{
    string filetag;            // key used by write_to_outputfile() to pick the file
    // NOTE(review): non-owning raw pointer — the pointee must outlive this
    // struct.  Pointing it at a stack-local fstream (as open_files() does)
    // leaves it dangling as soon as that function returns, which is the
    // reported segfault.  Prefer std::unique_ptr<std::fstream> here.
    std::fstream* filepointer;
    int lines = 0;             // lines written so far
};
// Global registry of open log files, populated by open_files().
static std::vector<Fileoutput> vect_fileoutputs;
// Open all log files and register them in vect_fileoutputs.
//
// BUG FIX: the streams used to be stack locals (std::fstream fs_all;) whose
// addresses were stored in the struct.  They were destroyed when this
// function returned, so every later use through filepointer dereferenced a
// dangling pointer — the reported SIGSEGV.  The streams are now heap
// allocated so they outlive this function; close_files() closes them.
// (A cleaner long-term fix is std::unique_ptr<std::fstream> in the struct.)
static void open_files()
{
    // open all
    struct Fileoutput fo_all;
    fo_all.filetag = "all";
    fo_all.lines = 0;
    fo_all.filepointer = new std::fstream("all.log",
                                          std::fstream::out | std::fstream::app);
    vect_fileoutputs.push_back(fo_all);

    // open error
    struct Fileoutput fo_error;
    fo_error.filetag = "error";
    fo_error.lines = 0;
    fo_error.filepointer = new std::fstream("error.log",
                                            std::fstream::out | std::fstream::app);
    vect_fileoutputs.push_back(fo_error);
}
static bool write_to_outputfile(char* outputfile,char* content)
{
bool write = false;
for(const struct Fileoutput fo : vect_fileoutputs)
{
if(strcmp(fo.filetag.c_str(),outputfile) == 0)
{
printf("Writing [%s] to [%s]\n",content,outputfile);
std::fstream* fs_write;
fs_write = fo.filepointer;
*fo.filepointer << content; // NOT WORKING ... seg fault
write = true;
break;
}
}
return write;
}
static void close_files()
{
for(const struct Fileoutput file_output: vect_fileoutputs)
{
printf("Closing file [%s]\n",file_output.filetag.c_str());
std::fstream* fs_temp;
fs_temp = file_output.filepointer;
fs_temp->close(); // NOT WORKING ... seg fault
printf("Closed [%s]\n",file_output.filetag.c_str());
}
}
int main(int argc, char* argv[])
{
    open_files();
    write_to_outputfile((char*)"all", (char*)"something1\n");
    write_to_outputfile((char*)"all", (char*)"something2\n");
    write_to_outputfile((char*)"all", (char*)"something3\n");
    write_to_outputfile((char*)"error", (char*)"error\n");
    close_files();
    // BUG FIX: returning 1 reports failure to the shell; 0 means success.
    return 0;
}
The gdb error looks like :
gdb error :
Program received signal SIGSEGV, Segmentation fault.
0x0000000000405de9 in ?? ()
(gdb) x 0x0000000000405de9
0x405de9: 0xe8420348
(gdb) x 0x405de9
0x405de9: 0xe8420348
(gdb) x 0xe8420348
0xe8420348: Cannot access memory at address 0xe8420348
(gdb)
What is wrong with the code?
Thank you.