Hi
So I grabbed some time to tidy up this patch.
I discovered that git could collapse patches all into one using 'git rebase -i'
which is certainly easier than applying the 9 I've sent previously.
(confusingly git kept the name the same, which must be fixable)

As I said previously, please review and test.

I'm submitting this under the MIT license.

Thanks,

Jon

>From 9e7758f29f0b98480bd29c2fd9805cabe5f69ea2 Mon Sep 17 00:00:00 2001
From: Jon Dyte <[email protected]>
Date: Sat, 3 Apr 2010 16:20:53 +0100
Subject: [PATCH] initial commit of devices

removed use of zmq::message_t in devices, replaced with zmq_msg_t

added copyright and ifdef guards to device header file

alphabetical order in automake file

make prebuilt devices use the new zmq::device_t

fix ignore file

fixed typo

invert test fix

Added barebones doc, an errno for unknowndevice and reformatted to zmq style

spacing arg correction
---
 devices/zmq_forwarder/zmq_forwarder.cpp |    9 +-
 devices/zmq_queue/zmq_queue.cpp         |  112 +-----------------------
 devices/zmq_streamer/zmq_streamer.cpp   |    7 +-
 doc/Makefile.am                         |    2 +-
 doc/zmq_device.txt                      |   38 ++++++++
 include/zmq.h                           |   10 ++
 include/zmq.hpp                         |   20 ++++-
 src/Makefile.am                         |    5 +
 src/device_base.cpp                     |   47 ++++++++++
 src/device_base.hpp                     |   48 ++++++++++
 src/forwarder_device.hpp                |   58 ++++++++++++
 src/queue_device.hpp                    |  146 +++++++++++++++++++++++++++++++
 src/streamer_device.hpp                 |   54 +++++++++++
 src/zmq.cpp                             |   14 +++-
 14 files changed, 448 insertions(+), 122 deletions(-)
 create mode 100644 doc/zmq_device.txt
 create mode 100644 src/device_base.cpp
 create mode 100644 src/device_base.hpp
 create mode 100644 src/forwarder_device.hpp
 create mode 100644 src/queue_device.hpp
 create mode 100644 src/streamer_device.hpp

diff --git a/devices/zmq_forwarder/zmq_forwarder.cpp b/devices/zmq_forwarder/zmq_forwarder.cpp
index 092bc47..310a1d3 100644
--- a/devices/zmq_forwarder/zmq_forwarder.cpp
+++ b/devices/zmq_forwarder/zmq_forwarder.cpp
@@ -113,11 +113,10 @@ int main (int argc, char *argv [])
         n++;
     }
 
-    zmq::message_t msg;
-    while (true) {
-        in_socket.recv (&msg);
-        out_socket.send (msg);
-    }
+
+    zmq::device_t dev (in_socket, out_socket, ZMQ_FORWARDER);
+
+    dev.run();
 
     return 0;
 }
diff --git a/devices/zmq_queue/zmq_queue.cpp b/devices/zmq_queue/zmq_queue.cpp
index 5eae750..deaa073 100644
--- a/devices/zmq_queue/zmq_queue.cpp
+++ b/devices/zmq_queue/zmq_queue.cpp
@@ -20,113 +20,6 @@
 #include "../../include/zmq.hpp"
 #include "../../foreign/xmlParser/xmlParser.cpp"
 
-class queue
-{
-public:
-
-    queue (zmq::socket_t& reply, zmq::socket_t& request) :
-        xrep (reply),
-        xreq (request)
-    {
-        items [0].socket = reply;
-        items [0].fd = 0;
-        items [0].events = ZMQ_POLLIN;
-        items [0].revents = 0;
-
-        items [1].socket = request;
-        items [1].fd = 0;
-        items [1].events = ZMQ_POLLIN;
-        items [1].revents = 0;
-
-        m_next_request_method = &queue::get_request;
-        m_next_response_method = &queue::get_response;
-    }
-
-    void run()
-    {
-        while (true) {
-            int rc = zmq::poll (&items [0], 2, -1);
-            if (rc < 0)
-                break;
-            next_request();
-            next_response();
-        }
-    }
-
-private:
-
-    void next_request()
-    {
-        (this->*m_next_request_method) ();
-    }
-
-    void next_response()
-    {
-        (this->*m_next_response_method) ();
-    }
-
-    void get_request()
-    {
-        if (items [0].revents & ZMQ_POLLIN ) {
-            int rc = xrep.recv (&request_msg, ZMQ_NOBLOCK);
-            if (!rc)
-                return;
-            items [0].events &= ~ZMQ_POLLIN;
-            items [1].events |= ZMQ_POLLOUT;
-            m_next_request_method = &queue::send_request;
-        }
-    }
-
-    void send_request()
-    {
-        if (items [1].revents & ZMQ_POLLOUT) {
-        int rc = xreq.send (request_msg, ZMQ_NOBLOCK);
-        if (!rc) return;
-        items [1].events &= ~ZMQ_POLLOUT;
-        items [0].events |= ZMQ_POLLIN;
-        m_next_request_method = &queue::get_request;
-        }
-    }
-
-    void get_response()
-    {
-        if ( items [1].revents & ZMQ_POLLIN ) {
-            int rc = xreq.recv (&response_msg, ZMQ_NOBLOCK);
-            if (!rc)
-                return;
-            items [1].events &= ~ZMQ_POLLIN;
-            items [0].events |= ZMQ_POLLOUT;
-            m_next_response_method = &queue::send_response;
-        }
-    }
-
-    void send_response()
-    {
-        if (items [0].revents & ZMQ_POLLOUT) {
-            int rc = xrep.send (response_msg, ZMQ_NOBLOCK);
-            if (!rc)
-                return;
-            items [0].events &= ~ZMQ_POLLOUT;
-            items [1].events |= ZMQ_POLLIN;
-            m_next_response_method = &queue::get_response;
-        }
-    }
-
-    zmq::socket_t & xrep;
-    zmq::socket_t & xreq;
-    zmq_pollitem_t items [2];
-    zmq::message_t request_msg;
-    zmq::message_t response_msg;
-
-    typedef void (queue::*next_method) ();
-
-    next_method m_next_request_method;
-    next_method m_next_response_method;
-
-    queue (queue const &);
-    void operator = (queue const &);
-};
-
 int main (int argc, char *argv [])
 {
     if (argc != 2) {
@@ -219,8 +112,9 @@ int main (int argc, char *argv [])
         n++;
     }
 
-    queue q(in_socket, out_socket);
-    q.run();
+    zmq::device_t dev (in_socket, out_socket, ZMQ_QUEUE);
+
+    dev.run();
 
     return 0;
 }
diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp
index 6eccedf..bca6a05 100644
--- a/devices/zmq_streamer/zmq_streamer.cpp
+++ b/devices/zmq_streamer/zmq_streamer.cpp
@@ -112,11 +112,8 @@ int main (int argc, char *argv [])
         n++;
     }
 
-    zmq::message_t msg;
-    while (true) {
-        in_socket.recv (&msg);
-        out_socket.send (msg);
-    }
+    zmq::device_t dev (in_socket, out_socket, ZMQ_STREAMER);
+    dev.run();
 
     return 0;
 }
diff --git a/doc/Makefile.am b/doc/Makefile.am
index fce124d..444942c 100644
--- a/doc/Makefile.am
+++ b/doc/Makefile.am
@@ -3,7 +3,7 @@ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \
     zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \
     zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \
     zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \
-    zmq_strerror.3 zmq_term.3 zmq_version.3
+    zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_device.3
 MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7 \
     zmq_cpp.7
 MAN_DOC = $(MAN1) $(MAN3) $(MAN7)
diff --git a/doc/zmq_device.txt b/doc/zmq_device.txt
new file mode 100644
index 0000000..876ae39
--- /dev/null
+++ b/doc/zmq_device.txt
@@ -0,0 +1,38 @@
+zmq_device(3)
+=============
+
+
+NAME
+----
+zmq_device - run 0MQ device
+
+
+SYNOPSIS
+--------
+*void zmq_device (void '*insocket', void '*outsocket', int 'type');*
+
+
+DESCRIPTION
+-----------
+Runs device specified in the type parameter, on the supplied in and
+out sockets. The type can be ZMQ_STREAMER, ZMQ_FORWARDER, ZMQ_QUEUE.
+
+RETURN VALUE
+------------
+NONE
+
+
+SEE ALSO
+--------
+linkzmq:zmq_init[3]
+linkzmq:zmq_setsockopt[3]
+linkzmq:zmq_bind[3]
+linkzmq:zmq_connect[3]
+linkzmq:zmq_send[3]
+linkzmq:zmq_recv[3]
+
+
+AUTHORS
+-------
+The 0MQ documentation was written by Martin Sustrik <[email protected]> and
+Martin Lucina <[email protected]>.
diff --git a/include/zmq.h b/include/zmq.h
index a1fcf31..7b97d6b 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -83,6 +83,7 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
 #define EMTHREAD (ZMQ_HAUSNUMERO + 50)
 #define EFSM (ZMQ_HAUSNUMERO + 51)
 #define ENOCOMPATPROTO (ZMQ_HAUSNUMERO + 52)
+#define EUNKNOWNDEVICE (ZMQ_HAUSNUMERO + 53) 
 
 //  Resolves system errors and 0MQ errors to human-readable string.
 ZMQ_EXPORT const char *zmq_strerror (int errnum);
@@ -225,6 +226,15 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
 ZMQ_EXPORT int zmq_errno ();
 
 ////////////////////////////////////////////////////////////////////////////////
+//  Devices - Experimental
+////////////////////////////////////////////////////////////////////////////////
+
+#define ZMQ_STREAMER 0
+#define ZMQ_FORWARDER 1
+#define ZMQ_QUEUE 2
+ZMQ_EXPORT void zmq_device(int device, void * insocket, void* outsocket);
+
+////////////////////////////////////////////////////////////////////////////////
 //  Helper functions.
 ////////////////////////////////////////////////////////////////////////////////
 
diff --git a/include/zmq.hpp b/include/zmq.hpp
index 6228133..53e37cf 100644
--- a/include/zmq.hpp
+++ b/include/zmq.hpp
@@ -59,7 +59,6 @@ namespace zmq
     class message_t : private zmq_msg_t
     {
         friend class socket_t;
-
     public:
 
         inline message_t ()
@@ -254,6 +253,25 @@ namespace zmq
         void operator = (const socket_t&);
     };
 
+
+  class device_t
+  {
+  public:
+    inline device_t(socket_t & in_, socket_t & out_, int _type)
+      : type(_type), in(in_), out(out_)
+    {}
+    inline void run()
+    {
+      zmq_device(type, in, out);
+    }
+  private:
+    int type;
+    socket_t & in;
+    socket_t & out;
+    device_t (const device_t&);
+    void operator = (const device_t&);
+  };
+
 }
 
 #endif
diff --git a/src/Makefile.am b/src/Makefile.am
index 26106d8..dfc5185 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -57,6 +57,7 @@ libzmq_la_SOURCES = app_thread.hpp \
     command.hpp \
     config.hpp \
     decoder.hpp \
+    device_base.hpp \
     devpoll.hpp \
     dispatcher.hpp \
     downstream.hpp \
@@ -65,6 +66,7 @@ libzmq_la_SOURCES = app_thread.hpp \
     err.hpp \
     fd.hpp \
     fd_signaler.hpp \
+    forwarder_device.hpp \
     fq.hpp \
     i_inout.hpp \
     io_object.hpp \
@@ -92,6 +94,7 @@ libzmq_la_SOURCES = app_thread.hpp \
     p2p.hpp \
     prefix_tree.hpp \
     pub.hpp \
+    queue_device.hpp \
     rep.hpp \
     req.hpp \
     select.hpp \
@@ -99,6 +102,7 @@ libzmq_la_SOURCES = app_thread.hpp \
     simple_semaphore.hpp \
     socket_base.hpp \
     stdint.hpp \
+    streamer_device.hpp \
     sub.hpp \
     tcp_connecter.hpp \
     tcp_listener.hpp \
@@ -123,6 +127,7 @@ libzmq_la_SOURCES = app_thread.hpp \
     zmq_listener.hpp \
     app_thread.cpp \
     command.cpp \
+    device_base.cpp \
     devpoll.cpp \
     dispatcher.cpp \
     downstream.cpp \
diff --git a/src/device_base.cpp b/src/device_base.cpp
new file mode 100644
index 0000000..300e507
--- /dev/null
+++ b/src/device_base.cpp
@@ -0,0 +1,47 @@
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <new>
+
+#include "../include/zmq.h"
+#include "device_base.hpp"
+#include "queue_device.hpp"
+#include "forwarder_device.hpp"
+#include "streamer_device.hpp"
+
+namespace zmq
+{
+  
+    device_base_t * create_device(int dtype_, socket_base_t* in_,
+                                  socket_base_t* out_)
+    {
+        if ( dtype_ == ZMQ_FORWARDER) {
+            return new (std::nothrow) forwarder_device_t(in_, out_);
+        }
+        else if ( dtype_ == ZMQ_STREAMER ) {
+            return new (std::nothrow) streamer_device_t(in_, out_);
+        }
+        else if ( dtype_ == ZMQ_QUEUE ) {
+            return new (std::nothrow) queue_device_t(in_, out_);
+        }
+        errno = EUNKNOWNDEVICE;
+        return NULL;
+    }
+
+}
diff --git a/src/device_base.hpp b/src/device_base.hpp
new file mode 100644
index 0000000..061fb95
--- /dev/null
+++ b/src/device_base.hpp
@@ -0,0 +1,48 @@
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_DEVICE_BASE_HPP_INCLUDED__
+#define __ZMQ_DEVICE_BASE_HPP_INCLUDED__
+
+namespace zmq
+{
+
+    class device_base_t
+    {
+    public:
+        inline device_base_t( class socket_base_t * insocket_, 
+                              class socket_base_t * outsocket_)
+                : insocket(insocket_)
+                , outsocket(outsocket_)
+        {}
+
+        virtual void run() = 0;
+    protected:
+        class socket_base_t * insocket;
+        class socket_base_t * outsocket;
+    private:
+        device_base_t (const device_base_t &);
+        void operator = (const device_base_t &);
+    };
+  
+    device_base_t* create_device(int type_, socket_base_t* in_, socket_base_t* out_);
+  
+}
+
+#endif
diff --git a/src/forwarder_device.hpp b/src/forwarder_device.hpp
new file mode 100644
index 0000000..8482356
--- /dev/null
+++ b/src/forwarder_device.hpp
@@ -0,0 +1,58 @@
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_FORWARDER_DEVICE_HPP_INCLUDED__
+#define __ZMQ_FORWARDER_DEVICE_HPP_INCLUDED__
+
+#include "../include/zmq.hpp"
+#include "device_base.hpp"
+#include "socket_base.hpp"
+
+namespace zmq
+{
+
+    class forwarder_device_t : public device_base_t
+    {
+    public:
+        forwarder_device_t(socket_base_t * in_, socket_base_t * out_)
+                : device_base_t(in_, out_)
+        {}
+
+        virtual void run() 
+        {
+            zmq_msg_t msg;
+            int rc = zmq_msg_init(&msg);
+            if (rc)
+                return;
+            while (true) {
+                insocket->recv (&msg,0);
+                outsocket->send (&msg,0);
+            }
+            zmq_msg_close(&msg);
+        }
+    
+    private:
+        forwarder_device_t(forwarder_device_t const &);
+        void operator = (forwarder_device_t const&);
+    };
+
+
+}
+
+#endif
diff --git a/src/queue_device.hpp b/src/queue_device.hpp
new file mode 100644
index 0000000..ec062ab
--- /dev/null
+++ b/src/queue_device.hpp
@@ -0,0 +1,146 @@
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_QUEUE_DEVICE_HPP_INCLUDED__
+#define __ZMQ_QUEUE_DEVICE_HPP_INCLUDED__
+
+#include "../include/zmq.hpp"
+#include "device_base.hpp"
+#include "socket_base.hpp"
+
+namespace zmq
+{
+
+    class queue_device_t : public device_base_t
+    {
+    public:
+
+        queue_device_t (socket_base_t* reply, socket_base_t* request) 
+                : device_base_t(reply, request)
+        {
+            items [0].socket = reply;
+            items [0].fd = 0;
+            items [0].events = ZMQ_POLLIN;
+            items [0].revents = 0;
+
+            items [1].socket = request;
+            items [1].fd = 0;
+            items [1].events = ZMQ_POLLIN;
+            items [1].revents = 0;
+
+            m_next_request_method = &queue_device_t::get_request;
+            m_next_response_method = &queue_device_t::get_response;
+        }
+
+        virtual void run()
+        {
+            int rc = zmq_msg_init(&request_msg);
+            if (rc)
+                return;
+            rc = zmq_msg_init(&response_msg);
+            if (rc)
+                return;
+
+            while (true) {
+                int rc = zmq::poll (&items [0], 2, -1);
+                if (rc < 0)
+                    break;
+                next_request();
+                next_response();
+            }
+            zmq_msg_close(&request_msg);
+            zmq_msg_close(&response_msg);
+
+        }
+
+    private:
+
+        inline void next_request()
+        {
+            (this->*m_next_request_method) ();
+        }
+
+        inline void next_response()
+        {
+            (this->*m_next_response_method) ();
+        }
+
+        inline void get_request()
+        {
+            if (items [0].revents & ZMQ_POLLIN ) {
+                int rc = insocket->recv (&request_msg, ZMQ_NOBLOCK);
+                if (!rc)
+                    return;
+                items [0].events &= ~ZMQ_POLLIN;
+                items [1].events |= ZMQ_POLLOUT;
+                m_next_request_method = &queue_device_t::send_request;
+            }
+        }
+
+        inline void send_request()
+        {
+            if (items [1].revents & ZMQ_POLLOUT) {
+                int rc = outsocket->send (&request_msg, ZMQ_NOBLOCK);
+                if (!rc) return;
+                items [1].events &= ~ZMQ_POLLOUT;
+                items [0].events |= ZMQ_POLLIN;
+                m_next_request_method = &queue_device_t::get_request;
+            }
+        }
+
+        inline void get_response()
+        {
+            if ( items [1].revents & ZMQ_POLLIN ) {
+                int rc = outsocket->recv (&response_msg, ZMQ_NOBLOCK);
+                if (!rc)
+                    return;
+                items [1].events &= ~ZMQ_POLLIN;
+                items [0].events |= ZMQ_POLLOUT;
+                m_next_response_method = &queue_device_t::send_response;
+            }
+        }
+
+        inline void send_response()
+        {
+            if (items [0].revents & ZMQ_POLLOUT) {
+                int rc = insocket->send (&response_msg, ZMQ_NOBLOCK);
+                if (!rc)
+                    return;
+                items [0].events &= ~ZMQ_POLLOUT;
+                items [1].events |= ZMQ_POLLIN;
+                m_next_response_method = &queue_device_t::get_response;
+            }
+        }
+
+        zmq_pollitem_t items [2];
+        zmq_msg_t request_msg;
+        zmq_msg_t response_msg;
+
+        typedef void (queue_device_t::*next_method) ();
+
+        next_method m_next_request_method;
+        next_method m_next_response_method;
+
+        queue_device_t (queue_device_t const &);
+        void operator = (queue_device_t const &);
+    };
+
+}
+
+#endif
diff --git a/src/streamer_device.hpp b/src/streamer_device.hpp
new file mode 100644
index 0000000..1dca5e8
--- /dev/null
+++ b/src/streamer_device.hpp
@@ -0,0 +1,54 @@
+/*
+    Copyright (c) 2007-2010 iMatix Corporation
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_STREAMER_DEVICE_HPP_INCLUDED__
+#define __ZMQ_STREAMER_DEVICE_HPP_INCLUDED__
+
+#include "../include/zmq.hpp"
+#include "device_base.hpp"
+#include "socket_base.hpp"
+
+namespace zmq
+{
+  
+    class streamer_device_t : public device_base_t
+    {
+    public:
+        streamer_device_t(socket_base_t * in_, socket_base_t * out_)
+                : device_base_t(in_, out_)
+        {}
+        virtual void run()
+        {
+            zmq_msg_t msg;
+            int rc = zmq_msg_init(&msg);
+            if (rc)
+                return;
+            while (true) {
+                insocket->recv (&msg,0);
+                outsocket->send (&msg,0);
+            }
+            zmq_msg_close(&msg);
+        }
+    private:
+        streamer_device_t(streamer_device_t const &);
+        void operator = (streamer_device_t const &);
+    };
+}
+
+#endif
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 14898d5..b106ed3 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -23,7 +23,7 @@
 #include <errno.h>
 #include <stdlib.h>
 #include <new>
-
+#include "device_base.hpp"
 #include "socket_base.hpp"
 #include "app_thread.hpp"
 #include "dispatcher.hpp"
@@ -81,6 +81,9 @@ const char *zmq_strerror (int errnum_)
         return "Operation cannot be accomplished in current state";
     case ENOCOMPATPROTO:
         return "The protocol is not compatible with the socket type";
+    case EUNKNOWNDEVICE:
+        return "The device type is not recognised";
+
     default:
 #if defined _MSC_VER
 #pragma warning (push)
@@ -685,3 +688,12 @@ unsigned long zmq_stopwatch_stop (void *watch_)
     free (watch_);
     return (unsigned long) (end - start);
 }
+
+void zmq_device(int device_type_, void* in_, void* out_)
+{
+  
+  zmq::device_base_t * d = create_device(device_type_,
+				     (zmq::socket_base_t*)in_, 
+				     (zmq::socket_base_t*)out_);
+  d->run();
+}
-- 
1.6.3.3

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to