Not only PUSH/PULL can be a usefull pattern, but also PUB/SUB. Although
PUSH/PULL makes perfectly sense for the default.

Thanks,
Christian
---
 README.markdown     |    2 ++
 imzeromq/imzeromq.c |   26 +++++++++++++++++++++++++-
 omzeromq/omzeromq.c |   21 ++++++++++++++++++++-
 3 files changed, 47 insertions(+), 2 deletions(-)

diff --git a/README.markdown b/README.markdown
index 6a1dbfa..75c4bf8 100644
--- a/README.markdown
+++ b/README.markdown
@@ -50,6 +50,7 @@ The :omzeromq: selector takes the following parameter 
components:
 * hwm=<NNN>             Sets the high water mark of the socket.
 * swap=<NNN>            Sets the swap value for the socket.
 * threads=<N>           Sets the number of zeromq context threads.
+* pattern=<patternstr>  Sets the messaging pattern (push or pub).
 
 The format for the selector may be specified in the standard way with
 a trailing ";<FORMAT>" specifier.
@@ -71,6 +72,7 @@ The $InputZeroMQServerRun directive takes the following 
parameter components:
 * connect=<endpoint>    Connect to the specified endpoint.
 * bind=<endpoint>       Bind to the specified endpoint.
 * identity=<identstr>   Sets the identity of the socket.
+* pattern=<patternstr>  Sets the messaging pattern (pull or sub).
 
 Examples:
 
diff --git a/imzeromq/imzeromq.c b/imzeromq/imzeromq.c
index 783559e..f275473 100644
--- a/imzeromq/imzeromq.c
+++ b/imzeromq/imzeromq.c
@@ -107,6 +107,8 @@ static rsRetVal add_endpoint(void __attribute__((unused)) * 
oldp, uchar * valp)
     char * connstr = NULL;
     char * bindstr = NULL;
     char * identstr = NULL;
+    char * patternstr = NULL;
+    int pattern = ZMQ_PULL;
 
     char * ptr1;
     char * binding;
@@ -142,6 +144,10 @@ static rsRetVal add_endpoint(void __attribute__((unused)) 
* oldp, uchar * valp)
         {
             CHKmalloc(identstr = strdup(val));
         }
+        else if (strcmp(binding, "pattern") == 0)
+        {
+            CHKmalloc(patternstr = strdup(val));
+        }
         else
         {
             errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding);
@@ -167,6 +173,24 @@ static rsRetVal add_endpoint(void __attribute__((unused)) 
* oldp, uchar * valp)
                ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
     }
 
+    // check for valid patterns (pull is default)
+    if (patternstr) {
+        if (strcmp(binding, "pull") == 0) {
+            pattern = ZMQ_PULL;
+        }
+        else if (strcmp(binding, "sub") == 0)
+        {
+            pattern = ZMQ_SUB;
+        }
+        else
+       {
+            errmsg.LogError(0,
+                RS_RET_INVALID_PARAMS, "error: "
+                "invalid messaging pattern - use 'pull' or 'sub'");
+            ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
+       }
+    }
+
     if (!s_context)
         s_context = zmq_init(1);
     if (!s_context)
@@ -178,7 +202,7 @@ static rsRetVal add_endpoint(void __attribute__((unused)) * 
oldp, uchar * valp)
                ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
     }
 
-    void * sock = zmq_socket(s_context, ZMQ_PULL);
+    void * sock = zmq_socket(s_context, pattern);
     if (!sock)
     {
                errmsg.LogError(0,
diff --git a/omzeromq/omzeromq.c b/omzeromq/omzeromq.c
index 55865a6..81f8dca 100644
--- a/omzeromq/omzeromq.c
+++ b/omzeromq/omzeromq.c
@@ -60,6 +60,7 @@ typedef struct _instanceData {
     int64              swapsz;
     uchar *            identstr;
     int64              threads;
+    int                        pattern;
 
     void *             context;
     void *             socket;
@@ -101,7 +102,7 @@ static rsRetVal init_zeromq(instanceData *pData, int 
bSilent)
        ASSERT(pData->socket == NULL);
 
     pData->context = zmq_init(pData->threads);
-    pData->socket = zmq_socket(pData->context, ZMQ_PUSH);
+    pData->socket = zmq_socket(pData->context, pData->pattern);
 
     if (pData->connstr)
         zmq_bind(pData->socket, (char *) pData->connstr);
@@ -174,6 +175,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
 
     pData->connstr = NULL;
     pData->bindstr = NULL;
+    pData->pattern = ZMQ_PUSH;
     pData->hwmsz = -1;
        pData->swapsz = -1;
     pData->identstr = NULL;
@@ -242,6 +244,23 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
                 ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
             }
         }
+        else if (strcmp(binding, "pattern") == 0)
+        {
+           if (strcmp(val, "push") == 0) {
+                pData->pattern = ZMQ_PUSH;
+            }
+            else if (strcmp(val, "pub") == 0)
+            {
+                pData->pattern = ZMQ_SUB;
+            }
+           else
+            {
+                errmsg.LogError(0,
+                    RS_RET_INVALID_PARAMS,
+                   "error: invalid messaging pattern - use 'push' or 'pub'");
+                ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
+            }
+        }
         else
         {
             errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding);
-- 
1.7.1

_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com

Reply via email to