PUSH/PULL is not the only useful pattern; PUB/SUB can be useful as well,
although PUSH/PULL makes perfect sense as the default.
Thanks,
Christian
---
README.markdown | 2 ++
imzeromq/imzeromq.c | 26 +++++++++++++++++++++++++-
omzeromq/omzeromq.c | 21 ++++++++++++++++++++-
3 files changed, 47 insertions(+), 2 deletions(-)
diff --git a/README.markdown b/README.markdown
index 6a1dbfa..75c4bf8 100644
--- a/README.markdown
+++ b/README.markdown
@@ -50,6 +50,7 @@ The :omzeromq: selector takes the following parameter
components:
* hwm=<NNN> Sets the high water mark of the socket.
* swap=<NNN> Sets the swap value for the socket.
* threads=<N> Sets the number of zeromq context threads.
+* pattern=<patternstr> Sets the messaging pattern (push or pub).
The format for the selector may be specified in the standard way with
a trailing ";<FORMAT>" specifier.
@@ -71,6 +72,7 @@ The $InputZeroMQServerRun directive takes the following
parameter components:
* connect=<endpoint> Connect to the specified endpoint.
* bind=<endpoint> Bind to the specified endpoint.
* identity=<identstr> Sets the identity of the socket.
+* pattern=<patternstr> Sets the messaging pattern (pull or sub).
Examples:
diff --git a/imzeromq/imzeromq.c b/imzeromq/imzeromq.c
index 783559e..f275473 100644
--- a/imzeromq/imzeromq.c
+++ b/imzeromq/imzeromq.c
@@ -107,6 +107,8 @@ static rsRetVal add_endpoint(void __attribute__((unused)) *
oldp, uchar * valp)
char * connstr = NULL;
char * bindstr = NULL;
char * identstr = NULL;
+ char * patternstr = NULL;
+ int pattern = ZMQ_PULL;
char * ptr1;
char * binding;
@@ -142,6 +144,10 @@ static rsRetVal add_endpoint(void __attribute__((unused))
* oldp, uchar * valp)
{
CHKmalloc(identstr = strdup(val));
}
+ else if (strcmp(binding, "pattern") == 0)
+ {
+ CHKmalloc(patternstr = strdup(val));
+ }
else
{
errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding);
@@ -167,6 +173,24 @@ static rsRetVal add_endpoint(void __attribute__((unused))
* oldp, uchar * valp)
ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
}
+ // Map the optional pattern= value onto a ZeroMQ socket type (default: pull).
+ // Compare patternstr, the option's value; binding is the option key.
+ // NOTE(review): a ZMQ_SUB socket needs a ZMQ_SUBSCRIBE filter to receive
+ // anything - confirm one is set after the socket is created.
+ if (patternstr) {
+ if (strcmp(patternstr, "pull") == 0) {
+ pattern = ZMQ_PULL;
+ }
+ else if (strcmp(patternstr, "sub") == 0) {
+ pattern = ZMQ_SUB;
+ }
+ else {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS, "error: "
+ "invalid messaging pattern - use 'pull' or 'sub'");
+ ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
+ }
+ }
+
if (!s_context)
s_context = zmq_init(1);
if (!s_context)
@@ -178,7 +202,7 @@ static rsRetVal add_endpoint(void __attribute__((unused)) *
oldp, uchar * valp)
ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
}
- void * sock = zmq_socket(s_context, ZMQ_PULL);
+ void * sock = zmq_socket(s_context, pattern);
if (!sock)
{
errmsg.LogError(0,
diff --git a/omzeromq/omzeromq.c b/omzeromq/omzeromq.c
index 55865a6..81f8dca 100644
--- a/omzeromq/omzeromq.c
+++ b/omzeromq/omzeromq.c
@@ -60,6 +60,7 @@ typedef struct _instanceData {
int64 swapsz;
uchar * identstr;
int64 threads;
+ int pattern;
void * context;
void * socket;
@@ -101,7 +102,7 @@ static rsRetVal init_zeromq(instanceData *pData, int
bSilent)
ASSERT(pData->socket == NULL);
pData->context = zmq_init(pData->threads);
- pData->socket = zmq_socket(pData->context, ZMQ_PUSH);
+ pData->socket = zmq_socket(pData->context, pData->pattern);
if (pData->connstr)
zmq_bind(pData->socket, (char *) pData->connstr);
@@ -174,6 +175,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
pData->connstr = NULL;
pData->bindstr = NULL;
+ pData->pattern = ZMQ_PUSH;
pData->hwmsz = -1;
pData->swapsz = -1;
pData->identstr = NULL;
@@ -242,6 +244,23 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
}
}
+ else if (strcmp(binding, "pattern") == 0)
+ {
+ // Select the sending socket type: "push" (the default) or "pub".
+ if (strcmp(val, "push") == 0) {
+ pData->pattern = ZMQ_PUSH;
+ }
+ else if (strcmp(val, "pub") == 0) {
+ // A publisher is ZMQ_PUB; ZMQ_SUB is the receiving side.
+ pData->pattern = ZMQ_PUB;
+ }
+ else
+ {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS,
+ "error: invalid messaging pattern - use 'push' or 'pub'");
+ ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
+ }
+ }
else
{
errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding);
--
1.7.1
_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com