Hi all,

This issue does not seem hard to reproduce. I used the following
scenario:

1) Create a large queue on a broker (C++ / Linux)
2) Start feeding messages into the queue using a C++/Linux program (in my
case I used approximately 1kB messages)
3) Connect with a receiver (C++/Windows) using SSL and prefetch 1000 (no
client authentication, I used username & password)
4) Wait a few seconds to see the error in the receiver

For the C++/Windows receiver, I used the Red Hat MRG-M 2.3.3 WinSDK
(building it from the Qpid source code almost never works for me :-o). But
given the experience from Hamid and Gene, I do not think this might work
the same way with builds based on the latest Qpid sources.

The key to reproducing it seems to be the combination of receiver capacity
and SSL. Without SSL it doesn't seem to be reproducible. With SSL and a small
receiver capacity it seems to work fine as well (in my scenario above, with
capacity 50 it seems to need much more time to fail, and with capacity 10 I
didn't manage to reproduce it). Similarly, increasing the message size to 1MB
causes even capacity 1 to stop working.

Source code of the receiver as well as its full trace+ log are attached.

@Steve: Is there something else I can collect / do to help find and solve
the problem?

Thanks & Regards
Jakub



On Mon, Jul 29, 2013 at 4:24 PM, Hamid.Shahid <[email protected]> wrote:

> It is the same for us: we have seen this issue in our production
> environment more than 10 times using Qpid 0.18.
>
> After reading the post, I went through the code of QPID 0.18 and I found
> that there is no BufferCount specified. Now, I have downloaded the latest
> QPID 0.22 C++ version on windows 7 as I saw the BufferCount logic in 0.22
> version.
>
> Please let me know, is there any recommended number for BufferCount?
>
> Moreover, surprisingly, I still have to do the SSL patching. I don't know
> why; why is it still not part of the API?
>
> Thanks,
> Hamid.
>
>
>
> -----
> Best Regards,
> Hamid.
> --
> View this message in context:
> http://qpid.2158936.n2.nabble.com/Qpid-throwed-WSAENOBUFS-while-receiving-data-from-a-broker-tp7592938p7596041.html
> Sent from the Apache Qpid users mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
>
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/exceptions.h>

#include <ctime>
#include <cstdlib>
#include <iostream>

#include <sstream>

using namespace qpid::messaging;

using std::stringstream;
using std::string;

int main(int argc, char** argv) {
    const char* url = "amqp:rgc004.xeop.de:10101";
    std::string connectionOptions = "{ username: 'ABCFR_ABCFRALMMACC1', password: 'ABCFR_ABCFRALMMACC1', transport: 'ssl', sasl-mechanism: 'PLAIN', heartbeat: '30',  }";

    Address broadcast("broadcast.ABCFR_ABCFRALMMACC1.TradeConfirmation; { node: { type: queue }, assert: never, create: never, mode: consume }");

    Duration timeout = Duration::SECOND * 60;

    int blockSize = 1000;
    int timeTick = 1000;

    int messageNo = 0;
    int blockSizeBytes = 0;
    int totalSizeBytes = 0;

    Connection connection(url, connectionOptions);
    try {
        connection.open();
        Session session = connection.createSession();

        std::cout << "Connection opened, session created" << std::endl;
        Receiver receiver = session.createReceiver(broadcast);
        std::cout << "Receiver created " << receiver.getName() << std::endl;
        receiver.setCapacity(1000);

        while (true)
        {
            try
            {
                Message msg = receiver.fetch(timeout);
                //std::cout << "Received message ... " << msg.getContent() << std::endl;
                messageNo++;
                blockSizeBytes += msg.getContentSize();
                totalSizeBytes += msg.getContentSize();
            }
            catch (const NoMessageAvailable&) {
                session.acknowledge(true);

                break;
            }
    
            if (messageNo % blockSize == 0)
            {
                session.acknowledge(true);
            }
    
            if (messageNo % timeTick == 0)
            {
                std::cout << "-I- " << messageNo << " messages received; " << (blockSizeBytes / 1024) << " kB"  << std::endl;
                blockSizeBytes = 0;
            }
        }
        // Close unfinished block
        std::cout << "-I- " << messageNo << " messages received; " << (blockSizeBytes / 1024) << " kB"  << std::endl;
        
        // Total stats
        std::cout << "-I- Total " << messageNo << " messages received; " << (totalSizeBytes / 1024) << " kB"  << std::endl;

        receiver.close();
        session.close();
        connection.close();
        return 0;
    } catch(const std::exception& error) {
        std::cout << error.what() << std::endl;
        connection.close();
    }
    return 1;
}