Got a chance to investigate the test case. For each notification
request registered, only one notification will be sent. Once
a notification is sent to the process, the registration is removed.
The process has to register again to receive the
next notification. This is documented in the mq_notify() man page.

In your test case, you receive one notification, so notify_thread()
gets called once. It loops, reading messages as long as there are
messages in the queue, and returns once the queue becomes empty. Since
there is no active registration at that point, no further notifications
are sent. The child then keeps sending until the queue is full
(mq_maxmsg is 10) and blocks in mq_send(), and nothing in the parent
reads the queue again. That is when it hangs.
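The sequence can be replayed with a deliberately simplified, single-threaded model. This is not the libc code: it assumes one sender, one registration made up front, and a handler that runs to completion right after the mq_send() that triggered it, so the exact counts differ from Michael's output. The name sim_sender_progress() is hypothetical.

```c
#include <stdbool.h>

/*
 * Simplified single-threaded model of the test (not the libc code):
 * one sender, one registration made up front, and a handler that runs
 * to completion right after the mq_send() that triggered it.
 *
 * Returns how many messages the sender enqueued before mq_send() would
 * block on a full queue, or 'total' if it never blocks.
 */
int
sim_sender_progress(int total, int maxmsg, bool rearm_in_handler)
{
        int queued = 0;                 /* messages currently queued */
        bool registered = true;         /* mq_notify() called once in main() */
        int sent;

        for (sent = 0; sent < total; sent++) {
                bool was_empty;

                if (queued == maxmsg)
                        return (sent);  /* sender blocks: the hang */

                was_empty = (queued == 0);
                queued++;

                /* Notify only on EMPTY -> NONEMPTY, and only while registered. */
                if (was_empty && registered) {
                        registered = false;     /* registration consumed */
                        if (rearm_in_handler)
                                registered = true;  /* handler re-arms first... */
                        queued = 0;             /* ...then drains the queue */
                }
        }
        return (total);
}
```

With the test's queue size, sim_sender_progress(100, 10, false) returns 11: the first message is consumed by the one and only notification, the next ten fill the queue, and the twelfth mq_send() would block, which matches the child stuck in __mq_timedsend(). With rearm_in_handler set to true it returns 100.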

You can re-register the notification request in notify_thread(). Note that
the notification is sent only when the message queue state transitions from
empty to non-empty. That means you have to re-register the notification
request first and only then read any messages present on the message queue;
if you read first, a message arriving between the last mq_receive() and the
re-registration would leave the queue non-empty with nobody notified.
Due to this behavior, with SIGEV_THREAD notification, you can potentially
have more than one thread running the notify_thread() routine at once.

-Prakash.





Michael Schulte wrote:
Hey Bart and Prakash,

first of all: A happy New Year!

I have seen that the good old RFE 4017841, to implement the SIGEV_THREAD
notification type for timers, message queues and asynchronous I/O,
was successfully committed to our next version of Solaris a couple
of months ago (ca. build 40 or so). :-)

There is a problem in the POSIX message queue code that becomes
obvious once you run my test program on, e.g., an Ultra-10 with
Solaris 5.11 build 54 installed.

The program occasionally hangs:
$ cc -g -D_REENTRANT -o notify_thread notify_thread.c -lpthread -lposix4
$ notify_thread 10 10 1
Test is starting nmsg=10, nruns=10, delay=1.
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
->notify_thread 3 called with argument 42 and counter 0.
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   message queue is empty.
<-notify_thread 3 left with counter 20
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
<<<< Now it hangs >>>>>>>>>

When I examine the stacks of the child and the parent, I see this:
$ pstack 2570   (<--- this is the child and is OK)
2570:   notify_thread 10 10 1
 ff2bf850 lwp_sema_timedwait (ff350098, 0, 1)
 ff2a9aac sem_wait (ff350098, 3, 4d534751, 20, fffffff0, 0) + 20
 ff2a752c __mq_timedsend (22940, ffbfec50, 9, 1, 0, 0) + b0
 00011814 main     (4, ffbfeeb4, ffbfeec8, 22000, ba394, ff390140) + 33c
 00010e18 _start   (0, 0, 0, 0, 0, 0) + 108

$ pstack 2569  (<--- this is the parent that shouldn't hang)
2569:   notify_thread 10 10 1
-----------------  lwp# 1 / thread# 1  --------------------
 ff2bfa54 waitid   (0, a0a, ffbfea98, 3)
 ff2b226c waitpid  (a0a, ffbfec40, 0, 0, ff392000, ff392000) + 58
 000119e4 main     (4, ffbfeeb4, ffbfeec8, 22000, ba394, ff390140) + 50c
 00010e18 _start   (0, 0, 0, 0, 0, 0) + 108
-----------------  lwp# 2 / thread# 2  --------------------
 ff2be5e0 lwp_park (0, 0, 0)
 ff2b8448 cond_wait_queue (ff390850, ff390838, 0, 0, 0, 0) + 28
 ff2b7ecc sig_cond_wait (ff390850, ff390838, ff17bfa0, ff17bf80, 6073c, ff390838) + 10
 ff2aa160 mqueue_spawner (ff390800, ff390838, 2a, ff30a5f0, 60510, 0) + 84
 ff2be564 _lwp_start (0, 0, 0, 0, 0, 0)
-----------------  lwp# 3 / thread# 3  --------------------
 ff2be5e0 lwp_park (0, 0, 0)
 ff2b8448 cond_wait_queue (ff3908b0, ff390888, 0, 0, 0, 0) + 28
 ff2b7ecc sig_cond_wait (ff3908b0, ff390888, 3, ff07bf88, ff1c0000, 3) + 10
 ff2aaed4 tpool_worker (ff390280, ff3902c0, 0, ff30a5f0, ff3908b0, ff390888) + 104
 ff2be564 _lwp_start (0, 0, 0, 0, 0, 0)
$


So the worker thread "tpool_worker" is missing its "wake up"
on the condition variable. Why is it missing it?

I think that the bug is in usr/src/lib/libc/port/rt/mqueue.c
and that it could be fixed as follows (add the lines marked >>>
after line 711 and, in particular, the line after 1014).

...
688     /*
689      * Now determine if we want to kick the notification.  POSIX
690      * requires that if a process has registered for notification,
691      * we must kick it when the queue makes an empty to non-empty
692      * transition, and there are no blocked receivers.  Note that
693      * this mechanism does _not_ guarantee that the kicked process
694      * will be able to receive a message without blocking;
695      * another receiver could intervene in the meantime.  Thus,
696      * the notification mechanism is inherently racy; all we can
697      * do is hope to minimize the window as much as possible.
698      * In general, we want to avoid kicking the notification when
699      * there are clearly receivers blocked.  We'll determine if
700      * we want to kick the notification before the mq_putmsg(),
701      * but the actual signotify() won't be done until the message
702      * is on the queue.
703      */
704     if (mqhp->mq_sigid.sn_pid != 0) {
705         int nmessages, nblocked;
706
707         (void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
708         (void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);
709
710         if (nmessages == 0 && nblocked == 0)
711             notify = 1;
    >>>         /*
    >>>          * Force a kick of the receiver if the sender would block
    >>>          * because the message queue would be full and the receiver
    >>>          * would not continue receiving messages.
    >>>          */
    >>>         if (nmessages == mqhp->mq_maxmsg - 1)
    >>>             notify = 1;
712     }
713
714     mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
715     (void) sem_post(&mqhp->mq_notempty);
716
717     if (notify) {
718         /* notify and also delete the registration */
719         (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
720         if (mqhp->mq_ntype == SIGEV_THREAD ||
721             mqhp->mq_ntype == SIGEV_PORT)
722             (void) sem_post(&mqhp->mq_spawner);
723         mqhp->mq_ntype = 0;
724         mqhp->mq_des = 0;
725     }
...
1012         switch (ntype) {
1013         case SIGEV_THREAD:
1014         case SIGEV_PORT:
    >>>          mqhp->mq_sigid.sn_pid = getpid();
1015             tcdp->tcd_port = port;
1016             tcdp->tcd_msg_object = mqdp;
1017             tcdp->tcd_msg_userval = userval;
1018             sig_mutex_lock(&tcdp->tcd_lock);
1019             tcdp->tcd_msg_enabled = ntype;
1020             sig_mutex_unlock(&tcdp->tcd_lock);
1021             (void) cond_broadcast(&tcdp->tcd_cv);
1022             break;
1023         }

Perhaps you can confirm my findings. If you decide to open a bug,
could you please drop me a short note saying in which build it will
be fixed, so that I can update my box at that time? As I said, it
currently runs build 54 (Community release).

Many greetings from Germany,
best regards,

Michael Schulte

------------------------------------------------------------------------

/*
 * cc -g -D_REENTRANT -o notify_thread notify_thread.c -lpthread -lposix4
 *
 * purpose: Demonstrate usage of SIGEV_THREAD.
 *
 * Call mq_notify(mqdes, &notification) to attach a notification
 * request to a valid mqdes, then call mq_send() from another process
 * to send messages. Whenever (in the parent) the message queue
 * transitions from EMPTY -> NONEMPTY, a thread is started that fetches
 * the message(s) currently in the message queue.
 * Demonstrate usage of non-async-signal-safe functions in notify_thread()
 * by doing some printing and a simple mutex/counter game.
 *
 * Author: Michael Schulte
 * 2004-02-11
 */

#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <mqueue.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>      /* O_RDWR, O_CREAT, O_EXCL, O_NONBLOCK */
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>

#define str_LEN 256


static mqd_t    mq;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int counter = 0;
static int nruns;
static int delay_in_secs;



/*
 * notify_thread(par_sigval)
 *
 * This is the notification function which is called in a new thread
 * when the message queue makes a transition from EMPTY -> NONEMPTY.
 */
static void
notify_thread(union sigval par_sigval)
{
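        char msg[str_LEN + 1];  /* +1 leaves room for a terminating NUL */
        ssize_t size;
        int status;

        (void) printf("->notify_thread %lu called with argument %d and "
            "counter %d.\n", (unsigned long)pthread_self(),
            par_sigval.sival_int, counter);
        /*
         * Now we can fetch the messages. As the queue was opened with
         * O_NONBLOCK, mq_receive() fails with EAGAIN instead of blocking
         * once the queue is empty, so all messages currently in the
         * message queue are fetched.
         */
        do {
                size = mq_receive(mq, msg, str_LEN, NULL);
                if (size > 0) {
                        /* mq_receive() does not NUL-terminate the message */
                        msg[size] = '\0';
                        (void) printf("   thr %lu message: <%s> with size %ld\n",
                            (unsigned long)pthread_self(), msg, (long)size);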
                        /*
                         * Now we can do some processing and async signal
                         * unsafe stuff (as we are in a thread).
                         */
                        status = pthread_mutex_lock (&mutex);
                        if (status != 0) {
                                perror ( "Lock mutex");
                                exit (-1);
                        }
                        if (++counter >= nruns) {
                                status = pthread_cond_signal (&cond);
                                if (status != 0) {
                                        perror ( "Signal condition");
                                        exit (-1);
                                }
                        }
                        status = pthread_mutex_unlock (&mutex);
                        if (status != 0) {
                                perror ( "Unlock mutex");
                                exit (-1);
                        }
                        if ((counter % 7) == 0)
                                (void) sleep(delay_in_secs);
                } else {
                        if (errno == EAGAIN)
                                (void) printf("   message queue is empty.\n");
                        else
                                (void) printf("   XXX <%s>\n", strerror(errno));
                }
        } while (size >= 0);
        (void) printf("<-notify_thread %lu left with counter %d\n",
            (unsigned long)pthread_self(), counter);
}



int main(int argc, char *argv[])
{
        char    buf_uns[str_LEN];
        char    msg[str_LEN];
        char    *name = "notify";
        int     ret = 0;
        pid_t   pid;
        int     status;
        struct mq_attr  attrs;
        struct sigevent notification;
        int     nmsg;


        if (argc != 4) {
                (void) printf("notify_thread <nr of messages to send> "
                    "<nr of experiments> <delay in secs>\n");
                exit(1);
        }
        nmsg = atoi(argv[1]);
        nruns = atoi(argv[2]);
        delay_in_secs = atoi(argv[3]);

        (void) printf("Test is starting nmsg=%d, nruns=%d, delay=%d.\n",
            nmsg, nruns, delay_in_secs);
        
        (void) sprintf(buf_uns, "/%s%ld", name, (long)getpid());

        /*
         * Set members of the mq_attr structure
         * and create an open message queue description.
         */
        attrs.mq_msgsize = str_LEN;
        attrs.mq_maxmsg = 10;
        attrs.mq_flags = O_NONBLOCK;

        mq = mq_open(buf_uns, O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, 0666,
            &attrs);
        if (mq == (mqd_t)-1) {
                (void) printf("mq_open() failed with errno %d.\n", errno);
                exit(1);
        }

        /*
         * Set up the notification structure to start the thread when the
         * mq transitions from EMPTY -> NONEMPTY.
         */
        notification.sigev_notify = SIGEV_THREAD;
        notification.sigev_notify_function = notify_thread;
        notification.sigev_value.sival_int = 42;
        notification.sigev_notify_attributes = NULL;
        
        /* register the calling process for notification */
        ret = mq_notify(mq, &notification);
        if (ret != 0) {
                (void) printf("mq_notify() failed. errno = %d\n", errno);
                goto fin;
        }

        if ((pid = fork()) < 0) {
                (void) printf("fork() failed with errno %d.\n", errno);
                exit(1);
        }

        if (pid == 0) {
                /* In the child */
                int i,j;
                                
                (void) sprintf(buf_uns, "/%s%ld", name, (long)getppid());

                /* Open the message queue for writing. */
                attrs.mq_msgsize = str_LEN;
                attrs.mq_maxmsg = 10;
                attrs.mq_flags = 0;

                mq = mq_open(buf_uns, O_RDWR, 0666, &attrs);
                if (mq == (mqd_t)-1) {
                        (void) printf("mq_open() failed with errno %d.\n",
                            errno);
                        exit(1);
                }

                /* the child sends a message to a message queue */
                sprintf(msg, "test %d", getpid());
                j = 0;
                while (j < nruns) {
                        for (i = 0; i < nmsg; i++) {
                                ret = mq_send(mq, msg, strlen(msg), 1);
                                (void) printf("CHILD: msg sent\n");
                                if (ret != 0) {
                                        (void) printf("mq_send() failed in "
                                            "child with %d.\n", errno);
                                        nruns = 0;
                                        exit(99);
                                }
                        }
                        sleep(delay_in_secs);
                        j++;
                }
                (void) printf("Child exit.\n");
                exit(0);
        }

        /* In the parent */

        ret = pthread_mutex_lock (&mutex);
        if (ret != 0) {
                perror ( "Lock mutex");
                goto fin;
        }
        while (counter < nruns) {
                ret = pthread_cond_wait (&cond, &mutex);
                if (ret != 0) {
                        perror ( "Wait on condition");
                        goto fin;
                }
        }
        ret = pthread_mutex_unlock (&mutex);
        if (ret != 0) {
                perror ( "Unlock mutex");
                goto fin;
        }

        ret = waitpid(pid, &status, 0);
        (void) printf("Parent awoken.\n");
        if (ret == pid) {
                if (WEXITSTATUS(status) != 0) {
                        /* The child terminated with error */
                        (void) printf("The child terminated with exit "
                            "status %d.\n", WEXITSTATUS(status));
                        goto fin;
                }
        }

        (void) printf("Parent got %d messages.\n", counter);
fin:
        (void) mq_close(mq);

        ret = mq_unlink(buf_uns);
        if (ret != 0) {
                (void) printf("mq_unlink() failed with %d\n", errno);
                exit(ret);
        }

        (void) printf("Test finished.\n");
        exit(ret);
}
------------------------------------------------------------------------

_______________________________________________
perf-discuss mailing list
[email protected]
