> >>>>>> Using AF_UNIX/SOCK_DGRAM with current version (3.2.0) seems to
> >>>>>> drop messages or at least they are not received in the same order
> >>>>>> they are  sent
> >>
> >> [snip]
> >>
> >>> Thanks for the test case.  I can confirm the problem.  I'm not
> >>> familiar enough with the current AF_UNIX implementation to debug
> >>> this easily.  I'd rather spend my time on the new implementation (on
> >>> the topic/af_unix branch).  It turns out that your test case fails
> >>> there too, but in a completely different way, due to a bug in sendto
> >>> for datagrams.  I'll see if I can fix that bug and then try again.
> >>>
> >>> Ken
> >>
> >> Ok, too bad it wasn't our own code base but good that the "mystery"
> >> is verified
> >>
> >> I finally succeed to build topic/af_unix (after finding out what
> >> version of zlib was needed), but not with -D__WITH_AF_UNIX to
> >> CXXFLAGS though and thus I haven’t tested it yet
> >>
> >> Is it sufficient to add the define to the "main" Makefile or do you
> >> have to add it to all the Makefile:s ? I guess I can find out though
> >
> > I do it on the configure line, like this:
> >
> >   ../af_unix/configure CXXFLAGS="-g -O0 -D__WITH_AF_UNIX" --prefix=...
> >
> >> Is topic/af_unix fairly up to date with master branch ?
> >
> > Yes, I periodically cherry-pick commits from master to topic/af_unix.
> > I'lldo that again right now.
> >
> >> Either way, I'll be glad to help out testing topic/af_unix
> >
> > Thanks!
> 
> I've now pushed a fix for that sendto bug, and your test case runs without
> error on the topic/af_unix branch.

It seems like the test-case do work now with topic/af_unix in blocking mode, 
but when using non-blocking (with MSG_DONTWAIT) there are some issues I think

1. When the queue is empty with non-blocking recv(), errno is set to EPIPE but 
I think it should be EAGAIN (or maybe the pipe is getting broken for real of 
some reason ?)

2. When using non-blocking recv() and no message is written at all, it seems 
like recv() blocks forever

3. Using non-blocking recv() where the "client" does send less than "count" 
messages, sometimes recv() blocks forever (as well)


My naïve analysis of this is that for the first issue (if any) the wrong errno 
is set and for the second issue it blocks if no sendto() is done after the 
first recv(), i.e. nothing kicks the "reader thread" in the butt to realise the 
queue is empty. It is not super clear though what POSIX says about creating 
blocking descriptors and then using non-blocking-flags with recv(), but this 
works in Linux any way

Let me know if I should provide more a specific explanation, but I think minor 
modifications of the test-case can provoke all behaviours. I think 2 and 3 are 
of the same reason though (as described above)


> By the way, I think the implementation of sendto/recv for datagrams is very
> inefficient when there are repeated calls to sendto as in your test case.
> Nevertheless, your test case actually runs slightly faster on the 
> topic/af_unix
> branch than it does on master (when the latter succeeds, which it does about
> half the time for me).  So I'm not sure whether it's worth worrying about 
> this.

Of course we would like the best throughput possible 😉

> Here's the issue, briefly.  The communication is done via a Windows named
> pipe.
>   The receiver creates the pipe when it creates and binds its socket.  It 
> creates
> only one pipe instance.  The sender connects to the pipe, writes, and closes 
> its
> handle.  But the pipe is not available for another sender to connect to until 
> the
> receiver reads the message, after which it disconnects the sender.

Ok, in our application we will use long lived descriptors and multiple writers 
that possible send large business messages (chunked into some smaller pieces 
per sendto()/recv())

> Ken[Kristian] 

Best regards,
Kristian
#include <sys/socket.h>
#include <sys/un.h>

#undef AF_UNIX
#define AF_UNIX 31

#include <unistd.h>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <thread>
#include <chrono>


// $ g++ --std=gnu++17 af_unix.cpp

const char* const path = "address";
const int count = 10000;
const int size = BUFSIZ * 8;

int client()
{
    const int fd = socket( AF_UNIX, SOCK_DGRAM, 0);

    if( fd == -1)
    {
        perror( "socket error");
        return -1;
    }

    struct sockaddr_un address{};

    strcpy( address.sun_path, path);
    address.sun_family = AF_UNIX;

    char buffer[size] = {};

    for( int idx = 0; idx < 100; ++idx)
    {
        memcpy( buffer, &idx, sizeof idx);

        const ssize_t result = sendto( fd, buffer, size, 0, (struct 
sockaddr*)&address, sizeof address);

        // Assume the whole chunk can be sent
        if( result != size)
        {
            perror( "sendto error");
            return -1;
        }
    }

    close( fd);
    return 0;
}

int server()
{
    const int fd = socket( AF_UNIX, SOCK_DGRAM, 0);

    if( fd == -1)
    {
        perror( "socket error");
        return -1;
    }

    struct sockaddr_un address{};

    strcpy( address.sun_path, path);
    address.sun_family = AF_UNIX;

    const int result = bind( fd, (struct sockaddr*)&address, sizeof address);

    if( result == -1)
    {
        perror( "bind error");
        return -1;
    }

    return fd;
}

int main( int argc, char* argv[])
{
    const int fd = server( );

    if( fd != -1)
    {
        fprintf( stdout, "%d\tnumber of packages\n", count);
        fprintf( stdout, "%d\tbytes per package\n", size);

        std::thread{ [&](){client( );}}.detach();

        std::this_thread::sleep_for( std::chrono::microseconds( 500));
    
        char buffer[size] = {};

        for( int idx = 0; idx < count; ++idx)
        {
            const ssize_t result = recv( fd, buffer, size, MSG_DONTWAIT);

            // Assume the whole chunk can be read
            if( result != size)
            {
                perror("recv error");
                //fprintf( stderr, "index: %d\n", idx);
                unlink( path);
                return -1;
            }

            int index = 0;
            memcpy( &index, buffer, sizeof idx);

            if( index != idx)
            {
                fprintf( stderr, "expected %d but got %d\n", idx, index);
                unlink( path);
                return -1;
            }
        }

        close( fd);
        unlink( path);
    }

    return 0;
}
--
Problem reports:      https://cygwin.com/problems.html
FAQ:                  https://cygwin.com/faq/
Documentation:        https://cygwin.com/docs.html
Unsubscribe info:     https://cygwin.com/ml/#unsubscribe-simple

Reply via email to