Fast and reliable transport protocol for streaming text

Hello.
I am looking for a fast and reliable protocol for streaming a large amount of small text messages in real time (strings will be between 20 and roughly 500 bytes long).
I do not care about ordering or arrival time; I only care that every message reaches my server (100% delivery). I will have almost 100 endpoints sending to one big server, so traffic will be heavy.
I am not looking at TCP, since it is not connectionless, can be heavy for the network, and has a large header.
I was thinking about using UDP, but in my own testing I saw that I lose some packets, sometimes more than 1%, which is not acceptable.
I have also implemented KCP, but I don't know if it is the right solution; I am still running tests.
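
To make the 100% delivery requirement concrete, below is the kind of acknowledge-and-retransmit behaviour I need on top of UDP. This is only a minimal stop-and-wait sketch of my own, not production code; a real protocol (KCP included) keeps many packets in flight at once.

/* Stop-and-wait reliability sketch over UDP: each datagram carries a
 * 4-byte sequence number; the sender retransmits until the peer echoes
 * that number back as an ACK. Caller guarantees len <= 500. */
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>

static int send_reliable(int fd, const struct sockaddr_in* peer,
                         uint32_t seq, const char* msg, size_t len) {
    char pkt[504];                      /* 4-byte header + <=500 bytes text */
    uint32_t nseq = htonl(seq), ack;
    memcpy(pkt, &nseq, 4);
    memcpy(pkt + 4, msg, len);

    struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
    setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));

    for (int attempt = 0; attempt < 5; ++attempt) {
        sendto(fd, pkt, len + 4, 0,
               (const struct sockaddr*)peer, sizeof(*peer));
        if (recv(fd, &ack, sizeof(ack), 0) == (ssize_t)sizeof(ack) &&
            ntohl(ack) == seq)
            return 0;                   /* acknowledged */
    }
    return -1;                          /* still lost after 5 attempts */
}
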
The implementation has to be compatible with both C and Node.js, as I will run my app in different environments.
What do you recommend?
Thank you.

libhv udp server on threads

Hello. I am trying to build a simple multi-threaded UDP echo server in C using libhv (the library uses non-blocking event loops).

Basically, I am trying to combine https://github.com/ithewei/libhv/blob/master/examples/udp_echo_server.c with the threading model of https://github.com/ithewei/libhv/blob/master/examples/multi-thread/one-acceptor-multi-workers.c

I am doing this because I will have a high amount of traffic, and multiple threads will be needed to handle that kind of stream. (A rough sketch of the pattern I am after follows below.)
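
In plain POSIX terms (outside libhv), the per-thread receive pattern I am aiming for would look roughly like this; SO_REUSEPORT needs Linux 3.9+, and the port and thread count are illustrative:

/* Each worker thread binds its own UDP socket to the same port with
 * SO_REUSEPORT, and the kernel load-balances incoming datagrams
 * across the sockets. */
#define _GNU_SOURCE
#include <arpa/inet.h>
#include <netinet/in.h>
#include <pthread.h>
#include <string.h>
#include <sys/socket.h>

static void* udp_worker(void* arg) {
    int port = *(int*)arg;
    int fd = socket(AF_INET, SOCK_DGRAM, 0);
    int one = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one));

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons((unsigned short)port);
    bind(fd, (struct sockaddr*)&addr, sizeof(addr));

    char buf[1500];
    struct sockaddr_in peer;
    socklen_t plen = sizeof(peer);
    for (;;) {                          /* echo loop, one socket per thread */
        ssize_t n = recvfrom(fd, buf, sizeof(buf), 0,
                             (struct sockaddr*)&peer, &plen);
        if (n > 0)
            sendto(fd, buf, n, 0, (struct sockaddr*)&peer, plen);
        plen = sizeof(peer);
    }
    return NULL;
}

int main(void) {
    static int port = 8000;
    pthread_t tids[4];
    for (int i = 0; i < 4; ++i)
        pthread_create(&tids[i], NULL, udp_worker, &port);
    for (int i = 0; i < 4; ++i)
        pthread_join(tids[i], NULL);
    return 0;
}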

SOURCE CODE:

#include "hloop.h"
#include "hsocket.h"
#include "hthread.h"

static const char* host = "127.0.0.1";
static int port = 8000;
static int thread_num = 4;
static hloop_t*  accept_loop = NULL;
static hloop_t** worker_loops = NULL;

static hloop_t* get_next_loop() {
    static int s_cur_index = 0;
    if (++s_cur_index >= thread_num) {
        s_cur_index = 0;
    }
    return worker_loops[s_cur_index % thread_num];
}

// not used for UDP
/*
static void on_close(hio_t* io) {
    printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
}
*/

// not used for UDP
/*
static void on_recv(hio_t* io, void* buf, int readbytes) {
    // echo
    hio_write(io, buf, readbytes);
}
*/

// not used for UDP
/*
static void new_conn_event(hevent_t* ev) {
    hloop_t* loop = ev->loop;
    hio_t* io = (hio_t*)hevent_userdata(ev);
    hio_attach(loop, io);

    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    printf("tid=%ld connfd=%d [%s] <= [%s]\n",
            (long)hv_gettid(),
            (int)hio_fd(io),
            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));

    hio_setcb_close(io, on_close);
    hio_setcb_read(io, on_recv);

    hio_read(io);
}
*/

// not used for UDP
/*
static void on_accept(hio_t* io) {
    hio_detach(io);

    hloop_t* worker_loop = get_next_loop();
    hevent_t ev;
    memset(&ev, 0, sizeof(ev));
    ev.loop = worker_loop;
    ev.cb = new_conn_event;
    ev.userdata = io;
    hloop_post_event(worker_loop, &ev);
}
*/

static void on_recvfrom(hio_t* io, void* buf, int readbytes) {
    pthread_t tid = pthread_self();

    printf("on_recvfrom fd=%d readbytes=%d\n", hio_fd(io), readbytes);
    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    printf("[tid=%ld][fd=%d][%s] <=> [%s] #[%lu]\n",
        (long)hv_gettid(),
        (int)hio_fd(io),
        SOCKADDR_STR(hio_localaddr(io), localaddrstr),
        SOCKADDR_STR(hio_peeraddr(io), peeraddrstr),
        (unsigned long)tid);

    // get msg
    char* str = (char*)buf;
    printf("< %.*s", readbytes, str);

    // echo back
    printf("> %.*s", readbytes, str);
    hio_write(io, buf, readbytes);

#if TEST_KCP
    if (strncmp(str, "CLOSE", 5) == 0) {
        hio_close_rudp(io, hio_peeraddr(io));
    }
#endif
}


static HTHREAD_ROUTINE(worker_thread) {
    hloop_t* loop = (hloop_t*)userdata;
    hloop_run(loop);
    return 0;
}

static HTHREAD_ROUTINE(accept_thread) {
    // not used for UDP
    /*
    hloop_t* loop = (hloop_t*)userdata;
    hio_t* listenio = hloop_create_tcp_server(loop, host, port, on_accept);
    if (listenio == NULL) {
        exit(1);
    }
    hloop_run(loop);
    */

    //hloop_t* loop = hloop_new(0);
    hloop_t* loop = (hloop_t*)userdata;
    hio_t* io = hloop_create_udp_server(loop, host, port); // Change the port as needed
    if (io == NULL) {
        return NULL;
    }

    hio_setcb_read(io, on_recvfrom);
    hio_read(io);
    hloop_run(loop);
    hloop_free(&loop);

    return 0;
}

int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: cmd port\n");
        return -10;
    }
    port = atoi(argv[1]);

    worker_loops = (hloop_t**)malloc(sizeof(hloop_t*) * thread_num);
    for (int i = 0; i < thread_num; ++i) {
        worker_loops[i] = hloop_new(HLOOP_FLAG_AUTO_FREE);
        hthread_create(worker_thread, worker_loops[i]);
    }

    accept_loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
    accept_thread(accept_loop);

    return 0;
}

If I run a simple test, it does open 4 threads, but I observe that only one thread is ever used:

netcat test1:

nc 127.0.0.1 8000  -u
msg1
msg1
msg2
msg2
^C

netcat test2:

nc 127.0.0.1 8000  -u
msg1 connect2
msg1 connect2
msg2 connect2
msg2 connect2
^C

Output:

on_recvfrom fd=8 readbytes=5
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:55234] #[139856893998912]
< msg1
> msg1
on_recvfrom fd=8 readbytes=5
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:55234] #[139856893998912]
< msg2
> msg2
on_recvfrom fd=8 readbytes=14
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:43275] #[139856893998912]
< msg1 connect2
> msg1 connect2
on_recvfrom fd=8 readbytes=14
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:43275] #[139856893998912]
< msg2 connect2
> msg2 connect2

As you can see, tid=27641 remains the same even after reconnecting.

What should I do to solve this?

Thank you.

Threading pool program with fixed thread_max

I am trying to make a simple HTTP/HTTPS service discovery tool in C.

For this app I use pthreads, as there is no better cross-platform option, but I am getting some strange errors.

PROBLEM 1: thread_submit has to be modifiable dynamically at run time. Right now it is a compile-time constant (#define), so it can only be changed in the source code. (A sketch of the direction I want is below the code.)

int thread_max = 50; // how many checking threads to spawn [suggesting number between 4 and 8]
#define thread_submit 50 // how many thread submits, usually has to be lower or equal to thread_max

typedef struct Task {
    int id;
    char* ip;
    int port;
} Task;

Task taskQueue[thread_submit];
int taskCount = 0;
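
What I think I need is something like the following, where the queue is allocated on the heap at run time instead of being sized by a #define (sketch, untested; init_queue is a name I made up):

#include <stdlib.h>

int thread_submit = 50;   /* now a plain variable, settable at run time */
Task* taskQueue = NULL;
int taskCount = 0;

/* Allocate the queue once the desired size is known (e.g. from argv). */
void init_queue(int submits)
{
    thread_submit = submits;
    taskQueue = malloc(sizeof(Task) * (size_t)thread_submit);
}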

PROBLEM 2: after a while (the first for iteration), the program stops and just waits.

int main()
{
    for(int this_port = 80; this_port <= 88; this_port++)
    {
        ...

        pthread_t thread_scan;
        pthread_t thread_read;
        pthread_create(&thread_scan, &attr, &run_fake_scan, NULL);
        pthread_create(&thread_read, &attr, &read_results, NULL);
        pthread_join(thread_scan, NULL);
        pthread_join(thread_read, NULL);
        pthread_attr_destroy(&attr);

        ...
    }
}

Output:

read_results -> Thread created no. 44
read_results -> Thread created no. 45
startThread -> i am here 1
read_results -> Thread created no. 46
startThread -> i am here 1
startThread -> i am here 1
read_results -> Thread created no. 47
read_results -> Thread created no. 48
read_results -> Thread created no. 49
startThread -> i am here 1
startThread -> i am here 1

It stops on the pthread_mutex_lock(&mutexQueue); line, as if it were waiting forever for a mutex that never gets unlocked.
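
For reference, below is the consumer/producer pattern I believe is intended (mutex plus condition variable, so the reader thread sleeps inside pthread_cond_wait instead of blocking everyone; a sketch reusing the Task/taskQueue names from above, untested):

#include <pthread.h>

pthread_mutex_t mutexQueue = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  condQueue  = PTHREAD_COND_INITIALIZER;

/* Consumer side: pthread_cond_wait atomically releases mutexQueue
 * while sleeping, so producers can still lock it and enqueue work;
 * the consumer never blocks forever while holding the mutex. */
Task take_task(void)
{
    pthread_mutex_lock(&mutexQueue);
    while (taskCount == 0)
        pthread_cond_wait(&condQueue, &mutexQueue);
    Task t = taskQueue[0];
    for (int i = 0; i < taskCount - 1; i++)   /* shift queue left */
        taskQueue[i] = taskQueue[i + 1];
    taskCount--;
    pthread_mutex_unlock(&mutexQueue);
    return t;
}

/* Producer side: lock, enqueue, signal one waiting consumer, unlock. */
void submit_task(Task t)
{
    pthread_mutex_lock(&mutexQueue);
    taskQueue[taskCount++] = t;
    pthread_cond_signal(&condQueue);
    pthread_mutex_unlock(&mutexQueue);
}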

Full source code : https://pastebin.com/a6rtTBcX
Run code : https://www.onlinegdb.com/edit/EAou_Yzol

What am I doing wrong?

Thank you.

websockets and pymongo exception

Hello everyone. I am building a websocket client app that receives data and inserts it into a MongoDB database. The program runs, but after a while some exceptions start being raised.

import websocket
import _thread
import time
import json
import asyncio

import os
import pwd
import socket
import traceback as tb

import pymongo
from datetime import datetime
import threading
import ssl

mongo = pymongo.MongoClient("mongodb://localhost:27017")  # client creation was missing from my paste; placeholder URI
httpthreat = mongo['httpthreat']
collection1 = httpthreat['collection1']
websocket_link = "wss://somewebservice.com/service"

def get_data(HOST,PORT,TIMEOUT=5,RETRY=3):
    for itry in range(1,RETRY+1):
        try:   
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(TIMEOUT)
            sock.connect((HOST, PORT))

            # processing some data
            # processing some data
            # processing some data

            return str(data,'utf-8')
        except Exception as e:
            traceback_str = ''.join(tb.format_exception(None, e, e.__traceback__))
            print("get_data exception")
            time.sleep(TIMEOUT)
            print(traceback_str)
    return None


def process_data(IP,PORT):
    try:
        data = get_data(IP,int(PORT))
        collection1.insert_one({"ip":IP,"port":PORT,"data":data,"date":datetime.now()}) # <==== this line throws exceptions

        # i even tried this, but no use:
        '''
        for i in range(5):
            try:
                collection1.insert_one({"ip":IP,"port":PORT,"data":data,"date":datetime.now()}) # <==== this line throws exceptions anyway
                break
            except pymongo.errors.AutoReconnect:
                print("pymongo.errors.AutoReconnect exception ["+str(i)+"]")
                time.sleep(pow(2, i))
        '''


    except Exception as e:
        traceback_str = ''.join(tb.format_exception(None, e, e.__traceback__))
        print("process_data exception")
        print(traceback_str)


def on_message(ws, message):
    message_json = json.loads(message)
    if(message_json['protocol'] == "http"):
        try:
            threads = [threading.Thread(target=process_data, args=(message_json['ip'], message_json['port']))]
            for thread in threads:
                thread.start()
            #for thread in threads:
                #thread.join() # waits for thread to complete its task <==== if i uncomment this, program slows
        except Exception as e:
            print("on_message exception")
            traceback_str = ''.join(tb.format_exception(None, e, e.__traceback__))
            print(traceback_str)

def on_error(ws, error):
    print(error)

def on_close(ws, close_status_code, close_msg):
    print("### closed ###")

    time.sleep(1)
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(websocket_link,
                              on_open=on_open,
                              on_message=on_message,
                              on_error=on_error,
                              on_close=on_close)
    ws.daemon = True
    threading.Thread(target=ws.run_forever(ping_interval=70, ping_timeout=10,sslopt={"cert_reqs": ssl.CERT_NONE}))

def on_open(ws):
    print("Opened connection")

if __name__ == "__main__":
    while True:
        try:
            print("Starting")
            websocket.enableTrace(True)
            ws = websocket.WebSocketApp(websocket_link,
                                      on_open=on_open,
                                      on_message=on_message,
                                      on_error=on_error,
                                      on_close=on_close)
            ws.daemon = True
            threading.Thread(target=ws.run_forever(ping_interval=70, ping_timeout=10,sslopt={"cert_reqs": ssl.CERT_NONE}))

        except Exception as e:
            traceback_str = ''.join(tb.format_exception(None, e, e.__traceback__))
            print("main exception")
            print(traceback_str)

        print("Restarting ...")
        time.sleep(1)

I am using the websocket library and Python 3.

Errors that I am getting:

Error 1 (occurs roughly every 10 minutes):

ping/pong timed out
send: b'\x88\x82c\xa0\xc1\xc0`H'
### closed ###
[Errno 9] Bad file descriptor
### closed ###
--- request header ---
GET /ws/service HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: somewebservice.net
Origin: http://somewebservice.net
Sec-WebSocket-Key: Rz2+MPdkcq0zgQqgQN/c7w==
Sec-WebSocket-Version: 13


-----------------------
--- response header ---
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: xRBxqrDNUMz7eNe1hwCxe11Pkxk=
-----------------------
Opened connection

I don't understand why this happens at this interval; if I load the websocket in a browser, it never happens.

Error 2 (also occurs roughly every 10 minutes):

send: b'\x88\x82\xe4\xadP\xfd\xe7E'
error from callback <function on_close at 0x7fe2e5b4b9d8>: [Errno 9] Bad file descriptor
  File "/usr/lib/python3/dist-packages/websocket/_app.py", line 335, in _callback
    callback(self, *args)
  File "script.py", line 171, in on_close
    threading.Thread(target=ws.run_forever(ping_interval=70, ping_timeout=10,sslopt={"cert_reqs": ssl.CERT_NONE}))
  File "/usr/lib/python3/dist-packages/websocket/_app.py", line 302, in run_forever
    teardown()
  File "/usr/lib/python3/dist-packages/websocket/_app.py", line 226, in teardown
    self.sock.close()
  File "/usr/lib/python3/dist-packages/websocket/_core.py", line 420, in close
    self.shutdown()
  File "/usr/lib/python3/dist-packages/websocket/_core.py", line 432, in shutdown
    self.sock.close()
  File "/usr/lib/python3.7/socket.py", line 420, in close
    self._real_close()
  File "/usr/lib/python3.7/ssl.py", line 1108, in _real_close
    super()._real_close()
  File "/usr/lib/python3.7/socket.py", line 414, in _real_close
    _ss.close(self)
--- request header ---
GET /ws/service HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: something.net

When this happens, it triggers a restart of the websocket.WebSocketApp.

I am also getting a lot of pymongo errors.

Error 3:

Traceback (most recent call last):
  File "script.py", line 100, in process_data
    something.insert_one({"ip":IP,"port":PORT,"date":datetime.now()})
  File "/usr/local/lib/python3.7/dist-packages/pymongo/collection.py", line 613, in insert_one
    comment=comment,
  File "/usr/local/lib/python3.7/dist-packages/pymongo/collection.py", line 547, in _insert_one
    self.__database.client._retryable_write(acknowledged, _insert_command, session)
  File "/usr/local/lib/python3.7/dist-packages/pymongo/mongo_client.py", line 1399, in _retryable_write
    return self._retry_with_session(retryable, func, s, None)
  File "/usr/local/lib/python3.7/dist-packages/pymongo/mongo_client.py", line 1286, in _retry_with_session
    return self._retry_internal(retryable, func, session, bulk)
  File "/usr/local/lib/python3.7/dist-packages/pymongo/mongo_client.py", line 1320, in _retry_internal
    return func(session, sock_info, retryable)
  File "/usr/local/lib/python3.7/dist-packages/pymongo/collection.py", line 542, in _insert_command
    retryable_write=retryable_write,
  File "/usr/local/lib/python3.7/dist-packages/pymongo/pool.py", line 770, in command
    self._raise_connection_failure(error)
  File "/usr/local/lib/python3.7/dist-packages/pymongo/pool.py", line 764, in command
    exhaust_allowed=exhaust_allowed,
  File "/usr/local/lib/python3.7/dist-packages/pymongo/network.py", line 150, in command
    reply = receive_message(sock_info, request_id)
  File "/usr/local/lib/python3.7/dist-packages/pymongo/network.py", line 213, in receive_message
    raise ProtocolError("Got response id %r but expected %r" % (response_to, request_id))
pymongo.errors.ProtocolError: Got response id 1852141647 but expected 402724286

After a while I get a lot of pymongo.errors.AutoReconnect, and then the program stops writing to the database altogether.

I don't understand how a program can run fine for a while and then start throwing exceptions like crazy, especially when it is connected to a MongoDB database through pymongo. From what I have read, pymongo is not fork-safe, but it is thread-safe.
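
In case spawning one thread per message is part of the problem, the variant I am considering funnels all inserts through a single writer thread (sketch, assuming the same collection1 and pymongo import as above):

import queue
import threading

# One dedicated writer thread drains the queue, so only one thread
# ever performs inserts against the MongoClient.
write_queue = queue.Queue()

def mongo_writer():
    while True:
        doc = write_queue.get()   # blocks until a document is available
        try:
            collection1.insert_one(doc)
        except pymongo.errors.PyMongoError as e:
            print("insert failed:", e)
        finally:
            write_queue.task_done()

threading.Thread(target=mongo_writer, daemon=True).start()

# process_data would then just enqueue instead of inserting directly:
# write_queue.put({"ip": IP, "port": PORT, "data": data, "date": datetime.now()})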

If I load the websocket in a browser, it simply works, with no problems at all.

Any hints, please?

Storing ipv4 and ipv6 in MongoDB

Hello

I am building a database (I prefer MongoDB) in which I will store over 100 million IPv4 and IPv6 records for logging purposes. Data sample:

1.1.1.1 -> 0x1010101
1.2.3.4 -> 0x1020304
34.53.63.25 -> 0x22353f19
255.255.255.255 -> 0xffffffff

0001:0001:0001:0001:0001:0001:0001:0001 -> 0x10001000100010001000100010001
1111:1111:1111:1111:1111:1111:1111:1111 -> 0x11111111111111111111111111111111
2345:0425:2CA1:0000:0000:0567:5673:23b5 -> 0x234504252ca1000000000567567323b5
2345:0425:2CA1::0567:5673:23b5          -> 0x234504252ca1000000000567567323b5
ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff -> 0xffffffffffffffffffffffffffffffff

I will have a lot of queries retrieving data by IP. I don't care about space; queries must be as fast as possible.

I was thinking about storing the addresses in binary, or splitting them into 4 fields for IPv4 and 8 fields for IPv6 (a quick sketch of the binary option follows below).
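
For context, the binary variant I have in mind would look roughly like this with Python's ipaddress module (sketch; storing the packed bytes in a BinData field is my assumption):

import ipaddress

def ip_to_binary(ip_str):
    """Pack an IPv4/IPv6 string into its fixed-width byte form:
    4 bytes for IPv4, 16 bytes for IPv6."""
    return ipaddress.ip_address(ip_str).packed

print(ip_to_binary("1.2.3.4").hex())                         # 01020304
print(ip_to_binary("2345:0425:2CA1::0567:5673:23b5").hex())  # 234504252ca1000000000567567323b5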

What is the most efficient way, in terms of query speed, to achieve that?

MongoDB ipv4/ipv6 hex index

I am building an IPv4/IPv6 geo-IP MongoDB database, and it will hold millions (100+ million) of IPs.

The structure of the database will be:

[
    { _id: ObjectId("58fdbf5c0ef8a50b4cdd9a8e"), ip: '34.53.63.25', ip_hex: '0x22353f19', type: "ipv4", data: [
        {
            country: "CA",
            region: "region1",
            city: "city1",
            blacklisted: "no",
            on_date: ISODate("2022-05-05T00:00:00Z")
        },
        {
            country: "CA",
            region: "region1",
            city: "city1",
            blacklisted: "yes",
            on_date: ISODate("2022-06-05T00:00:00Z")
        },
        {
            country: "US",
            region: "region2",
            city: "city2",
            blacklisted: "no",
            on_date: ISODate("2022-05-05T00:00:00Z")
        },

        ...
    ]},

    { _id: ObjectId("58fdbf5c0ef8a50b4cdd9a8f"), ip: '1.2.3.4', ip_hex: '0x1020304', type: "ipv4", data: [
        {
            country: "CA",
            region: "region1",
            city: "city1",
            blacklisted: "no",
            on_date: ISODate("2022-06-05T00:00:00Z")
        },
    ]},

    { _id: ObjectId("58fdbf5c0ef8a50b4cdd9a90"), ip: '2345:0425:2CA1:0000:0000:0567:5673:23b5', ip_hex: '0x234504252ca1000000000567567323b5', type: "ipv6", data: [
        {
            country: "FR",
            region: "region1",
            city: "city1",
            blacklisted: "no",
            on_date: ISODate("2022-06-05T00:00:00Z")
        },
        {
            country: "FR",
            region: "region1",
            city: "city1",
            blacklisted: "yes",
            on_date: ISODate("2022-07-05T00:00:00Z")
        },

        ...
    ]},
]

I am converting all IP string data to hex:

1.1.1.1 -> 0x1010101
1.2.3.4 -> 0x1020304
34.53.63.25 -> 0x22353f19
255.255.255.255 -> 0xffffffff

0001:0001:0001:0001:0001:0001:0001:0001 -> 0x10001000100010001000100010001
1111:1111:1111:1111:1111:1111:1111:1111 -> 0x11111111111111111111111111111111
2345:0425:2CA1:0000:0000:0567:5673:23b5 -> 0x234504252ca1000000000567567323b5
2345:0425:2CA1::0567:5673:23b5          -> 0x234504252ca1000000000567567323b5
ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff -> 0xffffffffffffffffffffffffffffffff

There will be a lot of searches by IP, and new data will be added/updated/deleted for every IP each day.

I will search ranges of IPs, sort, update, and delete.

What index is recommended on the ip_hex field? I was thinking about a B-tree index on the hex value rather than on the IP string. (A sketch of what I mean follows below.)
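
For reference, the index I have in mind would be created like this via pymongo (sketch; the database/collection names and URI are illustrative). One assumption on my side: for range queries on a string field to follow numeric order, the hex values must be zero-padded to a fixed width per address family.

import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017")  # placeholder URI
col = client["geoip"]["ips"]                               # illustrative names

# Standard MongoDB indexes are B-tree based; an ascending index on
# ip_hex supports both equality lookups and $gte/$lte range scans.
col.create_index([("ip_hex", pymongo.ASCENDING)])

# Range scan over part of the IPv4 space. Lexicographic order only
# matches numeric order if ip_hex is zero-padded to a fixed width.
cursor = col.find({"type": "ipv4",
                   "ip_hex": {"$gte": "0x22000000", "$lt": "0x23000000"}})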

I want the database to be efficient. What other optimizations should I take into consideration?

Thank you.

Best way to manage a big database

The web application will have a database consisting of millions (over 20 million ... maybe 100 million or more) of hosts (IPv4, IPv6 and domains) plus additional data for each host, like blacklists, uptime, history, and geo data (country, region, city, ISP). I could, of course, use an RDBMS like MySQL, Oracle, or PostgreSQL ... but I want speed, and I will have a lot of data. I stumbled upon NoSQL and MongoDB and want to know the best way to develop and manage a big database.

I was thinking about using a B-tree index on hosts, but I would have to sort them and use 4 columns for IPv4 and 8 columns for IPv6. For domains I don't know how I would manage that ... I might just use full-text search, or maybe a simple sort will help me.

The database will be constantly updated and queried, so clever per-row locking must be in place. The main searches will be by host, geo code, ISP, tags and hostId. (An example document is sketched after the table layout below.)

hosts table
hostId , host (must be either text or multiple columns) , tags (array) , blacklists (array) , uptime , geoCountry , geoRegion , geoCity, geoIsp , dateInserted
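
As a MongoDB document, I imagine one host record would look roughly like this (all values illustrative):

{
    hostId: 1,
    host: "34.53.63.25",
    tags: ["tag1", "tag2"],
    blacklists: ["list1"],
    uptime: 99.2,
    geoCountry: "CA",
    geoRegion: "region1",
    geoCity: "city1",
    geoIsp: "isp1",
    dateInserted: ISODate("2022-05-05T00:00:00Z")
}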

There is also a possibility that the database will contain a lot of offline hosts and multiple records per host, so a separate history table must be created: whenever a host goes offline or a change occurs (blacklist, geo IP data), the record must go into history.

hosts_history table
hostHistoryId , host (must be either text or multiple columns), tags (array) , blacklists (array) , uptime , geoCountry , geoRegion , geoCity, geoIsp , dateInserted

What would be the best approach, and how can I optimize this project for the best possible speed?

fstream pointer in vector of struct

Hello.

I am writing a C++ program that opens some files with fstream, writes to them with a simple function, and then closes them.

I created a struct where I keep filetag, filepointer and lines. The problem is that from main I can't write to or close the files ... but I can access the filetag and lines members. Here is my code:

struct Fileoutput
{
    string filetag;
    std::fstream* filepointer;
    int lines = 0;
};

static std::vector<Fileoutput> vect_fileoutputs;

static void open_files()
{
    // open all
    struct Fileoutput fo_all;
    fo_all.filetag = "all";
    fo_all.lines = 0;
    std::fstream fs_all;
    fs_all.open("all.log",std::fstream::out | std::fstream::app);
    fo_all.filepointer = &fs_all;
    //*fo_all.filepointer << "something from struct" << endl; // here it is working, in other scope ... no
    vect_fileoutputs.push_back(fo_all);
    /*
    for(const struct Fileoutput fo : vect_fileoutputs)
    {
        *fo.filepointer << "something from vect" << endl; // here is working
    }
    */

    // open error
    struct Fileoutput fo_error;
    fo_error.filetag = "error";
    fo_error.lines = 0;
    std::fstream fs_error;
    fs_error.open("error.log",std::fstream::out | std::fstream::app);
    fo_error.filepointer = &fs_error;
    vect_fileoutputs.push_back(fo_error);
}

static bool write_to_outputfile(char* outputfile, char* content)
{
    bool write = false;
    for(const struct Fileoutput fo : vect_fileoutputs)
    {
        if(strcmp(fo.filetag.c_str(),outputfile) == 0)
        {
            printf("Writing [%s] to [%s]\n",content,outputfile);
            std::fstream* fs_write;
            fs_write = fo.filepointer;
            *fo.filepointer << content; // NOT WORKING ... seg fault
            write = true;
            break;
        }
    }

    return write;
}

static void close_files()
{
    for(const struct Fileoutput file_output: vect_fileoutputs)
    {
        printf("Closing file [%s]\n",file_output.filetag.c_str());
        std::fstream* fs_temp;
        fs_temp = file_output.filepointer;
        fs_temp->close(); // NOT WORKING ... seg fault
        printf("Closed [%s]\n",file_output.filetag.c_str());
    }
}

int main(int argc, char* argv[])
{
    open_files();

    write_to_outputfile((char*)"all",(char*)"something1\n");
    write_to_outputfile((char*)"all",(char*)"something2\n");
    write_to_outputfile((char*)"all",(char*)"something3\n");

    write_to_outputfile((char*)"error",(char*)"error\n");
    close_files();

    return 1;
}

The gdb error looks like this:

Program received signal SIGSEGV, Segmentation fault.
0x0000000000405de9 in ?? ()
(gdb) x 0x0000000000405de9
0x405de9:   0xe8420348
(gdb) x 0x405de9
0x405de9:   0xe8420348
(gdb) x 0xe8420348
0xe8420348: Cannot access memory at address 0xe8420348
(gdb)
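
Since my inline comments above suggest the scope of fs_all matters (writing works inside open_files but not from other scopes), this is the variant I am considering next: keeping each stream on the heap so it outlives open_files() (sketch, untested):

#include <fstream>
#include <memory>
#include <string>
#include <vector>

struct Fileoutput
{
    std::string filetag;
    std::unique_ptr<std::fstream> filepointer; // owns the stream, lives as long as the struct
    int lines = 0;
};

static std::vector<Fileoutput> vect_fileoutputs;

static void open_files()
{
    Fileoutput fo_all;
    fo_all.filetag = "all";
    // make_unique needs C++14; the stream is heap-allocated, so the
    // pointer stays valid after this function returns
    fo_all.filepointer = std::make_unique<std::fstream>(
        "all.log", std::fstream::out | std::fstream::app);
    vect_fileoutputs.push_back(std::move(fo_all));
}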

What is wrong with the code?

Thank you.