Hello everyone.

I'm investigating the possibility of adding UDP raw (datagram) sockets to ZeroMQ, 
for compatibility with applications that use the BSD Sockets API directly, 
replicating what is already done for TCP raw (stream) sockets, in combination 
with the recently added UDP engine.

I opened a pull request for it, but I'm still tweaking things while diving 
through the code base and learning how it works: 
https://github.com/zeromq/libzmq/pull/1986

According to the examples, communication with raw sockets works by sending or 
receiving 2 message frames: the first containing a remote peer identifier and 
the second the actual data transmitted. So the user has to handle this manually, 
with the ZMQ_SNDMORE flag when sending and by checking ZMQ_RCVMORE when 
receiving.

For now, I'm trying to mimic that for dgram_t (a new class I created) by copying 
the IPv4 address structure (sockaddr_in) into the peer identifier, so that we 
can acquire a unique identifier with a recvfrom() call, and use the same value 
to know where to send the message buffer with sendto(), at the UDP engine level. 
The higher-level logic (dgram_t::xsend and dgram_t::xrecv) was simply copied 
from stream_t.
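To make the idea concrete, here is a minimal sketch of that round trip, assuming the identifier frame is nothing more than the raw bytes of the sockaddr_in. The helper names identity_from_addr and addr_from_identity are hypothetical and not part of libzmq or of the pull request:

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <cassert>
#include <cstring>
#include <vector>

// Hypothetical helper: turn the address filled in by recvfrom() into the
// bytes of a peer identifier frame.
std::vector<unsigned char> identity_from_addr (const sockaddr_in &addr)
{
    std::vector<unsigned char> id (sizeof addr);
    std::memcpy (id.data (), &addr, sizeof addr);
    return id;
}

// Hypothetical helper: rebuild the address to hand to sendto() from the
// identifier frame supplied by the user.
sockaddr_in addr_from_identity (const std::vector<unsigned char> &id)
{
    //  An identifier of any other size cannot be a sockaddr_in.
    assert (id.size () == sizeof (sockaddr_in));
    sockaddr_in addr;
    std::memcpy (&addr, id.data (), sizeof addr);
    return addr;
}
```

Since the identifier is an opaque copy of the structure, it is only meaningful on the host that produced it, which should be fine as it never leaves the process.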

On the other hand, socket types like radio and dish, even if they use 
identifiers/groups as well, do it with a single message, by putting content 
data and group data in different fields (accessed with zmq_msg_data() and 
zmq_msg_*group() functions).

Looking in msg.hpp I can see that messages have a union of structs to handle 
different types of messages, and all of them contain the fields

    struct {
        ...
        char group [16];        // I guess that radio/dish, pub/sub use this
        uint32_t routing_id;    // I guess that router/dealer uses this
    };

that wouldn't be used for ZMQ_STREAM and ZMQ_DGRAM messages, if we handle 
everything as content data, leaving 20 bytes unused in the message.

Why don't raw stream sockets use this available memory to store a peer 
identifier, avoiding the need to send two messages manually?

In the case of datagrams, wouldn't it be a good idea to add a new struct to this 
union with a field **unsigned char address[20]** to store the address info? 
According to Beej's guide 
(http://beej.us/guide/bgnet/output/html/multipage/ipstructsdata.html), the 
structure

    struct sockaddr_in {
        short int          sin_family;  // Address family, AF_INET
        unsigned short int sin_port;    // Port number
        struct in_addr     sin_addr;    // Internet address (32 bits)
        unsigned char      sin_zero[8]; // Same size as struct sockaddr
    };

has **2 + 2 + 4 + 8 = 16** bytes of data, which fits the available memory.

The downside is that we couldn't store IPv6 addresses

    struct sockaddr_in6 {
        u_int16_t       sin6_family;   // address family, AF_INET6
        u_int16_t       sin6_port;     // port number, Network Byte Order
        u_int32_t       sin6_flowinfo; // IPv6 flow information
        struct in6_addr sin6_addr;     // IPv6 address
        u_int32_t       sin6_scope_id; // Scope ID
    };

    struct in6_addr {
        unsigned char   s6_addr[16];   // IPv6 address
    };

as they need **2 + 2 + 4 + 16 + 4 = 28** bytes. But maybe we could create a 
common structure to store both kinds of address, ignore the **sin6_flowinfo** 
and **sin6_scope_id** fields (storing exactly 20 bytes), and rebuild the 
sockaddr_in* structures on demand:

    typedef struct socket_address {
        uint16_t family;        // stores sin_family or sin6_family (or ZMQ_IPV4 or ZMQ_IPV6)
        uint16_t port;          // stores sin_port or sin6_port
        unsigned char host[16]; // stores sin_addr (first 4 bytes) or sin6_addr
    } socket_address_t;
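As a rough sketch of how the packing and the on-demand rebuilding could look, assuming the family field simply stores AF_INET or AF_INET6 and the port stays in network byte order. The functions pack_address and unpack_address are hypothetical names used only for illustration:

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <cassert>
#include <cstdint>
#include <cstring>

//  Compact address mirroring the socket_address_t idea above.
struct socket_address_t
{
    uint16_t family;        //  AF_INET or AF_INET6 (assumption)
    uint16_t port;          //  network byte order, as in sin_port/sin6_port
    unsigned char host[16]; //  IPv4 in the first 4 bytes, or a full IPv6
};
static_assert (sizeof (socket_address_t) == 20, "must fit in 20 bytes");

//  Hypothetical: pack a sockaddr_in or sockaddr_in6. Returns false for any
//  other family. sin6_flowinfo and sin6_scope_id are dropped.
bool pack_address (const sockaddr *sa, socket_address_t *out)
{
    assert (sa != nullptr && out != nullptr);
    std::memset (out, 0, sizeof *out);
    if (sa->sa_family == AF_INET) {
        const sockaddr_in *in4 = reinterpret_cast<const sockaddr_in *> (sa);
        out->family = AF_INET;
        out->port = in4->sin_port;
        std::memcpy (out->host, &in4->sin_addr, sizeof in4->sin_addr);
        return true;
    }
    if (sa->sa_family == AF_INET6) {
        const sockaddr_in6 *in6 = reinterpret_cast<const sockaddr_in6 *> (sa);
        out->family = AF_INET6;
        out->port = in6->sin6_port;
        std::memcpy (out->host, &in6->sin6_addr, sizeof in6->sin6_addr);
        return true;
    }
    return false;
}

//  Hypothetical: rebuild the native structure for sendto(). Returns the
//  address length, or 0 if the family is unknown.
socklen_t unpack_address (const socket_address_t &in, sockaddr_storage *out)
{
    assert (out != nullptr);
    std::memset (out, 0, sizeof *out);
    if (in.family == AF_INET) {
        sockaddr_in *in4 = reinterpret_cast<sockaddr_in *> (out);
        in4->sin_family = AF_INET;
        in4->sin_port = in.port;
        std::memcpy (&in4->sin_addr, in.host, sizeof in4->sin_addr);
        return sizeof (sockaddr_in);
    }
    if (in.family == AF_INET6) {
        sockaddr_in6 *in6 = reinterpret_cast<sockaddr_in6 *> (out);
        in6->sin6_family = AF_INET6;
        in6->sin6_port = in.port;
        std::memcpy (&in6->sin6_addr, in.host, sizeof in6->sin6_addr);
        return sizeof (sockaddr_in6);
    }
    return 0;
}
```

One caveat with this layout: link-local IPv6 addresses need sin6_scope_id to be usable, so dropping that field would make such peers unreachable on reply.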

Obviously, as there are zmq_msg_*routing_id() and zmq_msg_*group() functions, we 
would need zmq_msg_*address() as well.

Or, as @hitstergtd suggested on the GitHub issue, more direct and intuitive 
zmq_msg_send_to() and zmq_msg_recv_from() calls. Aside from socket, message and 
flags, these calls would take, as a parameter, an opaque void* pointer to the 
address (so that it could also be used to point to ZMQ_STREAM socket peer 
identifiers). With a NULL value for send_to(), the message would be sent to the 
address the socket was connected/bound to (could be useful for multicast).

Taking a closer look at udp_engine.cpp, where the messages are actually 
sent/received:

void zmq::udp_engine_t::out_event()
{
    msg_t group_msg;
    int rc = session->pull_msg (&group_msg);
    errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));

    if (rc == 0) {
        msg_t body_msg;
        rc = session->pull_msg (&body_msg);

        size_t group_size = group_msg.size ();
        size_t body_size = body_msg.size ();
        size_t size = group_size + body_size + 1;

        out_buffer[0] = (unsigned char) group_size;
        memcpy (out_buffer + 1, group_msg.data (), group_size);
        memcpy (out_buffer + 1 + group_size, body_msg.data (), body_size);

        [...]

        rc = sendto (fd, out_buffer, size, 0,
            address->resolved.udp_addr->dest_addr (),
            address->resolved.udp_addr->dest_addrlen ());
        errno_assert (rc != -1);
    }
    [...]
}

void zmq::udp_engine_t::in_event()
{
    int nbytes = recv(fd, in_buffer, MAX_UDP_MSG, 0);
    [...]

    int group_size = in_buffer[0];

    //  This doesn't fit, just ignore
    if (nbytes - 1 < group_size)
        return;

    int body_size = nbytes - 1 - group_size;

    msg_t msg;
    int rc = msg.init_size (group_size);
    errno_assert (rc == 0);
    msg.set_flags (msg_t::more);
    memcpy (msg.data (), in_buffer + 1, group_size);

    rc = session->push_msg (&msg);
    [...]

    [...]
    memcpy (msg.data (), in_buffer + 1 + group_size, body_size);
    rc = session->push_msg (&msg);
    [...]
}

I can see that the single message with group information set with 
zmq_msg_data() and zmq_msg_set_group() for radio/dish sockets (the only ones to 
currently use the UDP engine) became, at this point, 2 separate messages, 
pulled from/pushed to the session.
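The datagram layout that out_event() and in_event() agree on is a one-byte group length, then the group, then the body. A self-contained sketch of just that framing, using the hypothetical helpers encode_datagram and decode_datagram and std::string instead of msg_t, would be:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

//  Build a datagram laid out as [group_size (1 byte)][group][body].
std::vector<unsigned char> encode_datagram (const std::string &group,
                                            const std::string &body)
{
    //  The length prefix is a single byte.
    assert (group.size () <= 255);
    std::vector<unsigned char> out;
    out.reserve (1 + group.size () + body.size ());
    out.push_back (static_cast<unsigned char> (group.size ()));
    out.insert (out.end (), group.begin (), group.end ());
    out.insert (out.end (), body.begin (), body.end ());
    return out;
}

//  Split a received datagram back into group and body. Returns false when
//  the buffer is shorter than the announced group size.
bool decode_datagram (const unsigned char *buf, size_t nbytes,
                      std::string *group, std::string *body)
{
    if (nbytes < 1)
        return false;
    const size_t group_size = buf[0];
    if (nbytes - 1 < group_size)
        return false;
    group->assign (reinterpret_cast<const char *> (buf + 1), group_size);
    body->assign (reinterpret_cast<const char *> (buf + 1 + group_size),
                  nbytes - 1 - group_size);
    return true;
}
```

For example, encoding group "news" with body "hello" yields 10 bytes whose first byte is 4.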

Further investigation showed that this separation only happens in the 
push_msg() and pull_msg() overrides of radio_session_t and dish_session_t. Why 
is the message not simply forwarded, as in session_base_t, with the group info 
extracted from/filled into the u.base.group field of msg_t? Obviously I 
don't understand all the code, but, for now, I don't see a reason for it.

Could I ask for some feedback about my proposed additions/changes, and insight 
into the need for a different session push/pull for radio and dish sockets?

Sorry for the long post and possibly bad English. Thanks in advance.
                          
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev
