Hmm - perhaps I spoke too soon...

This looked like it worked in that little test application, but when I plugged 
it into my actual application, it seemed to work some of the time and not 
others. I then took your advice and introduced a join in the test application, 
and as soon as I did so, stopping the subscriber on the subscriber thread 
stopped working (perhaps it never did work properly and this just flagged it).

I have attached the latest source for it with the subscriber living entirely on 
its own thread, and with a join now introduced in main(). It deadlocks with the 
same backtrace as originally reported when trying to run pn_messenger_stop.

Cheers,
Frank

-----Original Message-----
From: Frank Quinn
Sent: 14 June 2013 13:49
To: proton@qpid.apache.org
Subject: RE: Deadlock in pn_messenger_stop? (C Qpid Library)

Thanks Ted,

I did try inserting a sleep of 1s between the send and the stop (I suspected 
the same thing you did), but the behaviour persisted so I don't think the 
problem was the time for the message to land.

However, your suggestion to stop it in the subscriber thread did seem to work 
(still unsure why, but I'll not question it for now), thanks!

Cheers,
Frank

-----Original Message-----
From: Ted Ross [mailto:tr...@redhat.com]
Sent: 14 June 2013 13:09
To: proton@qpid.apache.org
Subject: Re: Deadlock in pn_messenger_stop? (C Qpid Library)

I would suggest that you not stop the subscriber messenger in the main thread.  
Rather, stop it in the subscriber thread right before it exits.  Alternatively 
(preferably), you should pthread_join the thread in main before stopping the 
messengers.

It looks to me that you've got a race condition where main is stopping mSub at 
the same time the thread is processing messages on mSub.
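
The ordering suggested above can be sketched without Proton at all. The 
following is only an illustration under stated assumptions: fake_messenger, 
worker and run_join_before_stop are hypothetical names, not part of the 
Proton API, and the struct merely stands in for mSub. The point is that the 
owning thread touches the shared object only after pthread_join has returned.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a messenger; NOT a Proton type */
typedef struct
{
    bool stopped;          /* set by the owner once the resource is shut down */
    int  processed;        /* number of work items handled by the worker */
    bool used_after_stop;  /* set if the worker ever ran after the stop */
} fake_messenger;

/* Worker thread: "processes messages" on the shared object */
static void* worker (void* closure)
{
    fake_messenger* m = (fake_messenger*) closure;
    assert (m != NULL);

    for (int i = 0; i < 1000; i++)
    {
        if (m->stopped)
        {
            m->used_after_stop = true;
        }
        m->processed++;
    }
    return NULL;
}

/* Returns 0 if the worker finished all its work before the stop */
int run_join_before_stop (void)
{
    fake_messenger m = { false, 0, false };
    pthread_t thread;

    if (pthread_create (&thread, NULL, worker, &m) != 0)
    {
        return -1;
    }

    /* Wait for the worker to exit BEFORE touching the shared object */
    if (pthread_join (thread, NULL) != 0)
    {
        return -1;
    }

    /* Only one thread is left, so stopping cannot race with the worker */
    m.stopped = true;

    return (m.processed == 1000 && !m.used_after_stop) ? 0 : 1;
}
```

Because main writes to the object only after the join, there is no window in 
which both threads can be using it at once.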

Keep in mind that pn_messenger_send is only going to block until the message 
has been pushed to the socket.  It will not wait for the message to be received 
and processed by the other thread.
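
If main really needs to know that the other thread has processed the message, 
it has to wait for an explicit signal from that thread. A minimal sketch of 
that idea using a pthread condition variable follows. The names handoff, 
receiver and send_and_wait_for_processing are assumptions made for this 
illustration and nothing here is Proton API; setting the "sent" flag simply 
plays the role of pn_messenger_send returning.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical hand-off state shared by sender and receiver */
typedef struct
{
    pthread_mutex_t lock;
    pthread_cond_t  changed;
    bool            sent;       /* sender has handed the message over */
    bool            processed;  /* receiver has finished handling it */
} handoff;

/* Receiver thread: waits for the message, handles it, then acknowledges */
static void* receiver (void* closure)
{
    handoff* h = (handoff*) closure;
    assert (h != NULL);

    pthread_mutex_lock (&h->lock);
    while (!h->sent)
    {
        pthread_cond_wait (&h->changed, &h->lock);
    }
    h->processed = true;                   /* the explicit acknowledgement */
    pthread_cond_broadcast (&h->changed);
    pthread_mutex_unlock (&h->lock);
    return NULL;
}

/* Returns 0 once the receiver has confirmed that it processed the message */
int send_and_wait_for_processing (void)
{
    handoff   h;
    pthread_t thread;

    h.sent = false;
    h.processed = false;
    pthread_mutex_init (&h.lock, NULL);
    pthread_cond_init (&h.changed, NULL);

    if (pthread_create (&thread, NULL, receiver, &h) != 0)
    {
        pthread_cond_destroy (&h.changed);
        pthread_mutex_destroy (&h.lock);
        return -1;
    }

    pthread_mutex_lock (&h.lock);
    h.sent = true;                         /* "send" has returned at this point */
    pthread_cond_broadcast (&h.changed);
    while (!h.processed)                   /* sending alone proves nothing */
    {
        pthread_cond_wait (&h.changed, &h.lock);
    }
    pthread_mutex_unlock (&h.lock);

    pthread_join (thread, NULL);
    pthread_cond_destroy (&h.changed);
    pthread_mutex_destroy (&h.lock);

    return h.processed ? 0 : 1;
}
```

A fixed sleep between the send and the stop only makes the race less likely, 
whereas waiting on an acknowledgement like this removes the guesswork.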

-Ted


On 06/14/2013 07:10 AM, Frank Quinn wrote:
> Hi Folks,
>
> See attached code: I'm encountering a deadlock when I try to stop
> messengers. The general workflow is:
>
> 1. Create pub and sub Messengers
> 2. Start the Messengers
> 3. Thread sub off onto its own thread as recv is a blocking call
> 4. Publish round trip from the pub messenger to the sub messenger with a
>    destroy subject (recv is uninterruptible at the moment so this is our
>    only way to interrupt it)
> 5. Stop the messengers
>
> When I try and stop the messengers, the application deadlocks with the
> following backtrace (there is only one thread running at this point as
> the subscribe thread has since exited):
>
> Thread 1 (Thread 0x7f38181a4840 (LWP 6688)):
> #0  0x0000003518ce99ad in poll () at ../sysdeps/unix/syscall-template.S:81
> #1  0x000000309c226a1c in poll (__timeout=<optimized out>,
>     __nfds=<optimized out>, __fds=<optimized out>)
>     at /usr/include/bits/poll2.h:46
> #2  pn_driver_wait_2 (d=d@entry=0x1a81140, timeout=<optimized out>,
>     timeout@entry=-1)
>     at /usr/src/debug/qpid-proton-0.4/proton-c/src/posix/driver.c:752
> #3  0x000000309c226c42 in pn_driver_wait (d=0x1a81140,
>     timeout=timeout@entry=-1)
>     at /usr/src/debug/qpid-proton-0.4/proton-c/src/posix/driver.c:807
> #4  0x000000309c2242d3 in pn_messenger_tsync (messenger=0x1a81050,
>     predicate=0x309c222d80 <pn_messenger_stopped>, timeout=<optimized out>)
>     at /usr/src/debug/qpid-proton-0.4/proton-c/src/messenger.c:623
> #5  0x0000000000400ffb in main () at qpid_deadlock_repro.c:123
>
> Is this the correct workflow for this or am I missing a flush or
> unlock step somewhere along the way?
>
> Cheers,
> Frank
>
> ----------------------------------------------------------------------
> --
>
> Please consider the environment before printing this e-mail.
>
> This e-mail may contain confidential and/or privileged information. If
> you are not the intended recipient or have received this e-mail in
> error, please advise the sender immediately by reply e-mail and delete
> this message and any attachments without retaining a copy.
>
> Any unauthorised copying, disclosure or distribution of the material
> in this e-mail is strictly forbidden.
> ----------------------------------------------------------------------
> --
>
> *Please consider the environment before printing this email.*
>
> *Visit our website at http://www.nyse.com
> **********************************************************************
> *******
>
> Note: The information contained in this message and any attachment to
> it is privileged, confidential and protected from disclosure. If the
> reader of this message is not the intended recipient, or an employee
> or agent responsible for delivering this message to the intended
> recipient, you are hereby notified that any dissemination,
> distribution or copying of this communication is strictly prohibited.
> If you have received this communication in error, please notify the
> sender immediately by replying to the message, and please delete it
> from your system. Thank you. NYSE Euronext. *
>


#include <proton/message.h>
#include <proton/messenger.h>
#include <proton/error.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define     PUB_ADDRESS                         "amqp://localhost/"
#define     SUB_ADDRESS                         "amqp://~localhost/"
#define     QPID_DISPATCH_DESTROY_SUBJECT       "QPID_DESTROY_ME"

/* Function prototype */
void* qpidListenerThread (void* closure);

/* Comms bridge container */
typedef struct qpidCommsBridge_
{
    pn_messenger_t* mSub; /* Subscriber */
    pn_messenger_t* mPub; /* Publisher */
} qpidCommsBridge;

void* qpidListenerThread (void* closure)
{
    qpidCommsBridge* bridge = (qpidCommsBridge*) closure;

    /* Create and start the subscribing messenger */
    bridge->mSub = pn_messenger ("mSub");
    pn_messenger_start (bridge->mSub);

    /* Subscribe to data coming in */
    pn_messenger_subscribe (bridge->mSub, SUB_ADDRESS);

    /* Create reusable message */
    pn_message_t* msg = pn_message();

    /* Block until explicit exit */
    while (1)
    {
        /* Blocks until message received */
        if (pn_messenger_recv (bridge->mSub, 10))
        {
            /* Will get in here if recv failed */
            printf ("Error receiving message\n");
            return NULL ;
        }

        /* Parse data received */
        while (pn_messenger_incoming (bridge->mSub))
        {
            if (pn_messenger_get (bridge->mSub, msg))
            {
                printf("Error getting message\n");
            }

            /* The subject may be NULL if the sender never set one */
            const char* subject = pn_message_get_subject (msg);

            if(subject != NULL &&
               strcmp(subject, QPID_DISPATCH_DESTROY_SUBJECT) == 0)
            {
                printf("Request to destroy received - stopping sub...\n");
                pn_messenger_stop (bridge->mSub);
                printf("Request to destroy received - finished stopping\n");
                return NULL;
            }
            else
            {
                printf("Received message with subject '%s'\n",
                       subject != NULL ? subject : "(none)");
            }
        }
    }
    return NULL ;
}

/*
 * At the time of writing, this produces the output:
 *
 * Creating the publishing messengers
 * Starting the publishing messenger
 * Starting listener thread
 * pthread_create successful
 * Creating message for sending
 * Setting the subject for the message
 * Setting the address and sending message to subscriber
 * Rejoining subscriber to main thread
 * Request to destroy received - stopping sub... <--- Deadlocks here
 */

int main ()
{
    pthread_t subscribe_thread;
    int rc;

    /* Allocate container memory */
    qpidCommsBridge* bridge =
            (qpidCommsBridge*) malloc (sizeof(qpidCommsBridge));

    /* Create the publishing messenger */
    printf("Creating the publishing messengers\n");
    bridge->mPub = pn_messenger ("mPub");

    /* Activate the messengers */
    printf("Starting the publishing messenger\n");
    pn_messenger_start (bridge->mPub);

    /* Fire up the subscriber on its own thread */
    printf("Starting listener thread\n");
    if (0 != (rc = pthread_create (&subscribe_thread, NULL, qpidListenerThread,
                                   bridge)))
    {
        printf ("pthread_create returned %d - exiting\n", rc);
        return 1;
    }
    else
    {
        printf ("pthread_create successful\n");
    }

    /* Create the payload to be sent */
    printf("Creating message for sending\n");
    pn_message_t* msg = pn_message ();

    /* Set the subject for the destroy string */
    printf("Setting the subject for the message\n");
    pn_message_set_subject (msg, QPID_DISPATCH_DESTROY_SUBJECT);

    /* Address the message and publish the destroy request */
    printf("Setting the address and sending message to subscriber\n");
    pn_message_set_address (msg, PUB_ADDRESS);
    pn_messenger_put (bridge->mPub, msg);
    pn_messenger_send (bridge->mPub);

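    printf("Rejoining subscriber to main thread\n");
    pthread_join (subscribe_thread, NULL);

    /* Stop the messengers */
    printf("Stopping the publishing messenger\n");
    pn_messenger_stop (bridge->mPub);

    /* Release the message, the messengers and the container */
    pn_message_free (msg);
    pn_messenger_free (bridge->mPub);
    pn_messenger_free (bridge->mSub);
    free (bridge);

    return 0;
}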
