One thing I did notice about the Unix message queue implementation on z/OS is that it's not the fastest as a message bus between threads using IPC_PRIVATE. I was quite shocked at the difference when compared to a C++ BlockingQueue class I nabbed from boost, measured with exactly the same throughput test. The boost implementation just uses pthread condition variables, which under the covers use USS latches.

Unix message queue: 12976 messages per second
Boost message queue: 107417 messages per second

107417 / 12976 = 8.27

That's over 8 times faster! I'm wondering if it's because in the C++ code the data is in userspace and in the Unix message queue it's data spaces managed by the kernel. The Unix message queue is serialized using PLO instructions
so I expected it to be much faster.

Here's the code. Maybe I've done something wrong.

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/modes.h>
#include <pthread.h>
#include <time.h>
#include <stdio.h>
#include <signal.h>
#include <time.h>
#include <unistd.h>

#pragma runopts(posix(on))

/* Set by the SIGALRM handler; tells the benchmark send loop to stop. */
volatile int quit = 0;

/* SIGALRM handler: raise the termination flag for the producer loop. */
void catcher( int signum )
{
    (void)signum; /* unused; required by the sa_handler signature */
    quit = 1;
}

/* System V message buffer: a mandatory positive message type followed
 * by the payload. Note that the msgsnd()/msgrcv() size argument counts
 * only mtext, not the leading mtype. */
struct my_msgbuf
{
    long mtype;        /* message type; must be > 0 for msgsnd() */
    char mtext[1024];  /* fixed 1 KiB payload used by the benchmark */
};

/*
 * Consumer thread: drain the message queue forever.
 *
 * p carries the message queue id packed into a void* by the caller.
 * Loops on msgrcv() until an error occurs, at which point the whole
 * process is terminated.
 */
void * consumer( void * p )
{
    int msqid = ( int )p;
    for (;;)
    {
        struct my_msgbuf buf = { 0 };
        /* msgrcv() returns -1 on error; the original tested == 0,
         * which is a valid byte count and never signals failure.
         * The size argument bounds mtext only (not the whole struct);
         * MSG_NOERROR truncates any oversized message instead of
         * failing with E2BIG, so we never overrun buf.mtext. */
        if ( msgrcv( msqid, &buf, sizeof(buf.mtext), 0, MSG_NOERROR ) == -1 )
        {
            perror( "msgrcv" );
            exit( 1 );
        }
    }
    return NULL;
}

/*
 * Benchmark driver: measure how many messages one producer can push
 * through a System V message queue to one consumer thread in one
 * second (capped at 500000 messages).
 */
int main( int argc, char ** argv )
{
    struct my_msgbuf buf = { 0 };
    int msqid;

    /* msgget() returns -1 on failure; 0 is a perfectly valid queue id,
     * so the original "== 0" test was wrong on both sides */
    msqid = msgget( IPC_PRIVATE, S_IWUSR | S_IRUSR | IPC_PLO1 );
    if ( msqid == -1 )
    {
        perror( "msgget" );
        exit( 8 );
    }

    struct msqid_ds msgqds;
    if ( msgctl( msqid, IPC_STAT, &msgqds ) == -1 )
    {
        perror( "msgctl" );
        exit( 8 );
    }

    buf.mtype = 1; /* type is irrelevant in this test, but must be > 0 */

    /* install a handler so SIGALRM ends the send loop cleanly */
    struct sigaction sact;
    sigemptyset( &sact.sa_mask );
    sact.sa_flags = 0;
    sact.sa_handler = catcher;
    sigaction( SIGALRM, &sact, NULL );

    /* create the consumer threads (only one thread in this test) */
    size_t threads = 1;
    pthread_t tid[threads];
    for ( size_t i = 0; i < threads; i++ )
    {
        int rc = pthread_create( &tid[i], NULL, consumer, (void * )msqid );
        if ( rc != 0 )
        {
            /* pthread_create() returns the error number and does NOT
             * set errno, so perror() would print an unrelated message */
            fprintf( stderr, "pthread_create: %s\n", strerror( rc ) );
            exit( 8 );
        }
    }

    /* timer will pop in 1 second */
    alarm( 1 );

    unsigned int n = 0;
    while ( !quit && n < 500000 )
    {
        /* size counts only mtext; the original passed sizeof(buf),
         * which over-reads the struct by sizeof(long) on every send */
        if ( msgsnd( msqid, (struct msgbuf * )&buf, sizeof(buf.mtext), 0 ) == -1 )
        {
            if ( errno == EINTR )
                break; /* alarm popped while blocked in msgsnd() */
            perror( "msgsnd" );
            exit( 8 );
        }
        n++;
    }

    printf( "MSGQ: %u messages per sec\n", n );

    return 0;
}

#include <iostream>
#include <vector>
#include <boost/utility.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/thread.hpp>
#include <boost/timer.hpp>
#include <boost/bind.hpp>

#include <signal.h>

#pragma runopts(posix(on))

/**
 * Blocking queue.
 *
 * A bounded circular buffer that uses a monitor (mutex plus two
 * condition variables) to synchronize access to the queue.
 */
template <typename T>
class BlockingQueue : private boost::noncopyable {
public:
  typedef boost::mutex::scoped_lock scoped_lock;

  /**
   * Constructs a blocking queue of n elements.
   *
   * @param n the size of the queue
   */
  explicit BlockingQueue( int n )
    : m_circularBuffer( n ),   // members initialize in declaration order;
      m_begin( 0 ),            // the original list was reordered (-Wreorder)
      m_end( 0 ),
      m_buffered( 0 )
  {
  }

  /**
   * Puts an item onto the queue. If the queue is full, the thread is
   * blocked until notified by the receiver that the queue is ready
   * for data.
   *
   * @param item the element to add to the queue
   */
  void send( T item )
  {
    scoped_lock lock( m_monitor );

    // block until the queue is not full (loop guards against
    // spurious wakeups)
    while ( m_buffered == m_circularBuffer.size() ) {
      m_bufferNotFull.wait( lock );
    }

    // put the item at the end of the queue
    m_circularBuffer[m_end] = item;

    // advance the end-of-queue index, wrapping around
    m_end = (m_end + 1) % m_circularBuffer.size();

    ++m_buffered;

    // notify the receiver
    m_bufferNotEmpty.notify_one();
  }

  /**
   * Returns the item at the top of the queue. If the queue is empty
   * the thread is blocked until notified by the sender that the queue
   * has data.
   *
   * @return the item at the top of the queue.
   */
  T receive()
  {
    scoped_lock lock( m_monitor );

    // block until the queue is not empty (loop guards against
    // spurious wakeups)
    while ( m_buffered == 0 ) {
      m_bufferNotEmpty.wait( lock );
    }

    // get the element at the top of the queue
    T item = m_circularBuffer[m_begin];

    // advance the top-of-queue index, wrapping around
    m_begin = (m_begin + 1) % m_circularBuffer.size();

    --m_buffered;

    // notify the sender
    m_bufferNotFull.notify_one();

    return item;
  }

private:
  /** the container for the circular buffer */
  std::vector<T> m_circularBuffer;

  /** the index to the top of the queue (unsigned to match size()) */
  typename std::vector<T>::size_type m_begin;

  /** the index to the bottom of the queue */
  typename std::vector<T>::size_type m_end;

  /** the number of items in the queue */
  typename std::vector<T>::size_type m_buffered;

  /** the monitor mutex */
  boost::mutex m_monitor;

  /** the condition variable to notify buffer-not-full conditions */
  boost::condition m_bufferNotFull;

  /** the condition variable to notify buffer-not-empty conditions */
  boost::condition m_bufferNotEmpty;

};

/** a character buffer with a type field to simulate System V queues */
struct Buffer
{
  int type;        /**< message type; -1 is the end-of-stream sentinel */
  char data[1024]; /**< fixed 1 KiB payload, mirrors my_msgbuf.mtext */
};

/** the queue shared between producer (main) and consumer (receiver) */
static BlockingQueue<Buffer> buf(500);

/**
 * Consumer thread: drains the queue until it receives a message whose
 * type is -1, which the producer uses as an end-of-stream marker.
 * (The unused boost::timer local in the original has been removed.)
 */
void receiver() {
  for (;;) {
    Buffer n = buf.receive();
    if (n.type == -1) break;  // -1 indicates end of stream
  }
}

using namespace std;

// Flag raised by the SIGALRM handler to stop the producer loop.
volatile int quit = 0;

// Signal handler with C linkage, as expected for sigaction().
extern "C" void catcher(int /*signum*/) {
  quit = 1;
}

int main(int argc, char** argv)
{
  boost::thread thrd2( &receiver );

  // set an alarm to pop in 1 second to benchmark how many messages
  // per second the blocking queue can process
  struct sigaction sact;
  sigemptyset(&sact.sa_mask);
  sact.sa_flags = 0;
  sact.sa_handler = catcher;
  sigaction(SIGALRM, &sact, NULL);
  alarm(1);

  // pump some data into the queue until the timer is popped or finished
  size_t n = 0;
  while ( !quit && n < 500000 ) {
    Buffer b = {0};
    buf.send(b);
    ++n;
  }

  printf( "%d messages a sec\n", n );

  return 0;
}





On 30/03/2016 8:22 PM, John McKown wrote:
On Tue, Mar 29, 2016 at 8:44 PM, David Crayford <dcrayf...@gmail.com> wrote:

On 30/03/2016 9:15 AM, John McKown wrote:

On Tue, Mar 29, 2016 at 6:36 PM, David Crayford <dcrayf...@gmail.com>
wrote:

On 29 Mar 2016, at 11:59 PM, Paul Gilmartin <
0000000433f07816-dmarc-requ...@listserv.ua.edu> wrote:

On Tue, 29 Mar 2016 11:30:02 -0400, Scott Ford wrote:
Isnt iBM's Unix System Services based on Posix ?

Relentlessly, but an outdated POSIX.  No "cd -P" nor "pwd -L" e.g.

If only the message queues were POSIX and not those horrible System V!

​I gotta ask. Why would it make any difference for the same
functionality,
message queues, to be POSIX instead of SYSV? Unless you've got some sort
of
"POSIX only" rule from some manager.​


Simpler, easier to use API. No need to create a file system object and
ftok a token, you can just use a namespace with mq_open() and features that
don't exist
in System V such as mq_notify() spring to mind. In a nutshell it's a
better design. Having said that the System V message queues are better than
nothing and you
don't have to be authorized to use them which is goodness.

Ah, I see what you're getting at. It wasn't, as I had thought, that
message queues are bad because they are SYSV, but that IBM should have
implemented the POSIX version of message queues instead of the SYSV
version. ​Too much gaming on my part lately. It rots the brain.


----------------------------------------------------------------------
For IBM-MAIN subscribe / signoff / archive access instructions,
send email to lists...@listserv.ua.edu with the message: INFO IBM-MAIN

Reply via email to