Hi Martin/ Gonzalo/ Brian

As part of the discussion on the list to provide a c api zmq_device so that you could do this :-

int main()
{
 void * ctx = zmq_init(1,1,0);
 void *ear = zmq_socket (ctx, ZMQ_SUB);
 zmq_bind (ear, "tcp://lo:5001");
 zmq_setsockopt (ear, ZMQ_SUBSCRIBE, "forex.", 6);
 void *mouth = zmq_socket (ctx, ZMQ_PUB);
 zmq_connect (ear, "tcp://127.0.0.1:5001");
 zmq_device (ZMQ_FORWARDER, ear, mouth);
}

Please find attached a first cut of this.

There is also a C++ class called device_t which can be used in the C++ space.

It should apply cleanly to the trunk, but needs a few formatting edits to get it into zmq style.
(I'm also considering renaming some of the classes as they are a bit long)
(Incidently does any one have a zmq-c-style for emacs?)

It needs testing, so if anyone has a chance, please do.
I am posting it up now so as to get it reviewed.

Thanks

Jon

>From ff1a0355745648a01af45721d04e96b6c0915b9b Mon Sep 17 00:00:00 2001
From: jon <j...@ubik.(none)>
Date: Wed, 31 Mar 2010 01:05:01 +0100
Subject: [PATCH 1/7] initial commit of devices

---
 include/zmq.h            |    9 ++++
 include/zmq.hpp          |   22 ++++++++-
 src/Makefile.am          |    8 +++
 src/device_base.cpp      |   29 +++++++++++
 src/device_base.hpp      |   47 ++++++++++++++++++
 src/forwarder_device.cpp |    2 +
 src/forwarder_device.hpp |   29 +++++++++++
 src/queue_device.cpp     |   21 ++++++++
 src/queue_device.hpp     |  117 ++++++++++++++++++++++++++++++++++++++++++++++
 src/streamer_device.cpp  |    1 +
 src/streamer_device.hpp  |   18 +++++++
 src/zmq.cpp              |   11 ++++-
 12 files changed, 312 insertions(+), 2 deletions(-)
 create mode 100644 src/device_base.cpp
 create mode 100644 src/device_base.hpp
 create mode 100644 src/forwarder_device.cpp
 create mode 100644 src/forwarder_device.hpp
 create mode 100644 src/queue_device.cpp
 create mode 100644 src/queue_device.hpp
 create mode 100644 src/streamer_device.cpp
 create mode 100644 src/streamer_device.hpp

diff --git a/include/zmq.h b/include/zmq.h
index a1fcf31..38560a8 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -225,6 +225,15 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
 ZMQ_EXPORT int zmq_errno ();
 
 ////////////////////////////////////////////////////////////////////////////////
+//  Devices - Experimental
+////////////////////////////////////////////////////////////////////////////////
+
+#define ZMQ_STREAMER 0
+#define ZMQ_FORWARDER 1
+#define ZMQ_QUEUE 2
+ZMQ_EXPORT void zmq_device(int device, void * insocket, void* outsocket);
+
+////////////////////////////////////////////////////////////////////////////////
 //  Helper functions.
 ////////////////////////////////////////////////////////////////////////////////
 
diff --git a/include/zmq.hpp b/include/zmq.hpp
index 6228133..f452152 100644
--- a/include/zmq.hpp
+++ b/include/zmq.hpp
@@ -59,7 +59,8 @@ namespace zmq
     class message_t : private zmq_msg_t
     {
         friend class socket_t;
-
+      friend class queue_device;
+      friend class forwarder_device;
     public:
 
         inline message_t ()
@@ -254,6 +255,25 @@ namespace zmq
         void operator = (const socket_t&);
     };
 
+
+  class device_t
+  {
+  public:
+    inline device_t(socket_t & in_, socket_t & out_, int _type)
+      : type(_type), in(in_), out(out_)
+    {}
+    inline void run()
+    {
+      zmq_device(type, in, out);
+    }
+  private:
+    int type;
+    socket_t & in;
+    socket_t & out;
+    device_t (const device_t&);
+    void operator = (const device_t&);
+  };
+
 }
 
 #endif
diff --git a/src/Makefile.am b/src/Makefile.am
index 26106d8..517fabe 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -57,6 +57,7 @@ libzmq_la_SOURCES = app_thread.hpp \
     command.hpp \
     config.hpp \
     decoder.hpp \
+    device_base.hpp \
     devpoll.hpp \
     dispatcher.hpp \
     downstream.hpp \
@@ -92,6 +93,9 @@ libzmq_la_SOURCES = app_thread.hpp \
     p2p.hpp \
     prefix_tree.hpp \
     pub.hpp \
+    queue_device.hpp \
+    forwarder_device.hpp \
+    streamer_device.hpp \
     rep.hpp \
     req.hpp \
     select.hpp \
@@ -123,6 +127,7 @@ libzmq_la_SOURCES = app_thread.hpp \
     zmq_listener.hpp \
     app_thread.cpp \
     command.cpp \
+    device_base.cpp \
     devpoll.cpp \
     dispatcher.cpp \
     downstream.cpp \
@@ -146,6 +151,9 @@ libzmq_la_SOURCES = app_thread.hpp \
     pipe.cpp \
     poll.cpp \
     pub.cpp \
+    queue_device.cpp \
+    forwarder_device.cpp \
+    streamer_device.cpp \
     rep.cpp \
     req.cpp \
     select.cpp \
diff --git a/src/device_base.cpp b/src/device_base.cpp
new file mode 100644
index 0000000..be16e41
--- /dev/null
+++ b/src/device_base.cpp
@@ -0,0 +1,29 @@
+#include <new>
+
+#include "../include/zmq.h"
+
+
+#include "device_base.hpp"
+#include "queue_device.hpp"
+#include "forwarder_device.hpp"
+
+namespace zmq
+{
+  
+  device_base_t * create_device(int dtype_, 
+				socket_base_t* in_,
+				socket_base_t* out_)
+  {
+    if ( dtype_ == ZMQ_FORWARDER) {
+      return new (std::nothrow) forwarder_device(in_,out_);
+    }
+    else if ( dtype_ == ZMQ_STREAMER ) {
+      return NULL;
+    }
+    else if ( dtype_ == ZMQ_QUEUE ) {
+      return new (std::nothrow) queue_device(in_,out_);
+    }
+    return NULL;
+  }
+
+}
diff --git a/src/device_base.hpp b/src/device_base.hpp
new file mode 100644
index 0000000..c97149e
--- /dev/null
+++ b/src/device_base.hpp
@@ -0,0 +1,47 @@
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_DEVICE_BASE_HPP_INCLUDED__
+#define __ZMQ_DEVICE_BASE_HPP_INCLUDED__
+
+namespace zmq
+{
+  class device_base_t
+  {
+  public:
+    inline device_base_t( class socket_base_t * insocket_, 
+			  class socket_base_t * outsocket_)
+      : insocket(insocket_)
+      , outsocket(outsocket_)
+    {}
+
+    virtual void run() = 0;
+  protected:
+    class socket_base_t * insocket;
+    class socket_base_t * outsocket;
+  private:
+    device_base_t (const device_base_t &);
+    void operator = (const device_base_t &);
+  };
+  
+  device_base_t* create_device(int type_, socket_base_t* in_, socket_base_t* out_);
+  
+}
+
+#endif
diff --git a/src/forwarder_device.cpp b/src/forwarder_device.cpp
new file mode 100644
index 0000000..8aac9ea
--- /dev/null
+++ b/src/forwarder_device.cpp
@@ -0,0 +1,2 @@
+#include "forwarder_device.hpp"
+
diff --git a/src/forwarder_device.hpp b/src/forwarder_device.hpp
new file mode 100644
index 0000000..219b9d4
--- /dev/null
+++ b/src/forwarder_device.hpp
@@ -0,0 +1,29 @@
+#include "../include/zmq.hpp"
+#include "device_base.hpp"
+#include "socket_base.hpp"
+
+namespace zmq
+{
+  
+  class forwarder_device : public device_base_t
+  {
+  public:
+    forwarder_device(socket_base_t * in_, socket_base_t * out_)
+      : device_base_t(in_,out_)
+    {}
+
+    void run() 
+    {
+      zmq::message_t msg;
+      while (true) {
+        insocket->recv (&msg,0);
+        outsocket->send (&msg,0);
+      }
+    }
+    
+  private:
+
+  };
+
+
+}
diff --git a/src/queue_device.cpp b/src/queue_device.cpp
new file mode 100644
index 0000000..5b77aa4
--- /dev/null
+++ b/src/queue_device.cpp
@@ -0,0 +1,21 @@
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "queue_device.hpp"
+
diff --git a/src/queue_device.hpp b/src/queue_device.hpp
new file mode 100644
index 0000000..b730c09
--- /dev/null
+++ b/src/queue_device.hpp
@@ -0,0 +1,117 @@
+
+#ifndef __ZMQ_QUEUE_DEVICE_HPP_INCLUDED__
+#define __ZMQ_QUEUE_DEVICE_HPP_INCLUDED__
+
+#include "../include/zmq.hpp"
+#include "device_base.hpp"
+#include "socket_base.hpp"
+
+namespace zmq
+{
+
+  class queue_device : public device_base_t
+  {
+  public:
+
+    queue_device (socket_base_t* reply, socket_base_t* request) 
+      : device_base_t(reply,request)
+    {
+      items [0].socket = reply;
+      items [0].fd = 0;
+      items [0].events = ZMQ_POLLIN;
+      items [0].revents = 0;
+
+      items [1].socket = request;
+      items [1].fd = 0;
+      items [1].events = ZMQ_POLLIN;
+      items [1].revents = 0;
+
+      m_next_request_method = &queue_device::get_request;
+      m_next_response_method = &queue_device::get_response;
+    }
+
+    void run()
+    {
+      while (true) {
+	int rc = zmq::poll (&items [0], 2, -1);
+	if (rc < 0)
+	  break;
+	next_request();
+	next_response();
+      }
+    }
+
+  private:
+
+    void next_request()
+    {
+      (this->*m_next_request_method) ();
+    }
+
+    void next_response()
+    {
+      (this->*m_next_response_method) ();
+    }
+
+    void get_request()
+    {
+      if (items [0].revents & ZMQ_POLLIN ) {
+	int rc = insocket->recv (&request_msg, ZMQ_NOBLOCK);
+	if (!rc)
+	  return;
+	items [0].events &= ~ZMQ_POLLIN;
+	items [1].events |= ZMQ_POLLOUT;
+	m_next_request_method = &queue_device::send_request;
+      }
+    }
+
+    void send_request()
+    {
+      if (items [1].revents & ZMQ_POLLOUT) {
+        int rc = outsocket->send (&request_msg, ZMQ_NOBLOCK);
+        if (!rc) return;
+        items [1].events &= ~ZMQ_POLLOUT;
+        items [0].events |= ZMQ_POLLIN;
+        m_next_request_method = &queue_device::get_request;
+      }
+    }
+
+    void get_response()
+    {
+      if ( items [1].revents & ZMQ_POLLIN ) {
+	int rc = outsocket->recv (&response_msg, ZMQ_NOBLOCK);
+	if (!rc)
+	  return;
+	items [1].events &= ~ZMQ_POLLIN;
+	items [0].events |= ZMQ_POLLOUT;
+	m_next_response_method = &queue_device::send_response;
+      }
+    }
+void send_response()
+    {
+      if (items [0].revents & ZMQ_POLLOUT) {
+	int rc = insocket->send (&response_msg, ZMQ_NOBLOCK);
+	if (!rc)
+	  return;
+	items [0].events &= ~ZMQ_POLLOUT;
+	items [1].events |= ZMQ_POLLIN;
+	m_next_response_method = &queue_device::get_response;
+      }
+    }
+
+    zmq_pollitem_t items [2];
+    zmq::message_t request_msg;
+    zmq::message_t response_msg;
+
+    typedef void (queue_device::*next_method) ();
+
+    next_method m_next_request_method;
+    next_method m_next_response_method;
+
+    queue_device (queue_device const &);
+    void operator = (queue_device const &);
+  };
+
+}
+
+#endif
diff --git a/src/streamer_device.cpp b/src/streamer_device.cpp
new file mode 100644
index 0000000..b2bc881
--- /dev/null
+++ b/src/streamer_device.cpp
@@ -0,0 +1 @@
+#include "streamer_device.hpp"
diff --git a/src/streamer_device.hpp b/src/streamer_device.hpp
new file mode 100644
index 0000000..2d1bbff
--- /dev/null
+++ b/src/streamer_device.hpp
@@ -0,0 +1,18 @@
+#include "device_base.hpp"
+
+namespace zmq
+{
+  
+  class streamer_device : public device_base_t
+  {
+  public:
+    streamer_device(socket_base_t * in_, socket_base_t * out_)
+      : device_base_t(in_,out_)
+    {}
+    void run() {}
+  private:
+
+  };
+
+
+}
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 14898d5..91a3a3a 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -23,7 +23,7 @@
 #include <errno.h>
 #include <stdlib.h>
 #include <new>
-
+#include "device_base.hpp"
 #include "socket_base.hpp"
 #include "app_thread.hpp"
 #include "dispatcher.hpp"
@@ -685,3 +685,12 @@ unsigned long zmq_stopwatch_stop (void *watch_)
     free (watch_);
     return (unsigned long) (end - start);
 }
+
+void zmq_device(int device_type_, void* in_, void* out_)
+{
+  
+  zmq::device_base_t * d = create_device(device_type_,
+				     (zmq::socket_base_t*)in_, 
+				     (zmq::socket_base_t*)out_);
+  d->run();
+}
-- 
1.6.3.3

>From 8cc9da3795f118315db4b4747bb7c556930546b1 Mon Sep 17 00:00:00 2001
From: jon <j...@ubik.(none)>
Date: Thu, 1 Apr 2010 21:05:03 +0100
Subject: [PATCH 2/7] removed use of zmq::message_t in devices, replaced with zmq_msg_t

---
 include/zmq.hpp          |    2 --
 src/device_base.cpp      |   11 ++++++-----
 src/forwarder_device.hpp |    2 +-
 src/queue_device.hpp     |   19 ++++++++++---------
 src/streamer_device.cpp  |   20 ++++++++++++++++++++
 src/streamer_device.hpp  |   14 ++++++++++++--
 6 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/include/zmq.hpp b/include/zmq.hpp
index f452152..53e37cf 100644
--- a/include/zmq.hpp
+++ b/include/zmq.hpp
@@ -59,8 +59,6 @@ namespace zmq
     class message_t : private zmq_msg_t
     {
         friend class socket_t;
-      friend class queue_device;
-      friend class forwarder_device;
     public:
 
         inline message_t ()
diff --git a/src/device_base.cpp b/src/device_base.cpp
index be16e41..bfbe580 100644
--- a/src/device_base.cpp
+++ b/src/device_base.cpp
@@ -6,23 +6,24 @@
 #include "device_base.hpp"
 #include "queue_device.hpp"
 #include "forwarder_device.hpp"
+#include "streamer_device.hpp"
 
 namespace zmq
 {
   
-  device_base_t * create_device(int dtype_, 
-				socket_base_t* in_,
+  device_base_t * create_device(int dtype_, socket_base_t* in_,
 				socket_base_t* out_)
   {
     if ( dtype_ == ZMQ_FORWARDER) {
-      return new (std::nothrow) forwarder_device(in_,out_);
+      return new (std::nothrow) forwarder_device(in_, out_);
     }
     else if ( dtype_ == ZMQ_STREAMER ) {
-      return NULL;
+      return new (std::nothrow) streamer_device(in_, out_);
     }
     else if ( dtype_ == ZMQ_QUEUE ) {
-      return new (std::nothrow) queue_device(in_,out_);
+      return new (std::nothrow) queue_device(in_, out_);
     }
+    // FIXME errno?
     return NULL;
   }
 
diff --git a/src/forwarder_device.hpp b/src/forwarder_device.hpp
index 219b9d4..75f75e8 100644
--- a/src/forwarder_device.hpp
+++ b/src/forwarder_device.hpp
@@ -14,7 +14,7 @@ namespace zmq
 
     void run() 
     {
-      zmq::message_t msg;
+      zmq_msg_t msg;
       while (true) {
         insocket->recv (&msg,0);
         outsocket->send (&msg,0);
diff --git a/src/queue_device.hpp b/src/queue_device.hpp
index b730c09..a1fd3aa 100644
--- a/src/queue_device.hpp
+++ b/src/queue_device.hpp
@@ -30,7 +30,7 @@ namespace zmq
       m_next_response_method = &queue_device::get_response;
     }
 
-    void run()
+    inline void run()
     {
       while (true) {
 	int rc = zmq::poll (&items [0], 2, -1);
@@ -43,17 +43,17 @@ namespace zmq
 
   private:
 
-    void next_request()
+    inline void next_request()
     {
       (this->*m_next_request_method) ();
     }
 
-    void next_response()
+    inline void next_response()
     {
       (this->*m_next_response_method) ();
     }
 
-    void get_request()
+    inline void get_request()
     {
       if (items [0].revents & ZMQ_POLLIN ) {
 	int rc = insocket->recv (&request_msg, ZMQ_NOBLOCK);
@@ -65,7 +65,7 @@ namespace zmq
       }
     }
 
-    void send_request()
+    inline void send_request()
     {
       if (items [1].revents & ZMQ_POLLOUT) {
         int rc = outsocket->send (&request_msg, ZMQ_NOBLOCK);
@@ -76,7 +76,7 @@ namespace zmq
       }
     }
 
-    void get_response()
+    inline void get_response()
     {
       if ( items [1].revents & ZMQ_POLLIN ) {
 	int rc = outsocket->recv (&response_msg, ZMQ_NOBLOCK);
@@ -87,7 +87,8 @@ namespace zmq
 	m_next_response_method = &queue_device::send_response;
       }
     }
-void send_response()
+
+    inline void send_response()
     {
       if (items [0].revents & ZMQ_POLLOUT) {
 	int rc = insocket->send (&response_msg, ZMQ_NOBLOCK);
@@ -100,8 +101,8 @@ void send_response()
     }
 
     zmq_pollitem_t items [2];
-    zmq::message_t request_msg;
-    zmq::message_t response_msg;
+    zmq_msg_t request_msg;
+    zmq_msg_t response_msg;
 
     typedef void (queue_device::*next_method) ();
 
diff --git a/src/streamer_device.cpp b/src/streamer_device.cpp
index b2bc881..1a375fb 100644
--- a/src/streamer_device.cpp
+++ b/src/streamer_device.cpp
@@ -1 +1,21 @@
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
 #include "streamer_device.hpp"
+
diff --git a/src/streamer_device.hpp b/src/streamer_device.hpp
index 2d1bbff..44d6415 100644
--- a/src/streamer_device.hpp
+++ b/src/streamer_device.hpp
@@ -1,4 +1,6 @@
+#include "../include/zmq.hpp"
 #include "device_base.hpp"
+#include "socket_base.hpp"
 
 namespace zmq
 {
@@ -9,9 +11,17 @@ namespace zmq
     streamer_device(socket_base_t * in_, socket_base_t * out_)
       : device_base_t(in_,out_)
     {}
-    void run() {}
+    void run()
+    {
+      zmq_msg_t msg;
+      while (true) {
+        insocket->recv (&msg,0);
+        outsocket->send (&msg,0);
+      }
+    }
   private:
-
+    streamer_device(streamer_device const &);
+    void operator = (streamer_device const &);
   };
 
 
-- 
1.6.3.3

>From 2459e4bea8c2c4c9f13dec651d0eec54f47d29d4 Mon Sep 17 00:00:00 2001
From: jon <j...@ubik.(none)>
Date: Thu, 1 Apr 2010 21:50:34 +0100
Subject: [PATCH 3/7] added copyright and ifdef guards to device header file

---
 .gitignore               |    1 +
 src/Makefile.am          |    3 --
 src/device_base.cpp      |   27 +++++++++++++++++++----
 src/forwarder_device.hpp |   37 +++++++++++++++++++++++++++++---
 src/queue_device.hpp     |   52 +++++++++++++++++++++++++++++++++++----------
 src/streamer_device.hpp  |   40 +++++++++++++++++++++++++++++------
 6 files changed, 129 insertions(+), 31 deletions(-)

diff --git a/.gitignore b/.gitignore
index 77384d6..80a7a1c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,3 +43,4 @@ builds/msvc/*/Release
 foreign/openpgm/*
 !foreign/openpgm/*.tar.bz2
 !foreign/openpgm/*.tar.gz
+#*#
diff --git a/src/Makefile.am b/src/Makefile.am
index 517fabe..0002cd6 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -151,9 +151,6 @@ libzmq_la_SOURCES = app_thread.hpp \
     pipe.cpp \
     poll.cpp \
     pub.cpp \
-    queue_device.cpp \
-    forwarder_device.cpp \
-    streamer_device.cpp \
     rep.cpp \
     req.cpp \
     select.cpp \
diff --git a/src/device_base.cpp b/src/device_base.cpp
index bfbe580..804a519 100644
--- a/src/device_base.cpp
+++ b/src/device_base.cpp
@@ -1,8 +1,25 @@
-#include <new>
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
 
-#include "../include/zmq.h"
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
 
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
 
+#include <new>
+
+#include "../include/zmq.h"
 #include "device_base.hpp"
 #include "queue_device.hpp"
 #include "forwarder_device.hpp"
@@ -15,13 +32,13 @@ namespace zmq
 				socket_base_t* out_)
   {
     if ( dtype_ == ZMQ_FORWARDER) {
-      return new (std::nothrow) forwarder_device(in_, out_);
+      return new (std::nothrow) forwarder_device_t(in_, out_);
     }
     else if ( dtype_ == ZMQ_STREAMER ) {
-      return new (std::nothrow) streamer_device(in_, out_);
+      return new (std::nothrow) streamer_device_t(in_, out_);
     }
     else if ( dtype_ == ZMQ_QUEUE ) {
-      return new (std::nothrow) queue_device(in_, out_);
+      return new (std::nothrow) queue_device_t(in_, out_);
     }
     // FIXME errno?
     return NULL;
diff --git a/src/forwarder_device.hpp b/src/forwarder_device.hpp
index 75f75e8..63d9358 100644
--- a/src/forwarder_device.hpp
+++ b/src/forwarder_device.hpp
@@ -1,3 +1,25 @@
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_FORWARDER_DEVICE_HPP_INCLUDED__
+#define __ZMQ_FORWARDER_DEVICE_HPP_INCLUDED__
+
 #include "../include/zmq.hpp"
 #include "device_base.hpp"
 #include "socket_base.hpp"
@@ -5,25 +27,32 @@
 namespace zmq
 {
   
-  class forwarder_device : public device_base_t
+  class forwarder_device_t : public device_base_t
   {
   public:
-    forwarder_device(socket_base_t * in_, socket_base_t * out_)
+    forwarder_device_t(socket_base_t * in_, socket_base_t * out_)
       : device_base_t(in_,out_)
     {}
 
-    void run() 
+    virtual void run() 
     {
       zmq_msg_t msg;
+      int rc = zmq_msg_init(&msg);
+      if (!rc)
+	return;
       while (true) {
         insocket->recv (&msg,0);
         outsocket->send (&msg,0);
       }
+      zmq_msg_close(&msg);
     }
     
   private:
-
+    forwarder_device_t(forwarder_device_t const &);
+    void operator = (forwarder_device_t const&);
   };
 
 
 }
+
+#endif
diff --git a/src/queue_device.hpp b/src/queue_device.hpp
index a1fd3aa..ab83b39 100644
--- a/src/queue_device.hpp
+++ b/src/queue_device.hpp
@@ -1,3 +1,21 @@
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
 
 #ifndef __ZMQ_QUEUE_DEVICE_HPP_INCLUDED__
 #define __ZMQ_QUEUE_DEVICE_HPP_INCLUDED__
@@ -9,11 +27,11 @@
 namespace zmq
 {
 
-  class queue_device : public device_base_t
+  class queue_device_t : public device_base_t
   {
   public:
 
-    queue_device (socket_base_t* reply, socket_base_t* request) 
+    queue_device_t (socket_base_t* reply, socket_base_t* request) 
       : device_base_t(reply,request)
     {
       items [0].socket = reply;
@@ -26,12 +44,19 @@ namespace zmq
       items [1].events = ZMQ_POLLIN;
       items [1].revents = 0;
 
-      m_next_request_method = &queue_device::get_request;
-      m_next_response_method = &queue_device::get_response;
+      m_next_request_method = &queue_device_t::get_request;
+      m_next_response_method = &queue_device_t::get_response;
     }
 
-    inline void run()
+    virtual void run()
     {
+      int rc = zmq_msg_init(&request_msg);
+      if (!rc)
+	return;
+      rc = zmq_msg_init(&response_msg);
+      if (!rc)
+	return;
+
       while (true) {
 	int rc = zmq::poll (&items [0], 2, -1);
 	if (rc < 0)
@@ -39,6 +64,9 @@ namespace zmq
 	next_request();
 	next_response();
       }
+      zmq_msg_close(&request_msg);
+      zmq_msg_close(&response_msg);
+
     }
 
   private:
@@ -61,7 +89,7 @@ namespace zmq
 	  return;
 	items [0].events &= ~ZMQ_POLLIN;
 	items [1].events |= ZMQ_POLLOUT;
-	m_next_request_method = &queue_device::send_request;
+	m_next_request_method = &queue_device_t::send_request;
       }
     }
 
@@ -72,7 +100,7 @@ namespace zmq
         if (!rc) return;
         items [1].events &= ~ZMQ_POLLOUT;
         items [0].events |= ZMQ_POLLIN;
-        m_next_request_method = &queue_device::get_request;
+        m_next_request_method = &queue_device_t::get_request;
       }
     }
 
@@ -84,7 +112,7 @@ namespace zmq
 	  return;
 	items [1].events &= ~ZMQ_POLLIN;
 	items [0].events |= ZMQ_POLLOUT;
-	m_next_response_method = &queue_device::send_response;
+	m_next_response_method = &queue_device_t::send_response;
       }
     }
 
@@ -96,7 +124,7 @@ namespace zmq
 	  return;
 	items [0].events &= ~ZMQ_POLLOUT;
 	items [1].events |= ZMQ_POLLIN;
-	m_next_response_method = &queue_device::get_response;
+	m_next_response_method = &queue_device_t::get_response;
       }
     }
 
@@ -104,13 +132,13 @@ namespace zmq
     zmq_msg_t request_msg;
     zmq_msg_t response_msg;
 
-    typedef void (queue_device::*next_method) ();
+    typedef void (queue_device_t::*next_method) ();
 
     next_method m_next_request_method;
     next_method m_next_response_method;
 
-    queue_device (queue_device const &);
-    void operator = (queue_device const &);
+    queue_device_t (queue_device_t const &);
+    void operator = (queue_device_t const &);
   };
 
 }
diff --git a/src/streamer_device.hpp b/src/streamer_device.hpp
index 44d6415..c5c1633 100644
--- a/src/streamer_device.hpp
+++ b/src/streamer_device.hpp
@@ -1,3 +1,25 @@
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_STREAMER_DEVICE_HPP_INCLUDED__
+#define __ZMQ_STREAMER_DEVICE_HPP_INCLUDED__
+
 #include "../include/zmq.hpp"
 #include "device_base.hpp"
 #include "socket_base.hpp"
@@ -5,24 +27,28 @@
 namespace zmq
 {
   
-  class streamer_device : public device_base_t
+  class streamer_device_t : public device_base_t
   {
   public:
-    streamer_device(socket_base_t * in_, socket_base_t * out_)
+    streamer_device_t(socket_base_t * in_, socket_base_t * out_)
       : device_base_t(in_,out_)
     {}
-    void run()
+    virtual void run()
     {
       zmq_msg_t msg;
+      int rc = zmq_msg_init(&msg);
+      if (!rc)
+	return;
       while (true) {
         insocket->recv (&msg,0);
         outsocket->send (&msg,0);
       }
+      zmq_msg_close(&msg);
     }
   private:
-    streamer_device(streamer_device const &);
-    void operator = (streamer_device const &);
+    streamer_device_t(streamer_device_t const &);
+    void operator = (streamer_device_t const &);
   };
-
-
 }
+
+#endif
-- 
1.6.3.3

>From b9f3cb9fe4d9424fcf8430fd37db8df581c3621c Mon Sep 17 00:00:00 2001
From: jon <j...@ubik.(none)>
Date: Thu, 1 Apr 2010 22:01:11 +0100
Subject: [PATCH 4/7] alphabetical order in automake file

---
 src/Makefile.am          |    4 ++--
 src/forwarder_device.cpp |    2 --
 src/queue_device.cpp     |   21 ---------------------
 src/streamer_device.cpp  |   21 ---------------------
 4 files changed, 2 insertions(+), 46 deletions(-)
 delete mode 100644 src/forwarder_device.cpp
 delete mode 100644 src/queue_device.cpp
 delete mode 100644 src/streamer_device.cpp

diff --git a/src/Makefile.am b/src/Makefile.am
index 0002cd6..dfc5185 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -66,6 +66,7 @@ libzmq_la_SOURCES = app_thread.hpp \
     err.hpp \
     fd.hpp \
     fd_signaler.hpp \
+    forwarder_device.hpp \
     fq.hpp \
     i_inout.hpp \
     io_object.hpp \
@@ -94,8 +95,6 @@ libzmq_la_SOURCES = app_thread.hpp \
     prefix_tree.hpp \
     pub.hpp \
     queue_device.hpp \
-    forwarder_device.hpp \
-    streamer_device.hpp \
     rep.hpp \
     req.hpp \
     select.hpp \
@@ -103,6 +102,7 @@ libzmq_la_SOURCES = app_thread.hpp \
     simple_semaphore.hpp \
     socket_base.hpp \
     stdint.hpp \
+    streamer_device.hpp \
     sub.hpp \
     tcp_connecter.hpp \
     tcp_listener.hpp \
diff --git a/src/forwarder_device.cpp b/src/forwarder_device.cpp
deleted file mode 100644
index 8aac9ea..0000000
--- a/src/forwarder_device.cpp
+++ /dev/null
@@ -1,2 +0,0 @@
-#include "forwarder_device.hpp"
-
diff --git a/src/queue_device.cpp b/src/queue_device.cpp
deleted file mode 100644
index 5b77aa4..0000000
--- a/src/queue_device.cpp
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
-    Copyright (c) 2007-2010 iMatix Corporation
-
-    This file is part of 0MQ.
-
-    0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    0MQ is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
-
-    You should have received a copy of the Lesser GNU General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "queue_device.hpp"
-
diff --git a/src/streamer_device.cpp b/src/streamer_device.cpp
deleted file mode 100644
index 1a375fb..0000000
--- a/src/streamer_device.cpp
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
-    Copyright (c) 2007-2010 iMatix Corporation
-
-    This file is part of 0MQ.
-
-    0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    0MQ is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
-
-    You should have received a copy of the Lesser GNU General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "streamer_device.hpp"
-
-- 
1.6.3.3

>From c92a40bbf34f12ae576b40d975469c02d391c968 Mon Sep 17 00:00:00 2001
From: jon <j...@ubik.(none)>
Date: Thu, 1 Apr 2010 22:10:49 +0100
Subject: [PATCH 5/7] make prebuilt devices use the new zmq::device_t

---
 devices/zmq_forwarder/zmq_forwarder.cpp |    9 +--
 devices/zmq_queue/zmq_queue.cpp         |  112 +------------------------------
 devices/zmq_streamer/zmq_streamer.cpp   |    7 +--
 3 files changed, 9 insertions(+), 119 deletions(-)

diff --git a/devices/zmq_forwarder/zmq_forwarder.cpp b/devices/zmq_forwarder/zmq_forwarder.cpp
index 092bc47..310a1d3 100644
--- a/devices/zmq_forwarder/zmq_forwarder.cpp
+++ b/devices/zmq_forwarder/zmq_forwarder.cpp
@@ -113,11 +113,10 @@ int main (int argc, char *argv [])
         n++;
     }
 
-    zmq::message_t msg;
-    while (true) {
-        in_socket.recv (&msg);
-        out_socket.send (msg);
-    }
+
+    zmq::device_t dev (in_socket, out_socket, ZMQ_FORWARDER);
+
+    dev.run();
 
     return 0;
 }
diff --git a/devices/zmq_queue/zmq_queue.cpp b/devices/zmq_queue/zmq_queue.cpp
index 5eae750..deaa073 100644
--- a/devices/zmq_queue/zmq_queue.cpp
+++ b/devices/zmq_queue/zmq_queue.cpp
@@ -20,113 +20,6 @@
 #include "../../include/zmq.hpp"
 #include "../../foreign/xmlParser/xmlParser.cpp"
 
-class queue
-{
-public:
-
-    queue (zmq::socket_t& reply, zmq::socket_t& request) :
-        xrep (reply),
-        xreq (request)
-    {
-        items [0].socket = reply;
-        items [0].fd = 0;
-        items [0].events = ZMQ_POLLIN;
-        items [0].revents = 0;
-
-        items [1].socket = request;
-        items [1].fd = 0;
-        items [1].events = ZMQ_POLLIN;
-        items [1].revents = 0;
-
-        m_next_request_method = &queue::get_request;
-        m_next_response_method = &queue::get_response;
-    }
-
-    void run()
-    {
-        while (true) {
-            int rc = zmq::poll (&items [0], 2, -1);
-            if (rc < 0)
-                break;
-            next_request();
-            next_response();
-        }
-    }
-
-private:
-
-    void next_request()
-    {
-        (this->*m_next_request_method) ();
-    }
-
-    void next_response()
-    {
-        (this->*m_next_response_method) ();
-    }
-
-    void get_request()
-    {
-        if (items [0].revents & ZMQ_POLLIN ) {
-            int rc = xrep.recv (&request_msg, ZMQ_NOBLOCK);
-            if (!rc)
-                return;
-            items [0].events &= ~ZMQ_POLLIN;
-            items [1].events |= ZMQ_POLLOUT;
-            m_next_request_method = &queue::send_request;
-        }
-    }
-
-    void send_request()
-    {
-        if (items [1].revents & ZMQ_POLLOUT) {
-        int rc = xreq.send (request_msg, ZMQ_NOBLOCK);
-        if (!rc) return;
-        items [1].events &= ~ZMQ_POLLOUT;
-        items [0].events |= ZMQ_POLLIN;
-        m_next_request_method = &queue::get_request;
-        }
-    }
-
-    void get_response()
-    {
-        if ( items [1].revents & ZMQ_POLLIN ) {
-            int rc = xreq.recv (&response_msg, ZMQ_NOBLOCK);
-            if (!rc)
-                return;
-            items [1].events &= ~ZMQ_POLLIN;
-            items [0].events |= ZMQ_POLLOUT;
-            m_next_response_method = &queue::send_response;
-        }
-    }
-
-    void send_response()
-    {
-        if (items [0].revents & ZMQ_POLLOUT) {
-            int rc = xrep.send (response_msg, ZMQ_NOBLOCK);
-            if (!rc)
-                return;
-            items [0].events &= ~ZMQ_POLLOUT;
-            items [1].events |= ZMQ_POLLIN;
-            m_next_response_method = &queue::get_response;
-        }
-    }
-
-    zmq::socket_t & xrep;
-    zmq::socket_t & xreq;
-    zmq_pollitem_t items [2];
-    zmq::message_t request_msg;
-    zmq::message_t response_msg;
-
-    typedef void (queue::*next_method) ();
-
-    next_method m_next_request_method;
-    next_method m_next_response_method;
-
-    queue (queue const &);
-    void operator = (queue const &);
-};
-
 int main (int argc, char *argv [])
 {
     if (argc != 2) {
@@ -219,8 +112,9 @@ int main (int argc, char *argv [])
         n++;
     }
 
-    queue q(in_socket, out_socket);
-    q.run();
+    zmq::device_t dev (in_socket, out_socket, ZMQ_QUEUE);
+
+    dev.run();
 
     return 0;
 }
diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp
index 6eccedf..84431b0 100644
--- a/devices/zmq_streamer/zmq_streamer.cpp
+++ b/devices/zmq_streamer/zmq_streamer.cpp
@@ -112,11 +112,8 @@ int main (int argc, char *argv [])
         n++;
     }
 
-    zmq::message_t msg;
-    while (true) {
-        in_socket.recv (&msg);
-        out_socket.send (msg);
-    }
+    zmq::device_t dev (in_socket, out_socket, ZMQ_FORWARDER);
+    dev.run();
 
     return 0;
 }
-- 
1.6.3.3

>From 667275e36c4314d0b7308d65bc9ac92df143f4fc Mon Sep 17 00:00:00 2001
From: jon <j...@ubik.(none)>
Date: Thu, 1 Apr 2010 22:13:06 +0100
Subject: [PATCH 6/7] fix ignore file

---
 .gitignore |    1 -
 1 files changed, 0 insertions(+), 1 deletions(-)

diff --git a/.gitignore b/.gitignore
index 80a7a1c..77384d6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,4 +43,3 @@ builds/msvc/*/Release
 foreign/openpgm/*
 !foreign/openpgm/*.tar.bz2
 !foreign/openpgm/*.tar.gz
-#*#
-- 
1.6.3.3

>From 9429146a833253afabad51f93263f2cd07bfe34e Mon Sep 17 00:00:00 2001
From: jon <j...@ubik.(none)>
Date: Thu, 1 Apr 2010 22:16:34 +0100
Subject: [PATCH 7/7] fixed typo

---
 devices/zmq_streamer/zmq_streamer.cpp |    2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp
index 84431b0..bca6a05 100644
--- a/devices/zmq_streamer/zmq_streamer.cpp
+++ b/devices/zmq_streamer/zmq_streamer.cpp
@@ -112,7 +112,7 @@ int main (int argc, char *argv [])
         n++;
     }
 
-    zmq::device_t dev (in_socket, out_socket, ZMQ_FORWARDER);
+    zmq::device_t dev (in_socket, out_socket, ZMQ_STREAMER);
     dev.run();
 
     return 0;
-- 
1.6.3.3

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to