Hi Martin/ Gonzalo/ Brian
As part of the discussion on the list to provide a c api zmq_device so
that you could do this :-
int main()
{
void * ctx = zmq_init(1,1,0);
void *ear = zmq_socket (ctx, ZMQ_SUB);
zmq_bind (ear, "tcp://lo:5001");
zmq_setsockopt (ear, ZMQ_SUBSCRIBE, "forex.", 6);
void *mouth = zmq_socket (ctx, ZMQ_PUB);
zmq_connect (ear, "tcp://127.0.0.1:5001");
zmq_device (ZMQ_FORWARDER, ear, mouth);
}
Please find attached a first cut of this.
There is also a C++ class called device_t which can be used in the C++
space.
It should apply cleanly to the trunk, but needs a few formatting edits
to get it into zmq style.
(I'm also considering renaming some of the classes as they are a bit long)
(Incidently does any one have a zmq-c-style for emacs?)
It needs testing, so if anyone has a chance, please do.
I am posting it up now so as to get it reviewed.
Thanks
Jon
>From ff1a0355745648a01af45721d04e96b6c0915b9b Mon Sep 17 00:00:00 2001
From: jon <j...@ubik.(none)>
Date: Wed, 31 Mar 2010 01:05:01 +0100
Subject: [PATCH 1/7] initial commit of devices
---
include/zmq.h | 9 ++++
include/zmq.hpp | 22 ++++++++-
src/Makefile.am | 8 +++
src/device_base.cpp | 29 +++++++++++
src/device_base.hpp | 47 ++++++++++++++++++
src/forwarder_device.cpp | 2 +
src/forwarder_device.hpp | 29 +++++++++++
src/queue_device.cpp | 21 ++++++++
src/queue_device.hpp | 117 ++++++++++++++++++++++++++++++++++++++++++++++
src/streamer_device.cpp | 1 +
src/streamer_device.hpp | 18 +++++++
src/zmq.cpp | 11 ++++-
12 files changed, 312 insertions(+), 2 deletions(-)
create mode 100644 src/device_base.cpp
create mode 100644 src/device_base.hpp
create mode 100644 src/forwarder_device.cpp
create mode 100644 src/forwarder_device.hpp
create mode 100644 src/queue_device.cpp
create mode 100644 src/queue_device.hpp
create mode 100644 src/streamer_device.cpp
create mode 100644 src/streamer_device.hpp
diff --git a/include/zmq.h b/include/zmq.h
index a1fcf31..38560a8 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -225,6 +225,15 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
ZMQ_EXPORT int zmq_errno ();
////////////////////////////////////////////////////////////////////////////////
+// Devices - Experimental
+////////////////////////////////////////////////////////////////////////////////
+
+#define ZMQ_STREAMER 0
+#define ZMQ_FORWARDER 1
+#define ZMQ_QUEUE 2
+ZMQ_EXPORT void zmq_device(int device, void * insocket, void* outsocket);
+
+////////////////////////////////////////////////////////////////////////////////
// Helper functions.
////////////////////////////////////////////////////////////////////////////////
diff --git a/include/zmq.hpp b/include/zmq.hpp
index 6228133..f452152 100644
--- a/include/zmq.hpp
+++ b/include/zmq.hpp
@@ -59,7 +59,8 @@ namespace zmq
class message_t : private zmq_msg_t
{
friend class socket_t;
-
+ friend class queue_device;
+ friend class forwarder_device;
public:
inline message_t ()
@@ -254,6 +255,25 @@ namespace zmq
void operator = (const socket_t&);
};
+
+ class device_t
+ {
+ public:
+ inline device_t(socket_t & in_, socket_t & out_, int _type)
+ : type(_type), in(in_), out(out_)
+ {}
+ inline void run()
+ {
+ zmq_device(type, in, out);
+ }
+ private:
+ int type;
+ socket_t & in;
+ socket_t & out;
+ device_t (const device_t&);
+ void operator = (const device_t&);
+ };
+
}
#endif
diff --git a/src/Makefile.am b/src/Makefile.am
index 26106d8..517fabe 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -57,6 +57,7 @@ libzmq_la_SOURCES = app_thread.hpp \
command.hpp \
config.hpp \
decoder.hpp \
+ device_base.hpp \
devpoll.hpp \
dispatcher.hpp \
downstream.hpp \
@@ -92,6 +93,9 @@ libzmq_la_SOURCES = app_thread.hpp \
p2p.hpp \
prefix_tree.hpp \
pub.hpp \
+ queue_device.hpp \
+ forwarder_device.hpp \
+ streamer_device.hpp \
rep.hpp \
req.hpp \
select.hpp \
@@ -123,6 +127,7 @@ libzmq_la_SOURCES = app_thread.hpp \
zmq_listener.hpp \
app_thread.cpp \
command.cpp \
+ device_base.cpp \
devpoll.cpp \
dispatcher.cpp \
downstream.cpp \
@@ -146,6 +151,9 @@ libzmq_la_SOURCES = app_thread.hpp \
pipe.cpp \
poll.cpp \
pub.cpp \
+ queue_device.cpp \
+ forwarder_device.cpp \
+ streamer_device.cpp \
rep.cpp \
req.cpp \
select.cpp \
diff --git a/src/device_base.cpp b/src/device_base.cpp
new file mode 100644
index 0000000..be16e41
--- /dev/null
+++ b/src/device_base.cpp
@@ -0,0 +1,29 @@
+#include <new>
+
+#include "../include/zmq.h"
+
+
+#include "device_base.hpp"
+#include "queue_device.hpp"
+#include "forwarder_device.hpp"
+
+namespace zmq
+{
+
+ device_base_t * create_device(int dtype_,
+ socket_base_t* in_,
+ socket_base_t* out_)
+ {
+ if ( dtype_ == ZMQ_FORWARDER) {
+ return new (std::nothrow) forwarder_device(in_,out_);
+ }
+ else if ( dtype_ == ZMQ_STREAMER ) {
+ return NULL;
+ }
+ else if ( dtype_ == ZMQ_QUEUE ) {
+ return new (std::nothrow) queue_device(in_,out_);
+ }
+ return NULL;
+ }
+
+}
diff --git a/src/device_base.hpp b/src/device_base.hpp
new file mode 100644
index 0000000..c97149e
--- /dev/null
+++ b/src/device_base.hpp
@@ -0,0 +1,47 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_DEVICE_BASE_HPP_INCLUDED__
+#define __ZMQ_DEVICE_BASE_HPP_INCLUDED__
+
+namespace zmq
+{
+ class device_base_t
+ {
+ public:
+ inline device_base_t( class socket_base_t * insocket_,
+ class socket_base_t * outsocket_)
+ : insocket(insocket_)
+ , outsocket(outsocket_)
+ {}
+
+ virtual void run() = 0;
+ protected:
+ class socket_base_t * insocket;
+ class socket_base_t * outsocket;
+ private:
+ device_base_t (const device_base_t &);
+ void operator = (const device_base_t &);
+ };
+
+ device_base_t* create_device(int type_, socket_base_t* in_, socket_base_t* out_);
+
+}
+
+#endif
diff --git a/src/forwarder_device.cpp b/src/forwarder_device.cpp
new file mode 100644
index 0000000..8aac9ea
--- /dev/null
+++ b/src/forwarder_device.cpp
@@ -0,0 +1,2 @@
+#include "forwarder_device.hpp"
+
diff --git a/src/forwarder_device.hpp b/src/forwarder_device.hpp
new file mode 100644
index 0000000..219b9d4
--- /dev/null
+++ b/src/forwarder_device.hpp
@@ -0,0 +1,29 @@
+#include "../include/zmq.hpp"
+#include "device_base.hpp"
+#include "socket_base.hpp"
+
+namespace zmq
+{
+
+ class forwarder_device : public device_base_t
+ {
+ public:
+ forwarder_device(socket_base_t * in_, socket_base_t * out_)
+ : device_base_t(in_,out_)
+ {}
+
+ void run()
+ {
+ zmq::message_t msg;
+ while (true) {
+ insocket->recv (&msg,0);
+ outsocket->send (&msg,0);
+ }
+ }
+
+ private:
+
+ };
+
+
+}
diff --git a/src/queue_device.cpp b/src/queue_device.cpp
new file mode 100644
index 0000000..5b77aa4
--- /dev/null
+++ b/src/queue_device.cpp
@@ -0,0 +1,21 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "queue_device.hpp"
+
diff --git a/src/queue_device.hpp b/src/queue_device.hpp
new file mode 100644
index 0000000..b730c09
--- /dev/null
+++ b/src/queue_device.hpp
@@ -0,0 +1,117 @@
+
+#ifndef __ZMQ_QUEUE_DEVICE_HPP_INCLUDED__
+#define __ZMQ_QUEUE_DEVICE_HPP_INCLUDED__
+
+#include "../include/zmq.hpp"
+#include "device_base.hpp"
+#include "socket_base.hpp"
+
+namespace zmq
+{
+
+ class queue_device : public device_base_t
+ {
+ public:
+
+ queue_device (socket_base_t* reply, socket_base_t* request)
+ : device_base_t(reply,request)
+ {
+ items [0].socket = reply;
+ items [0].fd = 0;
+ items [0].events = ZMQ_POLLIN;
+ items [0].revents = 0;
+
+ items [1].socket = request;
+ items [1].fd = 0;
+ items [1].events = ZMQ_POLLIN;
+ items [1].revents = 0;
+
+ m_next_request_method = &queue_device::get_request;
+ m_next_response_method = &queue_device::get_response;
+ }
+
+ void run()
+ {
+ while (true) {
+ int rc = zmq::poll (&items [0], 2, -1);
+ if (rc < 0)
+ break;
+ next_request();
+ next_response();
+ }
+ }
+
+ private:
+
+ void next_request()
+ {
+ (this->*m_next_request_method) ();
+ }
+
+ void next_response()
+ {
+ (this->*m_next_response_method) ();
+ }
+
+ void get_request()
+ {
+ if (items [0].revents & ZMQ_POLLIN ) {
+ int rc = insocket->recv (&request_msg, ZMQ_NOBLOCK);
+ if (!rc)
+ return;
+ items [0].events &= ~ZMQ_POLLIN;
+ items [1].events |= ZMQ_POLLOUT;
+ m_next_request_method = &queue_device::send_request;
+ }
+ }
+
+ void send_request()
+ {
+ if (items [1].revents & ZMQ_POLLOUT) {
+ int rc = outsocket->send (&request_msg, ZMQ_NOBLOCK);
+ if (!rc) return;
+ items [1].events &= ~ZMQ_POLLOUT;
+ items [0].events |= ZMQ_POLLIN;
+ m_next_request_method = &queue_device::get_request;
+ }
+ }
+
+ void get_response()
+ {
+ if ( items [1].revents & ZMQ_POLLIN ) {
+ int rc = outsocket->recv (&response_msg, ZMQ_NOBLOCK);
+ if (!rc)
+ return;
+ items [1].events &= ~ZMQ_POLLIN;
+ items [0].events |= ZMQ_POLLOUT;
+ m_next_response_method = &queue_device::send_response;
+ }
+ }
+void send_response()
+ {
+ if (items [0].revents & ZMQ_POLLOUT) {
+ int rc = insocket->send (&response_msg, ZMQ_NOBLOCK);
+ if (!rc)
+ return;
+ items [0].events &= ~ZMQ_POLLOUT;
+ items [1].events |= ZMQ_POLLIN;
+ m_next_response_method = &queue_device::get_response;
+ }
+ }
+
+ zmq_pollitem_t items [2];
+ zmq::message_t request_msg;
+ zmq::message_t response_msg;
+
+ typedef void (queue_device::*next_method) ();
+
+ next_method m_next_request_method;
+ next_method m_next_response_method;
+
+ queue_device (queue_device const &);
+ void operator = (queue_device const &);
+ };
+
+}
+
+#endif
diff --git a/src/streamer_device.cpp b/src/streamer_device.cpp
new file mode 100644
index 0000000..b2bc881
--- /dev/null
+++ b/src/streamer_device.cpp
@@ -0,0 +1 @@
+#include "streamer_device.hpp"
diff --git a/src/streamer_device.hpp b/src/streamer_device.hpp
new file mode 100644
index 0000000..2d1bbff
--- /dev/null
+++ b/src/streamer_device.hpp
@@ -0,0 +1,18 @@
+#include "device_base.hpp"
+
+namespace zmq
+{
+
+ class streamer_device : public device_base_t
+ {
+ public:
+ streamer_device(socket_base_t * in_, socket_base_t * out_)
+ : device_base_t(in_,out_)
+ {}
+ void run() {}
+ private:
+
+ };
+
+
+}
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 14898d5..91a3a3a 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -23,7 +23,7 @@
#include <errno.h>
#include <stdlib.h>
#include <new>
-
+#include "device_base.hpp"
#include "socket_base.hpp"
#include "app_thread.hpp"
#include "dispatcher.hpp"
@@ -685,3 +685,12 @@ unsigned long zmq_stopwatch_stop (void *watch_)
free (watch_);
return (unsigned long) (end - start);
}
+
+void zmq_device(int device_type_, void* in_, void* out_)
+{
+
+ zmq::device_base_t * d = create_device(device_type_,
+ (zmq::socket_base_t*)in_,
+ (zmq::socket_base_t*)out_);
+ d->run();
+}
--
1.6.3.3
>From 8cc9da3795f118315db4b4747bb7c556930546b1 Mon Sep 17 00:00:00 2001
From: jon <j...@ubik.(none)>
Date: Thu, 1 Apr 2010 21:05:03 +0100
Subject: [PATCH 2/7] removed use of zmq::message_t in devices, replaced with zmq_msg_t
---
include/zmq.hpp | 2 --
src/device_base.cpp | 11 ++++++-----
src/forwarder_device.hpp | 2 +-
src/queue_device.hpp | 19 ++++++++++---------
src/streamer_device.cpp | 20 ++++++++++++++++++++
src/streamer_device.hpp | 14 ++++++++++++--
6 files changed, 49 insertions(+), 19 deletions(-)
diff --git a/include/zmq.hpp b/include/zmq.hpp
index f452152..53e37cf 100644
--- a/include/zmq.hpp
+++ b/include/zmq.hpp
@@ -59,8 +59,6 @@ namespace zmq
class message_t : private zmq_msg_t
{
friend class socket_t;
- friend class queue_device;
- friend class forwarder_device;
public:
inline message_t ()
diff --git a/src/device_base.cpp b/src/device_base.cpp
index be16e41..bfbe580 100644
--- a/src/device_base.cpp
+++ b/src/device_base.cpp
@@ -6,23 +6,24 @@
#include "device_base.hpp"
#include "queue_device.hpp"
#include "forwarder_device.hpp"
+#include "streamer_device.hpp"
namespace zmq
{
- device_base_t * create_device(int dtype_,
- socket_base_t* in_,
+ device_base_t * create_device(int dtype_, socket_base_t* in_,
socket_base_t* out_)
{
if ( dtype_ == ZMQ_FORWARDER) {
- return new (std::nothrow) forwarder_device(in_,out_);
+ return new (std::nothrow) forwarder_device(in_, out_);
}
else if ( dtype_ == ZMQ_STREAMER ) {
- return NULL;
+ return new (std::nothrow) streamer_device(in_, out_);
}
else if ( dtype_ == ZMQ_QUEUE ) {
- return new (std::nothrow) queue_device(in_,out_);
+ return new (std::nothrow) queue_device(in_, out_);
}
+ // FIXME errno?
return NULL;
}
diff --git a/src/forwarder_device.hpp b/src/forwarder_device.hpp
index 219b9d4..75f75e8 100644
--- a/src/forwarder_device.hpp
+++ b/src/forwarder_device.hpp
@@ -14,7 +14,7 @@ namespace zmq
void run()
{
- zmq::message_t msg;
+ zmq_msg_t msg;
while (true) {
insocket->recv (&msg,0);
outsocket->send (&msg,0);
diff --git a/src/queue_device.hpp b/src/queue_device.hpp
index b730c09..a1fd3aa 100644
--- a/src/queue_device.hpp
+++ b/src/queue_device.hpp
@@ -30,7 +30,7 @@ namespace zmq
m_next_response_method = &queue_device::get_response;
}
- void run()
+ inline void run()
{
while (true) {
int rc = zmq::poll (&items [0], 2, -1);
@@ -43,17 +43,17 @@ namespace zmq
private:
- void next_request()
+ inline void next_request()
{
(this->*m_next_request_method) ();
}
- void next_response()
+ inline void next_response()
{
(this->*m_next_response_method) ();
}
- void get_request()
+ inline void get_request()
{
if (items [0].revents & ZMQ_POLLIN ) {
int rc = insocket->recv (&request_msg, ZMQ_NOBLOCK);
@@ -65,7 +65,7 @@ namespace zmq
}
}
- void send_request()
+ inline void send_request()
{
if (items [1].revents & ZMQ_POLLOUT) {
int rc = outsocket->send (&request_msg, ZMQ_NOBLOCK);
@@ -76,7 +76,7 @@ namespace zmq
}
}
- void get_response()
+ inline void get_response()
{
if ( items [1].revents & ZMQ_POLLIN ) {
int rc = outsocket->recv (&response_msg, ZMQ_NOBLOCK);
@@ -87,7 +87,8 @@ namespace zmq
m_next_response_method = &queue_device::send_response;
}
}
-void send_response()
+
+ inline void send_response()
{
if (items [0].revents & ZMQ_POLLOUT) {
int rc = insocket->send (&response_msg, ZMQ_NOBLOCK);
@@ -100,8 +101,8 @@ void send_response()
}
zmq_pollitem_t items [2];
- zmq::message_t request_msg;
- zmq::message_t response_msg;
+ zmq_msg_t request_msg;
+ zmq_msg_t response_msg;
typedef void (queue_device::*next_method) ();
diff --git a/src/streamer_device.cpp b/src/streamer_device.cpp
index b2bc881..1a375fb 100644
--- a/src/streamer_device.cpp
+++ b/src/streamer_device.cpp
@@ -1 +1,21 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
#include "streamer_device.hpp"
+
diff --git a/src/streamer_device.hpp b/src/streamer_device.hpp
index 2d1bbff..44d6415 100644
--- a/src/streamer_device.hpp
+++ b/src/streamer_device.hpp
@@ -1,4 +1,6 @@
+#include "../include/zmq.hpp"
#include "device_base.hpp"
+#include "socket_base.hpp"
namespace zmq
{
@@ -9,9 +11,17 @@ namespace zmq
streamer_device(socket_base_t * in_, socket_base_t * out_)
: device_base_t(in_,out_)
{}
- void run() {}
+ void run()
+ {
+ zmq_msg_t msg;
+ while (true) {
+ insocket->recv (&msg,0);
+ outsocket->send (&msg,0);
+ }
+ }
private:
-
+ streamer_device(streamer_device const &);
+ void operator = (streamer_device const &);
};
--
1.6.3.3
>From 2459e4bea8c2c4c9f13dec651d0eec54f47d29d4 Mon Sep 17 00:00:00 2001
From: jon <j...@ubik.(none)>
Date: Thu, 1 Apr 2010 21:50:34 +0100
Subject: [PATCH 3/7] added copyright and ifdef guards to device header file
---
.gitignore | 1 +
src/Makefile.am | 3 --
src/device_base.cpp | 27 +++++++++++++++++++----
src/forwarder_device.hpp | 37 +++++++++++++++++++++++++++++---
src/queue_device.hpp | 52 +++++++++++++++++++++++++++++++++++----------
src/streamer_device.hpp | 40 +++++++++++++++++++++++++++++------
6 files changed, 129 insertions(+), 31 deletions(-)
diff --git a/.gitignore b/.gitignore
index 77384d6..80a7a1c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,3 +43,4 @@ builds/msvc/*/Release
foreign/openpgm/*
!foreign/openpgm/*.tar.bz2
!foreign/openpgm/*.tar.gz
+#*#
diff --git a/src/Makefile.am b/src/Makefile.am
index 517fabe..0002cd6 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -151,9 +151,6 @@ libzmq_la_SOURCES = app_thread.hpp \
pipe.cpp \
poll.cpp \
pub.cpp \
- queue_device.cpp \
- forwarder_device.cpp \
- streamer_device.cpp \
rep.cpp \
req.cpp \
select.cpp \
diff --git a/src/device_base.cpp b/src/device_base.cpp
index bfbe580..804a519 100644
--- a/src/device_base.cpp
+++ b/src/device_base.cpp
@@ -1,8 +1,25 @@
-#include <new>
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
-#include "../include/zmq.h"
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+#include <new>
+
+#include "../include/zmq.h"
#include "device_base.hpp"
#include "queue_device.hpp"
#include "forwarder_device.hpp"
@@ -15,13 +32,13 @@ namespace zmq
socket_base_t* out_)
{
if ( dtype_ == ZMQ_FORWARDER) {
- return new (std::nothrow) forwarder_device(in_, out_);
+ return new (std::nothrow) forwarder_device_t(in_, out_);
}
else if ( dtype_ == ZMQ_STREAMER ) {
- return new (std::nothrow) streamer_device(in_, out_);
+ return new (std::nothrow) streamer_device_t(in_, out_);
}
else if ( dtype_ == ZMQ_QUEUE ) {
- return new (std::nothrow) queue_device(in_, out_);
+ return new (std::nothrow) queue_device_t(in_, out_);
}
// FIXME errno?
return NULL;
diff --git a/src/forwarder_device.hpp b/src/forwarder_device.hpp
index 75f75e8..63d9358 100644
--- a/src/forwarder_device.hpp
+++ b/src/forwarder_device.hpp
@@ -1,3 +1,25 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_FORWARDER_DEVICE_HPP_INCLUDED__
+#define __ZMQ_FORWARDER_DEVICE_HPP_INCLUDED__
+
#include "../include/zmq.hpp"
#include "device_base.hpp"
#include "socket_base.hpp"
@@ -5,25 +27,32 @@
namespace zmq
{
- class forwarder_device : public device_base_t
+ class forwarder_device_t : public device_base_t
{
public:
- forwarder_device(socket_base_t * in_, socket_base_t * out_)
+ forwarder_device_t(socket_base_t * in_, socket_base_t * out_)
: device_base_t(in_,out_)
{}
- void run()
+ virtual void run()
{
zmq_msg_t msg;
+ int rc = zmq_msg_init(&msg);
+ if (!rc)
+ return;
while (true) {
insocket->recv (&msg,0);
outsocket->send (&msg,0);
}
+ zmq_msg_close(&msg);
}
private:
-
+ forwarder_device_t(forwarder_device_t const &);
+ void operator = (forwarder_device_t const&);
};
}
+
+#endif
diff --git a/src/queue_device.hpp b/src/queue_device.hpp
index a1fd3aa..ab83b39 100644
--- a/src/queue_device.hpp
+++ b/src/queue_device.hpp
@@ -1,3 +1,21 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
#ifndef __ZMQ_QUEUE_DEVICE_HPP_INCLUDED__
#define __ZMQ_QUEUE_DEVICE_HPP_INCLUDED__
@@ -9,11 +27,11 @@
namespace zmq
{
- class queue_device : public device_base_t
+ class queue_device_t : public device_base_t
{
public:
- queue_device (socket_base_t* reply, socket_base_t* request)
+ queue_device_t (socket_base_t* reply, socket_base_t* request)
: device_base_t(reply,request)
{
items [0].socket = reply;
@@ -26,12 +44,19 @@ namespace zmq
items [1].events = ZMQ_POLLIN;
items [1].revents = 0;
- m_next_request_method = &queue_device::get_request;
- m_next_response_method = &queue_device::get_response;
+ m_next_request_method = &queue_device_t::get_request;
+ m_next_response_method = &queue_device_t::get_response;
}
- inline void run()
+ virtual void run()
{
+ int rc = zmq_msg_init(&request_msg);
+ if (!rc)
+ return;
+ rc = zmq_msg_init(&response_msg);
+ if (!rc)
+ return;
+
while (true) {
int rc = zmq::poll (&items [0], 2, -1);
if (rc < 0)
@@ -39,6 +64,9 @@ namespace zmq
next_request();
next_response();
}
+ zmq_msg_close(&request_msg);
+ zmq_msg_close(&response_msg);
+
}
private:
@@ -61,7 +89,7 @@ namespace zmq
return;
items [0].events &= ~ZMQ_POLLIN;
items [1].events |= ZMQ_POLLOUT;
- m_next_request_method = &queue_device::send_request;
+ m_next_request_method = &queue_device_t::send_request;
}
}
@@ -72,7 +100,7 @@ namespace zmq
if (!rc) return;
items [1].events &= ~ZMQ_POLLOUT;
items [0].events |= ZMQ_POLLIN;
- m_next_request_method = &queue_device::get_request;
+ m_next_request_method = &queue_device_t::get_request;
}
}
@@ -84,7 +112,7 @@ namespace zmq
return;
items [1].events &= ~ZMQ_POLLIN;
items [0].events |= ZMQ_POLLOUT;
- m_next_response_method = &queue_device::send_response;
+ m_next_response_method = &queue_device_t::send_response;
}
}
@@ -96,7 +124,7 @@ namespace zmq
return;
items [0].events &= ~ZMQ_POLLOUT;
items [1].events |= ZMQ_POLLIN;
- m_next_response_method = &queue_device::get_response;
+ m_next_response_method = &queue_device_t::get_response;
}
}
@@ -104,13 +132,13 @@ namespace zmq
zmq_msg_t request_msg;
zmq_msg_t response_msg;
- typedef void (queue_device::*next_method) ();
+ typedef void (queue_device_t::*next_method) ();
next_method m_next_request_method;
next_method m_next_response_method;
- queue_device (queue_device const &);
- void operator = (queue_device const &);
+ queue_device_t (queue_device_t const &);
+ void operator = (queue_device_t const &);
};
}
diff --git a/src/streamer_device.hpp b/src/streamer_device.hpp
index 44d6415..c5c1633 100644
--- a/src/streamer_device.hpp
+++ b/src/streamer_device.hpp
@@ -1,3 +1,25 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_STREAMER_DEVICE_HPP_INCLUDED__
+#define __ZMQ_STREAMER_DEVICE_HPP_INCLUDED__
+
#include "../include/zmq.hpp"
#include "device_base.hpp"
#include "socket_base.hpp"
@@ -5,24 +27,28 @@
namespace zmq
{
- class streamer_device : public device_base_t
+ class streamer_device_t : public device_base_t
{
public:
- streamer_device(socket_base_t * in_, socket_base_t * out_)
+ streamer_device_t(socket_base_t * in_, socket_base_t * out_)
: device_base_t(in_,out_)
{}
- void run()
+ virtual void run()
{
zmq_msg_t msg;
+ int rc = zmq_msg_init(&msg);
+ if (!rc)
+ return;
while (true) {
insocket->recv (&msg,0);
outsocket->send (&msg,0);
}
+ zmq_msg_close(&msg);
}
private:
- streamer_device(streamer_device const &);
- void operator = (streamer_device const &);
+ streamer_device_t(streamer_device_t const &);
+ void operator = (streamer_device_t const &);
};
-
-
}
+
+#endif
--
1.6.3.3
>From b9f3cb9fe4d9424fcf8430fd37db8df581c3621c Mon Sep 17 00:00:00 2001
From: jon <j...@ubik.(none)>
Date: Thu, 1 Apr 2010 22:01:11 +0100
Subject: [PATCH 4/7] alphabetical order in automake file
---
src/Makefile.am | 4 ++--
src/forwarder_device.cpp | 2 --
src/queue_device.cpp | 21 ---------------------
src/streamer_device.cpp | 21 ---------------------
4 files changed, 2 insertions(+), 46 deletions(-)
delete mode 100644 src/forwarder_device.cpp
delete mode 100644 src/queue_device.cpp
delete mode 100644 src/streamer_device.cpp
diff --git a/src/Makefile.am b/src/Makefile.am
index 0002cd6..dfc5185 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -66,6 +66,7 @@ libzmq_la_SOURCES = app_thread.hpp \
err.hpp \
fd.hpp \
fd_signaler.hpp \
+ forwarder_device.hpp \
fq.hpp \
i_inout.hpp \
io_object.hpp \
@@ -94,8 +95,6 @@ libzmq_la_SOURCES = app_thread.hpp \
prefix_tree.hpp \
pub.hpp \
queue_device.hpp \
- forwarder_device.hpp \
- streamer_device.hpp \
rep.hpp \
req.hpp \
select.hpp \
@@ -103,6 +102,7 @@ libzmq_la_SOURCES = app_thread.hpp \
simple_semaphore.hpp \
socket_base.hpp \
stdint.hpp \
+ streamer_device.hpp \
sub.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
diff --git a/src/forwarder_device.cpp b/src/forwarder_device.cpp
deleted file mode 100644
index 8aac9ea..0000000
--- a/src/forwarder_device.cpp
+++ /dev/null
@@ -1,2 +0,0 @@
-#include "forwarder_device.hpp"
-
diff --git a/src/queue_device.cpp b/src/queue_device.cpp
deleted file mode 100644
index 5b77aa4..0000000
--- a/src/queue_device.cpp
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "queue_device.hpp"
-
diff --git a/src/streamer_device.cpp b/src/streamer_device.cpp
deleted file mode 100644
index 1a375fb..0000000
--- a/src/streamer_device.cpp
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "streamer_device.hpp"
-
--
1.6.3.3
>From c92a40bbf34f12ae576b40d975469c02d391c968 Mon Sep 17 00:00:00 2001
From: jon <j...@ubik.(none)>
Date: Thu, 1 Apr 2010 22:10:49 +0100
Subject: [PATCH 5/7] make prebuilt devices use the new zmq::device_t
---
devices/zmq_forwarder/zmq_forwarder.cpp | 9 +--
devices/zmq_queue/zmq_queue.cpp | 112 +------------------------------
devices/zmq_streamer/zmq_streamer.cpp | 7 +--
3 files changed, 9 insertions(+), 119 deletions(-)
diff --git a/devices/zmq_forwarder/zmq_forwarder.cpp b/devices/zmq_forwarder/zmq_forwarder.cpp
index 092bc47..310a1d3 100644
--- a/devices/zmq_forwarder/zmq_forwarder.cpp
+++ b/devices/zmq_forwarder/zmq_forwarder.cpp
@@ -113,11 +113,10 @@ int main (int argc, char *argv [])
n++;
}
- zmq::message_t msg;
- while (true) {
- in_socket.recv (&msg);
- out_socket.send (msg);
- }
+
+ zmq::device_t dev (in_socket, out_socket, ZMQ_FORWARDER);
+
+ dev.run();
return 0;
}
diff --git a/devices/zmq_queue/zmq_queue.cpp b/devices/zmq_queue/zmq_queue.cpp
index 5eae750..deaa073 100644
--- a/devices/zmq_queue/zmq_queue.cpp
+++ b/devices/zmq_queue/zmq_queue.cpp
@@ -20,113 +20,6 @@
#include "../../include/zmq.hpp"
#include "../../foreign/xmlParser/xmlParser.cpp"
-class queue
-{
-public:
-
- queue (zmq::socket_t& reply, zmq::socket_t& request) :
- xrep (reply),
- xreq (request)
- {
- items [0].socket = reply;
- items [0].fd = 0;
- items [0].events = ZMQ_POLLIN;
- items [0].revents = 0;
-
- items [1].socket = request;
- items [1].fd = 0;
- items [1].events = ZMQ_POLLIN;
- items [1].revents = 0;
-
- m_next_request_method = &queue::get_request;
- m_next_response_method = &queue::get_response;
- }
-
- void run()
- {
- while (true) {
- int rc = zmq::poll (&items [0], 2, -1);
- if (rc < 0)
- break;
- next_request();
- next_response();
- }
- }
-
-private:
-
- void next_request()
- {
- (this->*m_next_request_method) ();
- }
-
- void next_response()
- {
- (this->*m_next_response_method) ();
- }
-
- void get_request()
- {
- if (items [0].revents & ZMQ_POLLIN ) {
- int rc = xrep.recv (&request_msg, ZMQ_NOBLOCK);
- if (!rc)
- return;
- items [0].events &= ~ZMQ_POLLIN;
- items [1].events |= ZMQ_POLLOUT;
- m_next_request_method = &queue::send_request;
- }
- }
-
- void send_request()
- {
- if (items [1].revents & ZMQ_POLLOUT) {
- int rc = xreq.send (request_msg, ZMQ_NOBLOCK);
- if (!rc) return;
- items [1].events &= ~ZMQ_POLLOUT;
- items [0].events |= ZMQ_POLLIN;
- m_next_request_method = &queue::get_request;
- }
- }
-
- void get_response()
- {
- if ( items [1].revents & ZMQ_POLLIN ) {
- int rc = xreq.recv (&response_msg, ZMQ_NOBLOCK);
- if (!rc)
- return;
- items [1].events &= ~ZMQ_POLLIN;
- items [0].events |= ZMQ_POLLOUT;
- m_next_response_method = &queue::send_response;
- }
- }
-
- void send_response()
- {
- if (items [0].revents & ZMQ_POLLOUT) {
- int rc = xrep.send (response_msg, ZMQ_NOBLOCK);
- if (!rc)
- return;
- items [0].events &= ~ZMQ_POLLOUT;
- items [1].events |= ZMQ_POLLIN;
- m_next_response_method = &queue::get_response;
- }
- }
-
- zmq::socket_t & xrep;
- zmq::socket_t & xreq;
- zmq_pollitem_t items [2];
- zmq::message_t request_msg;
- zmq::message_t response_msg;
-
- typedef void (queue::*next_method) ();
-
- next_method m_next_request_method;
- next_method m_next_response_method;
-
- queue (queue const &);
- void operator = (queue const &);
-};
-
int main (int argc, char *argv [])
{
if (argc != 2) {
@@ -219,8 +112,9 @@ int main (int argc, char *argv [])
n++;
}
- queue q(in_socket, out_socket);
- q.run();
+ zmq::device_t dev (in_socket, out_socket, ZMQ_QUEUE);
+
+ dev.run();
return 0;
}
diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp
index 6eccedf..84431b0 100644
--- a/devices/zmq_streamer/zmq_streamer.cpp
+++ b/devices/zmq_streamer/zmq_streamer.cpp
@@ -112,11 +112,8 @@ int main (int argc, char *argv [])
n++;
}
- zmq::message_t msg;
- while (true) {
- in_socket.recv (&msg);
- out_socket.send (msg);
- }
+ zmq::device_t dev (in_socket, out_socket, ZMQ_FORWARDER);
+ dev.run();
return 0;
}
--
1.6.3.3
>From 667275e36c4314d0b7308d65bc9ac92df143f4fc Mon Sep 17 00:00:00 2001
From: jon <j...@ubik.(none)>
Date: Thu, 1 Apr 2010 22:13:06 +0100
Subject: [PATCH 6/7] fix ignore file
---
.gitignore | 1 -
1 files changed, 0 insertions(+), 1 deletions(-)
diff --git a/.gitignore b/.gitignore
index 80a7a1c..77384d6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,4 +43,3 @@ builds/msvc/*/Release
foreign/openpgm/*
!foreign/openpgm/*.tar.bz2
!foreign/openpgm/*.tar.gz
-#*#
--
1.6.3.3
>From 9429146a833253afabad51f93263f2cd07bfe34e Mon Sep 17 00:00:00 2001
From: jon <j...@ubik.(none)>
Date: Thu, 1 Apr 2010 22:16:34 +0100
Subject: [PATCH 7/7] fixed typo
---
devices/zmq_streamer/zmq_streamer.cpp | 2 +-
1 files changed, 1 insertions(+), 1 deletions(-)
diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp
index 84431b0..bca6a05 100644
--- a/devices/zmq_streamer/zmq_streamer.cpp
+++ b/devices/zmq_streamer/zmq_streamer.cpp
@@ -112,7 +112,7 @@ int main (int argc, char *argv [])
n++;
}
- zmq::device_t dev (in_socket, out_socket, ZMQ_FORWARDER);
+ zmq::device_t dev (in_socket, out_socket, ZMQ_STREAMER);
dev.run();
return 0;
--
1.6.3.3
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev