Hi Pieter,

Please find attached the test code.

*Test Description: *
send_process: - Gets the time before sending a message and sends it as part
of the message.
                      -  Messages are sent continuously with a delay of 1 ms.
                      -  Uses ZMQ_PUB socket and IPC protocol.

recv_process:  -  After receiving the message, calculates the time
difference between send and receive times.
                      -  If the difference is greater than 10 ms, prints
the message number and delay info. (In the actual test code, I send this to
a logging application using ZMQ_PUB and the TCP protocol.)
                      - Uses ZMQ_SUB and IPC protocol.

*The test ran for more than 48 hours.*
*NOTE: The message delay is seen only after running the test for a couple
of hours (in my case, after 10 hours).*

*Test Configuration:*
- Freescale quad core ARM processor.
- 1 GHz processor
- OS : Ubuntu 10.04

Thanks,
Divya


On Mon, Apr 15, 2013 at 12:52 PM, Pieter Hintjens <p...@imatix.com> wrote:

> Hi Asif,
>
> I'm hoping Divya will provide us with reusable tests that show the problem.
>
> -Pieter
>
>
>
>
>
> On Mon, Apr 15, 2013 at 7:50 AM, asif saeed <asif.l...@gmail.com> wrote:
> > Excellent, Divya. Very good analysis.
> >
> > @Pieter,
> >
> > I am also going to use ZeroMQ in an Ubuntu environment and Divya's
> analysis
> > is really worrying me. Does ZeroMQ have benchmarks on non real-time
> systems?
> > What kind of performance tests are there to test the performance?
> >
> > Sincerely,
> >
> > -Asif
> >
> >
> > On Fri, Apr 12, 2013 at 10:29 PM, Divya Mohan <divya.mohan...@gmail.com>
> > wrote:
> >>
> >> Hi Pieter,
> >>
> >> I tried the same performance tests with POSIX message queues and there
> was
> >> no delay in receiving the messages. The same test with ZeroMQ showed
> delays
> >> of up to 470 msec. Doesn't this mean that we can rule out the possibility
> of
> >> another process taking up CPU time randomly (as you had suggested
> earlier)?
> >>
> >> Are there any sleep/wait calls (for semaphores) in the ZeroMQ code that could
> >> send my process to a suspended/sleep state? I am not able to figure out
> why
> >> this delay is seen when using ZeroMq and not with message queues.
> >>
> >>
> >>
> >> Thanks,
> >>
> >> Divya
> >>
> >>
> >>
> >>
> >>
> >> On Thu, Mar 28, 2013 at 3:54 PM, Pieter Hintjens <p...@imatix.com> wrote:
> >>>
> >>> On Wed, Mar 27, 2013 at 4:51 PM, Divya Mohan <divya.mohan...@gmail.com
> >
> >>> wrote:
> >>>
> >>> > Current and default scheduling policy is SCHED_OTHER. I had given
> >>> > highest
> >>> > priority to my receive process thinking delay due to scheduling will
> be
> >>> > taken care of, but even then the messages are delayed.
> >>> >
> >>> > Also I am not using any sleep in  my receive process. Only send
> process
> >>> > has
> >>> > a sleep of 1 msec.
> >>>
> >>> In general changing thread priorities is a bad idea.
> >>>
> >>> Perhaps you have another process taking CPU time at random occasions,
> >>> causing the sender I/O thread to be swapped out.
> >>>
> >>> If you want guarantees that there are no spikes you'll have to use a
> >>> real time kernel. It will increase average latency but remove the
> >>> spikes.
> >>>
> >>> -Pieter
> >>> _______________________________________________
> >>> zeromq-dev mailing list
> >>> zeromq-dev@lists.zeromq.org
> >>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
> >>
> >>
> >>
> >> _______________________________________________
> >> zeromq-dev mailing list
> >> zeromq-dev@lists.zeromq.org
> >> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
> >>
> >
> >
> > _______________________________________________
> > zeromq-dev mailing list
> > zeromq-dev@lists.zeromq.org
> > http://lists.zeromq.org/mailman/listinfo/zeromq-dev
> >
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev@lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
#include<stdlib.h>
#include<pthread.h>
#include<string.h>
#include<stdio.h>
#include<unistd.h>
#include<sys/time.h>
#include<sys/resource.h>
#include"zmq/zmq.h"

/* ZeroMQ context used for the sockets below; created and destroyed in main(). */
void *process_context;
/* ZMQ_PUB socket: main() opens and closes it, but the visible code never
 * binds, connects or sends on it. */
void *publish_socket;
/* ZMQ_SUB socket: connected and subscribed in main(), read by RecvBcastMesg(). */
void *sub_socket;
/* Thread handles; only thread_id[0] (the RecvBcastMesg thread) is used. */
pthread_t thread_id[2];
/*
 * Payload exchanged between the sending and receiving process.
 * The receiver copies the raw bytes of a message straight into this struct,
 * so the field order and types must stay in step with the sender's definition.
 */
typedef struct {
  unsigned int msg_no;       /* message number assigned by the sender */
  int          time_in_sec;  /* send time: seconds part (tv_sec) */
  int          time_in_usec; /* send time: microseconds part (tv_usec) */
} MessageDetails;
 
/*
 * Receives broadcast messages from the sending process and reports late ones.
 *
 * Every message carries the sender's gettimeofday() timestamp. The delay is
 * the difference between that timestamp and the local time taken right after
 * zmq_recv() returns. Messages whose delay exceeds 10 msec are printed with
 * the amount by which they exceeded that threshold.
 *
 * arg: unused (required by the pthread start-routine signature).
 * The loop has no exit condition, so this function does not return.
 */
void* RecvBcastMesg (void *arg)
{
  /* Delay, in microseconds, above which a message is reported (10 msec). */
  enum { DELAY_THRESHOLD_USEC = 10000 };

  MessageDetails my_msg_details;
  struct timeval recv_tv;
  long long delay_usec;
  int return_code;

  (void) arg;

  while (1)
  {
    memset(&my_msg_details, 0, sizeof(my_msg_details));
    return_code = zmq_recv(sub_socket, &my_msg_details, sizeof(my_msg_details), 0);

    /* Timestamp immediately after the receive so that no processing time is
     * counted as delay. The timezone argument is obsolete; pass NULL. */
    gettimeofday(&recv_tv, NULL);

    if (return_code < 0)
    {
      /* Receive failed; keep the original best-effort behaviour and retry. */
      continue;
    }

    /* zmq_recv() reports the size of the message, which can be smaller or
     * larger (truncated) than the buffer. Only a message of exactly the
     * expected size holds a valid timestamp. */
    if ((size_t) return_code != sizeof(my_msg_details))
    {
      printf("Unexpected message size: %d bytes\n", return_code);
      continue;
    }

    /* Integer arithmetic avoids floating-point rounding. A negative result
     * (e.g. the clock was stepped backwards) stays negative and is simply
     * not reported, instead of being converted to a huge unsigned value. */
    delay_usec = ((long long) recv_tv.tv_sec - my_msg_details.time_in_sec) * 1000000LL
               + ((long long) recv_tv.tv_usec - my_msg_details.time_in_usec);

    if (delay_usec > DELAY_THRESHOLD_USEC) //10msec
    {
      printf("MsgNo: %u delayed by time:%lldusecs\n",
             my_msg_details.msg_no, delay_usec - DELAY_THRESHOLD_USEC);
      //In real test, a function is called to send this info to a logging process using ZMQ PUB/SUB- TCP protocol.
    }
  }//end while
}

int main()
{
  int return_code = -1;
  int filter_len = 0;
  char* filter_value = NULL;
  process_context = zmq_ctx_new();

  if (NULL == process_context)
  {
    printf("Context Creation Error\n");
  }

  sub_socket = zmq_socket(process_context, ZMQ_SUB);
  if (NULL == sub_socket)
  {
    return_code= zmq_errno();
    printf("Socket Open error:%d:%s\n",return_code,zmq_strerror(return_code));
  }

  //subscriber socket open check
  return_code= zmq_connect(sub_socket,"ipc://BcastProc3.ipc");
  if (-1 == return_code)
  {
    return_code= zmq_errno();
    printf("Socket Connect error:%d:%s\n",return_code,zmq_strerror(return_code));
  }
  //subscriber socket connect check
   return_code= zmq_setsockopt(sub_socket,ZMQ_SUBSCRIBE,NULL,0);
   if (-1 == return_code)
   {
      return_code= zmq_errno();
      printf("Socket opt error:%d:%s\n",return_code,zmq_strerror(return_code));
   }

   //set socket option check
   publish_socket = zmq_socket(process_context, ZMQ_PUB);
   if (NULL == publish_socket)
  {
    return_code= zmq_errno();
    printf("Socket Open error:%d:%s\n",return_code,zmq_strerror(return_code));
  }  //publisher socket open check
 
  return_code = pthread_create(&thread_id[0],NULL,&RecvBcastMesg,NULL);
  if(0 != return_code)
  {
    printf( "Thread Creation-RecvBcastMesg: %d", return_code);
  }

  //MainProcess should wait till threads are complete
  return_code = pthread_join(thread_id[0], NULL);
  if(0 != return_code)
  { 
     printf( "Thread Join-RecvBcastMesg: %d", return_code);
  }

  return_code= zmq_close(sub_socket);;

  if (0!= return_code)
 {
    printf( "Sub-Socket close Error : %d", return_code);
 }

 return_code= zmq_close(publish_socket);;
 if (0 != return_code)
 {
   printf("PublishSocket close Error : %d", return_code);
 }

 return_code= zmq_ctx_destroy(process_context);
 if (0 != return_code)
 {
    printf( "Context deletion Error : %d", return_code);
 }

 return return_code;
}
//Sending Process
 
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/time.h>
#include "zmq/zmq.h"

/*
 * Payload published by SendMessage().
 * The struct is sent as raw bytes, so the field order and types must stay in
 * step with the definition used by the receiving process.
 */
typedef struct {
  unsigned int msg_no;       /* message number assigned by SendMessage() */
  int          time_in_sec;  /* send time: seconds part (tv_sec) */
  int          time_in_usec; /* send time: microseconds part (tv_usec) */
} MessageDetails;

/* Not referenced by the functions visible in this file: SendMessage() keeps
 * its own function-local message counter. */
unsigned int send_count =0;
/* ZeroMQ context; created in main(). */
void *process_context;
/* ZMQ_PUB socket: bound in main(), written to by SendMessage(). */
void *pub_socket;

/*
 * Builds one timestamped message and publishes it on pub_socket.
 *
 * The message number starts at 1 and increases by one per call. The function
 * sleeps 1 msec before taking the timestamp, so the sleep paces the sender
 * without being counted in the latency the receiver measures.
 *
 * Returns the zmq_send() result: a negative value on failure (after printing
 * "Send Error"), otherwise the value reported by zmq_send().
 */
static int SendMessage( void )
{
  /* Named differently from the file-scope send_count so it does not shadow it. */
  static unsigned int next_msg_no = 1;
  MessageDetails my_msg_details;
  struct timeval send_time;
  int return_code;

  memset(&my_msg_details, 0, sizeof(my_msg_details));
  my_msg_details.msg_no = next_msg_no++;

  usleep(1000); //1msec

  /* The timezone argument is obsolete; pass NULL. */
  gettimeofday(&send_time, NULL);
  /* The wire format carries plain ints, so narrow explicitly. */
  my_msg_details.time_in_sec = (int) send_time.tv_sec;
  my_msg_details.time_in_usec = (int) send_time.tv_usec;

  return_code = zmq_send(pub_socket, &my_msg_details, sizeof(my_msg_details), 0);
  if (return_code < 0)
  {
    printf("Send Error\n");
  }

  return return_code;
}
 
int main()
{
  int return_code = -1;
  process_context = zmq_ctx_new();

  if (NULL == process_context)
  {
     printf("Context Creation Error\n");
  }
  pub_socket = zmq_socket(process_context, ZMQ_PUB);

  if (NULL == pub_socket)
  {
     return_code= zmq_errno();
     printf("Socket Open error:%d:%s\n",return_code,zmq_strerror(return_code));
  }

 //publisher socket open check
 return_code= zmq_bind(pub_socket,"ipc://BcastProc3.ipc");
 if (-1 == return_code)
 {
   return_code= zmq_errno();
   printf("Socket Bind error:%d:%s\n",return_code,zmq_strerror(return_code));
 }

 //publisher socket bind check
 sleep(1);
 while ( 1 )
 {
    SendMessage();
 }

 return_code= zmq_close(pub_socket);;
 if (0 != return_code)
 {
   printf("PublishSocket close Error : %d", return_code);
 }

 return_code= zmq_ctx_destroy(process_context);
 if (0 != return_code)
 {
    printf( "Context deletion Error : %d", return_code);
 }

}

_______________________________________________
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to