Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp?rev=746061&r1=746060&r2=746061&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp Fri Feb 20 00:04:37 2009
@@ -22,17 +22,46 @@
  *
  */
 
-#include "PollableCondition.h"
+#include "qpid/sys/PollableCondition.h"
+#include "qpid/sys/DispatchHandle.h"
+#include "qpid/sys/IOHandle.h"
 #include "qpid/sys/posix/PrivatePosix.h"
 #include "qpid/Exception.h"
 
+#include <boost/bind.hpp>
+
 #include <unistd.h>
 #include <fcntl.h>
 
 namespace qpid {
 namespace sys {
 
-PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) {
+class PollableConditionPrivate : public sys::IOHandle {
+    friend class PollableCondition;
+
+private:
+    PollableConditionPrivate(const sys::PollableCondition::Callback& cb,
+                             sys::PollableCondition& parent,
+                             const boost::shared_ptr<sys::Poller>& poller);
+    ~PollableConditionPrivate();
+
+    void dispatch(sys::DispatchHandle& h);
+    void rewatch();
+    void unwatch();
+
+private:
+    PollableCondition::Callback cb;
+    PollableCondition& parent;
+    boost::shared_ptr<sys::Poller> poller;
+    int writeFd;
+    std::auto_ptr<DispatchHandleRef> handle;
+};
+
+PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb,
+                                                   sys::PollableCondition& parent,
+                                                   const boost::shared_ptr<sys::Poller>& poller)
+  : IOHandle(new sys::IOHandlePrivate), cb(cb), parent(parent)
+{
     int fds[2];
     if (::pipe(fds) == -1)
         throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
@@ -42,22 +71,71 @@
         throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
     if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1)
         throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
+    handle.reset (new DispatchHandleRef(*this,
+                                        boost::bind(&sys::PollableConditionPrivate::dispatch, this, _1),
+                                        0, 0));
+    handle->startWatch(poller);
+    handle->unwatch();
+}
+
+PollableConditionPrivate::~PollableConditionPrivate()
+{
+    handle->stopWatch();
+    close(writeFd);
+}
+
+void PollableConditionPrivate::dispatch(sys::DispatchHandle& /*h*/)
+{
+    cb(parent);
+}
+
+void PollableConditionPrivate::rewatch()
+{
+    handle->rewatch();
+}
+
+void PollableConditionPrivate::unwatch()
+{
+    handle->unwatch();
+}
+
+  /* PollableCondition */
+
+PollableCondition::PollableCondition(const Callback& cb,
+                                     const boost::shared_ptr<sys::Poller>& poller)
+  : impl(new PollableConditionPrivate(cb, *this, poller))
+{
+}
+
+PollableCondition::~PollableCondition()
+{
+    delete impl;
+}
+
+void PollableCondition::set() {
+    static const char dummy=0;
+    ssize_t n = ::write(impl->writeFd, &dummy, 1);
+    if (n == -1 && errno != EAGAIN)
+        throw ErrnoException("Error setting PollableCondition");
 }
 
 bool PollableCondition::clear() {
     char buf[256];
     ssize_t n;
     bool wasSet = false;
-    while ((n = ::read(impl->fd, buf, sizeof(buf))) > 0) 
+    while ((n = ::read(impl->impl->fd, buf, sizeof(buf))) > 0) 
         wasSet = true;
-    if (n == -1 && errno != EAGAIN) throw ErrnoException(QPID_MSG("Error clearing PollableCondition"));
+    if (n == -1 && errno != EAGAIN)
+        throw ErrnoException(QPID_MSG("Error clearing PollableCondition"));
     return wasSet;
 }
 
-void PollableCondition::set() {
-    static const char dummy=0;
-    ssize_t n = ::write(writeFd, &dummy, 1);
-    if (n == -1 && errno != EAGAIN) throw ErrnoException("Error setting PollableCondition");
+void PollableCondition::disarm() {
+    impl->unwatch();
+}
+
+void PollableCondition::rearm() {
+    impl->rewatch();
 }
 
 
@@ -71,22 +149,35 @@
 namespace qpid {
 namespace sys {
 
-PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) {
+PollableConditionPrivate::PollableConditionPrivate(const PollableCondition::Callback& cb,
+                                                   sys::PollableCondition& parent,
+                                                   const boost::shared_ptr<sys::Poller>& poller)
+  : cb(cb), parent(parent), poller(poller),
+    IOHandle(new sys::IOHandlePrivate) {
     impl->fd = ::eventfd(0, 0);
     if (impl->fd < 0) throw ErrnoException("conditionfd() failed");
 }
 
+void PollableCondition::set() {
+    static const uint64_t value=1;
+    ssize_t n = ::write(impl->impl->fd,
+                        reinterpret_cast<const void*>(&value), 8);
+    if (n != 8) throw ErrnoException("write failed on conditionfd");
+}
+
 bool PollableCondition::clear() {
     char buf[8];
-    ssize_t n = ::read(impl->fd, buf, 8);
+    ssize_t n = ::read(impl->impl->fd, buf, 8);
     if (n != 8) throw ErrnoException("read failed on conditionfd");
     return *reinterpret_cast<uint64_t*>(buf);
 }
 
-void PollableCondition::set() {
-    static const uint64_t value=1;
-    ssize_t n = ::write(impl->fd, reinterpret_cast<const void*>(&value), 8);
-    if (n != 8) throw ErrnoException("write failed on conditionfd");
+void PollableCondition::disarm() {
+  // ????
+}
+
+void PollableCondition::rearm() {
+  // ????
 }
     
 #endif

Modified: qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp?rev=746061&r1=746060&r2=746061&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp Fri Feb 20 00:04:37 2009
@@ -27,6 +27,7 @@
 #include "qpid/broker/QueueEvents.h"
 #include "qpid/client/QueueOptions.h"
 #include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Dispatcher.h"
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to