svn commit: r802819 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/queue/ management/common/src/main/java/org/apache/qpid/management/common/mbeans/ management/eclipse-plugin/s

2009-08-10 Thread robbie
Author: robbie
Date: Mon Aug 10 15:01:01 2009
New Revision: 802819

URL: http://svn.apache.org/viewvc?rev=802819&view=rev
Log:
QPID-2018: Updated AMQQueueMBean to make use of the AMQQueue clearQueue return 
value to report the number of messages deleted. Updated management console 
accordingly, also indicating that it is only non-acquired messaes that are 
cleared

Modified:

qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java

qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java

qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=802819&r1=802818&r2=802819&view=diff
==
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 Mon Aug 10 15:01:01 2009
@@ -307,13 +307,16 @@
 }
 
 /**
+ * Clears the queue of non-acquired messages
+ * 
+ * @return the number of messages deleted
  * @see AMQQueue#clearQueue
  */
-public void clearQueue() throws JMException
+public Long clearQueue() throws JMException
 {
 try
 {
-_queue.clearQueue(_storeContext);
+return _queue.clearQueue(_storeContext);
 }
 catch (AMQException ex)
 {

Modified: 
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java?rev=802819&r1=802818&r2=802819&view=diff
==
--- 
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
 Mon Aug 10 15:01:01 2009
@@ -291,14 +291,18 @@
 void deleteMessageFromTop() throws IOException, JMException;
 
 /**
- * Clears the queue by deleting all the undelivered messages from the 
queue.
+ * Clears the queue by deleting all the messages from the queue that have 
not been acquired by consumers"
+ * 
+ * Since Qpid JMX API 1.3 this returns the number of messages deleted. 
Prior to this, the return type was void.
+ * @return the number of messages deleted
  * @throws IOException
  * @throws JMException
  */
 @MBeanOperation(name="clearQueue",
-description="Clears the queue by deleting all the 
undelivered messages from the queue",
+description="Clears the queue by deleting all the messages 
from the queue " +
+   "that have not been acquired by consumers",
 impact= MBeanOperationInfo.ACTION)
-void clearQueue() throws IOException, JMException;
+Long clearQueue() throws IOException, JMException;
 
 /**
  * Moves the messages in given range of message Ids to given Queue. 
QPID-170

Modified: 
qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java?rev=802819&r1=802818&r2=802819&view=diff
==
--- 
qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java
 (original)
+++ 
qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java
 Mon Aug 10 15:01:01 2009
@@ -457,8 +457,21 @@
 {
 try
 {
-_qmb.clearQueue();
-ViewUtility.operationResultFeedback(null, "Queue 
cleared", null);
+if(_ApiVersion.greaterThanOrEqualTo(1, 3))
+{
+//Qpid JMX API 1.3+, returns the number of 
messages deleted
+Long numDeleted = _qmb.clearQueue();
+String message = "Queue cleared of " + numDeleted 
+ 
+ " non-acquired message" + 
(numDeleted == 1 ? "": "s");
+  

svn commit: r802927 - in /qpid/trunk: .gitignore qpid/cpp/src/Makefile.am qpid/cpp/src/cluster.mk qpid/cpp/src/qpid/cluster/WatchDogPlugin.cpp qpid/cpp/src/qpid/cluster/qpidd_watchdog.cpp qpid/cpp/src

2009-08-10 Thread aconway
Author: aconway
Date: Mon Aug 10 21:10:53 2009
New Revision: 802927

URL: http://svn.apache.org/viewvc?rev=802927&view=rev
Log:
Watchdog feature to remove unresponsive cluster nodes.

In some intstances (e.g. while resolving an error) it's possible for a
hung process to hang the entire cluster as they wait for its response.
The cluster can handle terminated processes but hung processes present
a problem.

If the watchdog plugin is loaded and --watchdog-interval is set then
the broker forks a child process that runs a very simple watchdog
program, and starts a timer in the broker process to signal the watchdog
every interval/2 seconds. The watchdog kills its parent if it does not
receive a signal for interval seconds. This allows a stuck broker to be 
removed from the cluster so other cluster members can continue.

Added:
qpid/trunk/qpid/cpp/src/qpid/cluster/WatchDogPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/qpidd_watchdog.cpp
qpid/trunk/qpid/cpp/src/tests/test_watchdog   (with props)
Modified:
qpid/trunk/.gitignore
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/cluster.mk
qpid/trunk/qpid/cpp/src/tests/cluster.mk

Modified: qpid/trunk/.gitignore
URL: 
http://svn.apache.org/viewvc/qpid/trunk/.gitignore?rev=802927&r1=802926&r2=802927&view=diff
==
--- qpid/trunk/.gitignore (original)
+++ qpid/trunk/.gitignore Mon Aug 10 21:10:53 2009
@@ -20,7 +20,7 @@
 qpid/cpp/libtool
 qpidc.spec
 qpid/cpp/src/gen/
-*.mk
+*gen.mk
 *.timestamp
 rgen.timestamp
 *.pcl

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=802927&r1=802926&r2=802927&view=diff
==
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Aug 10 21:10:53 2009
@@ -115,6 +115,7 @@
 # Destination for intalled programs and tests defined here
 #
 qpidexecdir = $(libexecdir)/qpid
+AM_CXXFLAGS += -DQPID_EXEC_DIR=\"$(qpidexecdir)\"
 qpidexec_PROGRAMS =
 qpidexec_SCRIPTS =
 qpidtestdir = $(qpidexecdir)/tests

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=802927&r1=802926&r2=802927&view=diff
==
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Mon Aug 10 21:10:53 2009
@@ -89,4 +89,13 @@
 cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing
 cluster_la_LDFLAGS = $(PLUGINLDFLAGS)
 
+# The watchdog plugin and helper executable
+dmodule_LTLIBRARIES += watchdog.la
+watchdog_la_SOURCES = qpid/cluster/WatchDogPlugin.cpp
+watchdog_la_LIBADD = libqpidbroker.la
+watchdog_la_LDFLAGS = $(PLUGINLDFLAGS)
+
+qpidexec_PROGRAMS += qpidd_watchdog
+qpidd_watchdog_SOURCES = qpid/cluster/qpidd_watchdog.cpp
+
 endif  # HAVE_LIBCPG

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/WatchDogPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/WatchDogPlugin.cpp?rev=802927&view=auto
==
--- qpid/trunk/qpid/cpp/src/qpid/cluster/WatchDogPlugin.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/WatchDogPlugin.cpp Mon Aug 10 21:10:53 
2009
@@ -0,0 +1,114 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/log/Statement.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/sys/Timer.h"
+#include "qpid/sys/Fork.h"
+#include 
+#include 
+#include 
+
+namespace qpid {
+namespace cluster {
+
+using broker::Broker;
+
+struct Settings {
+Settings() : interval(0) {}
+int interval;
+};
+
+struct WatchDogOptions : public qpid::Options {
+Settings& settings;
+
+WatchDogOptions(Settings& s) : settings(s) {
+addOptions()
+("watchdog-interval", optValue(settings.interval, "N"),
+ "broker is automatically killed if it is hung for more than \
+ N seconds. 0 disables watchdog.");
+}
+};
+
+struct WatchDogTask : public sys::TimerTask {
+int pid;
+sys::Timer& timer;
+int interval;
+
+WatchDogTask(int pid_, sys::Timer& t, int _interval)
+: TimerTask(_interval*sys::T

svn commit: r802990 - in /qpid/trunk/qpid/cpp/src/qpid/sys: DispatchHandle.cpp epoll/EpollPoller.cpp

2009-08-10 Thread astitcher
Author: astitcher
Date: Tue Aug 11 05:34:59 2009
New Revision: 802990

URL: http://svn.apache.org/viewvc?rev=802990&view=rev
Log:
Fix for re-entering DispatchHandle::processEvent more than once on disconnection

Modified:
qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp?rev=802990&r1=802989&r2=802990&view=diff
==
--- qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp Tue Aug 11 05:34:59 2009
@@ -284,10 +284,7 @@
 readableCallback(*this);
 writableCallback(*this);
 break;
-case Poller::DISCONNECTED: {
-ScopedLock lock(stateLock);
-poller->unmonitorHandle(*this, Poller::INOUT);
-}
+case Poller::DISCONNECTED:
 if (disconnectedCallback) {
 disconnectedCallback(*this);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=802990&r1=802989&r2=802990&view=diff
==
--- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue Aug 11 05:34:59 
2009
@@ -575,6 +575,8 @@
 // (just not writable), allow us to readable until we get here 
again
 if (epe.events & ::EPOLLHUP) {
 if (eh.isHungup()) {
+// Don't set up last Handle so that we don't reset 
this handle
+// when we get back in here
 return Event(handle, DISCONNECTED);
 }
 eh.setHungup();



-
Apache Qpid - AMQP Messaging Implementation
Project:  http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org



svn commit: r802991 - /qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp

2009-08-10 Thread astitcher
Author: astitcher
Date: Tue Aug 11 05:35:05 2009
New Revision: 802991

URL: http://svn.apache.org/viewvc?rev=802991&view=rev
Log:
Ensure that a PollerHandle that has had a disconnected event can never
be returned with any more events

Modified:
qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=802991&r1=802990&r2=802991&view=diff
==
--- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue Aug 11 05:35:05 
2009
@@ -575,8 +575,11 @@
 // (just not writable), allow us to readable until we get here 
again
 if (epe.events & ::EPOLLHUP) {
 if (eh.isHungup()) {
+eh.setInactive();
 // Don't set up last Handle so that we don't reset 
this handle
-// when we get back in here
+// on re-entering Poller::wait. This means that we 
will never
+// be set active again once we've returned 
disconnected, and so
+// can never be returned again.
 return Event(handle, DISCONNECTED);
 }
 eh.setHungup();



-
Apache Qpid - AMQP Messaging Implementation
Project:  http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org



svn commit: r802992 - /qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp

2009-08-10 Thread astitcher
Author: astitcher
Date: Tue Aug 11 05:35:10 2009
New Revision: 802992

URL: http://svn.apache.org/viewvc?rev=802992&view=rev
Log:
Add tests to check for correct Poller DISCONNECT behaviour

Modified:
qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp

Modified: qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp?rev=802992&r1=802991&r2=802992&view=diff
==
--- qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp Tue Aug 11 05:35:10 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -50,7 +50,7 @@
 int lastWrite = ::write(fd, s.c_str(), s.size());
 if ( lastWrite >= 0) {
 bytesWritten += lastWrite;
-} 
+}
 } while (errno != EAGAIN);
 return bytesWritten;
 }
@@ -58,32 +58,32 @@
 int readALot(int fd) {
 int bytesRead = 0;
 char buf[1024];
-
+
 do {
 errno = 0;
 int lastRead = ::read(fd, buf, sizeof(buf));
 if ( lastRead >= 0) {
 bytesRead += lastRead;
-} 
+}
 } while (errno != EAGAIN);
 return bytesRead;
 }
 
 int main(int /*argc*/, char** /*argv*/)
 {
-try 
+try
 {
 int sv[2];
 int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
 assert(rc >= 0);
-
+
 // Set non-blocking
 rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK);
 assert(rc >= 0);
 
 rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK);
 assert(rc >= 0);
-
+
 // Make up a large string
 string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;";
 for (int i = 0; i < 6; i++)
@@ -97,32 +97,32 @@
 // Write as much as we can to socket 0
 int bytesWritten = writeALot(sv[0], testString);
 cout << "Wrote(0): " << bytesWritten << " bytes\n";
-
+
 // Read as much as we can from socket 1
 bytesRead = readALot(sv[1]);
 assert(bytesRead == bytesWritten);
 cout << "Read(1): " << bytesRead << " bytes\n";
 
 auto_ptr poller(new Poller);
-
+
 PosixIOHandle f0(sv[0]);
 PosixIOHandle f1(sv[1]);
 
 PollerHandle h0(f0);
 PollerHandle h1(f1);
-
+
 poller->registerHandle(h0);
 poller->monitorHandle(h0, Poller::INOUT);
-
+
 // h0 should be writable
 Poller::Event event = poller->wait();
 assert(event.handle == &h0);
 assert(event.type == Poller::WRITABLE);
-
+
 // Write as much as we can to socket 0
 bytesWritten = writeALot(sv[0], testString);
 cout << "Wrote(0): " << bytesWritten << " bytes\n";
-
+
 // Wait for 500ms - h0 no longer writable
 event = poller->wait(5);
 assert(event.handle == 0);
@@ -133,7 +133,7 @@
 event = poller->wait();
 assert(event.handle == &h1);
 assert(event.type == Poller::READ_WRITABLE);
-
+
 bytesRead = readALot(sv[1]);
 assert(bytesRead == bytesWritten);
 cout << "Read(1): " << bytesRead << " bytes\n";
@@ -147,11 +147,11 @@
 // Test multiple interrupts
 assert(poller->interrupt(h0) == true);
 assert(poller->interrupt(h1) == true);
-
+
 // Make sure we can interrupt them again
 assert(poller->interrupt(h0) == true);
 assert(poller->interrupt(h1) == true);
-
+
 // Make sure that they both come out
 event = poller->wait();
 assert(event.type == Poller::INTERRUPTED);
@@ -170,25 +170,44 @@
 
 event = poller->wait();
 assert(event.handle == &h0);
-assert(event.type == Poller::WRITABLE);
+assert(event.type == Poller::WRITABLE);
 
 // We didn't write anything so it should still be writable
 event = poller->wait();
 assert(event.handle == &h0);
-assert(event.type == Poller::WRITABLE);
+assert(event.type == Poller::WRITABLE);
 
 poller->unmonitorHandle(h0, Poller::INOUT);
 
 event = poller->wait(5);
 assert(event.handle == 0);
-
+
 poller->unregisterHandle(h1);
+assert(poller->interrupt(h1) == false);
+
+// close the other end to force a disconnect
+::close(sv[1]);
+
+// Now make sure that we are readable followed by disconnected
+// and after that we never return again
+poller

svn commit: r802993 - /qpid/trunk/qpid/cpp/src/tests/Makefile.am

2009-08-10 Thread astitcher
Author: astitcher
Date: Tue Aug 11 05:35:14 2009
New Revision: 802993

URL: http://svn.apache.org/viewvc?rev=802993&view=rev
Log:
Reinstate reliable_replication_test

Modified:
qpid/trunk/qpid/cpp/src/tests/Makefile.am

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=802993&r1=802992&r2=802993&view=diff
==
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Aug 11 05:35:14 2009
@@ -317,9 +317,8 @@
 # Not run under valgrind, too slow
 
 LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest 
topic_perftest run_ring_queue_test stop_broker \
- run_failover_soak  \
+ run_failover_soak reliable_replication_test \
  federated_cluster_test_with_node_failure
-# TODO: renable the temporarily disabled the failing reliable_replication_test 
when QPID-1984 is resolved.
 
 EXTRA_DIST+=fanout_perftest shared_perftest multiq_perftest topic_perftest 
run_failover_soak reliable_replication_test \
   federated_cluster_test_with_node_failure \



-
Apache Qpid - AMQP Messaging Implementation
Project:  http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org