svn commit: r802819 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/queue/ management/common/src/main/java/org/apache/qpid/management/common/mbeans/ management/eclipse-plugin/s
Author: robbie Date: Mon Aug 10 15:01:01 2009 New Revision: 802819 URL: http://svn.apache.org/viewvc?rev=802819&view=rev Log: QPID-2018: Updated AMQQueueMBean to make use of the AMQQueue clearQueue return value to report the number of messages deleted. Updated management console accordingly, also indicating that it is only non-acquired messaes that are cleared Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=802819&r1=802818&r2=802819&view=diff == --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Mon Aug 10 15:01:01 2009 @@ -307,13 +307,16 @@ } /** + * Clears the queue of non-acquired messages + * + * @return the number of messages deleted * @see AMQQueue#clearQueue */ -public void clearQueue() throws JMException +public Long clearQueue() throws JMException { try { -_queue.clearQueue(_storeContext); +return _queue.clearQueue(_storeContext); } catch (AMQException ex) { Modified: qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java?rev=802819&r1=802818&r2=802819&view=diff == --- qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java (original) +++ qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java Mon Aug 10 15:01:01 2009 @@ -291,14 +291,18 @@ void deleteMessageFromTop() throws IOException, JMException; /** - * Clears the queue by deleting all the undelivered messages from the queue. + * Clears the queue by deleting all the messages from the queue that have not been acquired by consumers" + * + * Since Qpid JMX API 1.3 this returns the number of messages deleted. Prior to this, the return type was void. + * @return the number of messages deleted * @throws IOException * @throws JMException */ @MBeanOperation(name="clearQueue", -description="Clears the queue by deleting all the undelivered messages from the queue", +description="Clears the queue by deleting all the messages from the queue " + + "that have not been acquired by consumers", impact= MBeanOperationInfo.ACTION) -void clearQueue() throws IOException, JMException; +Long clearQueue() throws IOException, JMException; /** * Moves the messages in given range of message Ids to given Queue. QPID-170 Modified: qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java?rev=802819&r1=802818&r2=802819&view=diff == --- qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java (original) +++ qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java Mon Aug 10 15:01:01 2009 @@ -457,8 +457,21 @@ { try { -_qmb.clearQueue(); -ViewUtility.operationResultFeedback(null, "Queue cleared", null); +if(_ApiVersion.greaterThanOrEqualTo(1, 3)) +{ +//Qpid JMX API 1.3+, returns the number of messages deleted +Long numDeleted = _qmb.clearQueue(); +String message = "Queue cleared of " + numDeleted + + " non-acquired message" + (numDeleted == 1 ? "": "s"); +
svn commit: r802927 - in /qpid/trunk: .gitignore qpid/cpp/src/Makefile.am qpid/cpp/src/cluster.mk qpid/cpp/src/qpid/cluster/WatchDogPlugin.cpp qpid/cpp/src/qpid/cluster/qpidd_watchdog.cpp qpid/cpp/src
Author: aconway Date: Mon Aug 10 21:10:53 2009 New Revision: 802927 URL: http://svn.apache.org/viewvc?rev=802927&view=rev Log: Watchdog feature to remove unresponsive cluster nodes. In some intstances (e.g. while resolving an error) it's possible for a hung process to hang the entire cluster as they wait for its response. The cluster can handle terminated processes but hung processes present a problem. If the watchdog plugin is loaded and --watchdog-interval is set then the broker forks a child process that runs a very simple watchdog program, and starts a timer in the broker process to signal the watchdog every interval/2 seconds. The watchdog kills its parent if it does not receive a signal for interval seconds. This allows a stuck broker to be removed from the cluster so other cluster members can continue. Added: qpid/trunk/qpid/cpp/src/qpid/cluster/WatchDogPlugin.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/qpidd_watchdog.cpp qpid/trunk/qpid/cpp/src/tests/test_watchdog (with props) Modified: qpid/trunk/.gitignore qpid/trunk/qpid/cpp/src/Makefile.am qpid/trunk/qpid/cpp/src/cluster.mk qpid/trunk/qpid/cpp/src/tests/cluster.mk Modified: qpid/trunk/.gitignore URL: http://svn.apache.org/viewvc/qpid/trunk/.gitignore?rev=802927&r1=802926&r2=802927&view=diff == --- qpid/trunk/.gitignore (original) +++ qpid/trunk/.gitignore Mon Aug 10 21:10:53 2009 @@ -20,7 +20,7 @@ qpid/cpp/libtool qpidc.spec qpid/cpp/src/gen/ -*.mk +*gen.mk *.timestamp rgen.timestamp *.pcl Modified: qpid/trunk/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=802927&r1=802926&r2=802927&view=diff == --- qpid/trunk/qpid/cpp/src/Makefile.am (original) +++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Aug 10 21:10:53 2009 @@ -115,6 +115,7 @@ # Destination for intalled programs and tests defined here # qpidexecdir = $(libexecdir)/qpid +AM_CXXFLAGS += -DQPID_EXEC_DIR=\"$(qpidexecdir)\" qpidexec_PROGRAMS = qpidexec_SCRIPTS = qpidtestdir = $(qpidexecdir)/tests Modified: qpid/trunk/qpid/cpp/src/cluster.mk URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=802927&r1=802926&r2=802927&view=diff == --- qpid/trunk/qpid/cpp/src/cluster.mk (original) +++ qpid/trunk/qpid/cpp/src/cluster.mk Mon Aug 10 21:10:53 2009 @@ -89,4 +89,13 @@ cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing cluster_la_LDFLAGS = $(PLUGINLDFLAGS) +# The watchdog plugin and helper executable +dmodule_LTLIBRARIES += watchdog.la +watchdog_la_SOURCES = qpid/cluster/WatchDogPlugin.cpp +watchdog_la_LIBADD = libqpidbroker.la +watchdog_la_LDFLAGS = $(PLUGINLDFLAGS) + +qpidexec_PROGRAMS += qpidd_watchdog +qpidd_watchdog_SOURCES = qpid/cluster/qpidd_watchdog.cpp + endif # HAVE_LIBCPG Added: qpid/trunk/qpid/cpp/src/qpid/cluster/WatchDogPlugin.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/WatchDogPlugin.cpp?rev=802927&view=auto == --- qpid/trunk/qpid/cpp/src/qpid/cluster/WatchDogPlugin.cpp (added) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/WatchDogPlugin.cpp Mon Aug 10 21:10:53 2009 @@ -0,0 +1,114 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/Plugin.h" +#include "qpid/Options.h" +#include "qpid/log/Statement.h" +#include "qpid/broker/Broker.h" +#include "qpid/sys/Timer.h" +#include "qpid/sys/Fork.h" +#include +#include +#include + +namespace qpid { +namespace cluster { + +using broker::Broker; + +struct Settings { +Settings() : interval(0) {} +int interval; +}; + +struct WatchDogOptions : public qpid::Options { +Settings& settings; + +WatchDogOptions(Settings& s) : settings(s) { +addOptions() +("watchdog-interval", optValue(settings.interval, "N"), + "broker is automatically killed if it is hung for more than \ + N seconds. 0 disables watchdog."); +} +}; + +struct WatchDogTask : public sys::TimerTask { +int pid; +sys::Timer& timer; +int interval; + +WatchDogTask(int pid_, sys::Timer& t, int _interval) +: TimerTask(_interval*sys::T
svn commit: r802990 - in /qpid/trunk/qpid/cpp/src/qpid/sys: DispatchHandle.cpp epoll/EpollPoller.cpp
Author: astitcher Date: Tue Aug 11 05:34:59 2009 New Revision: 802990 URL: http://svn.apache.org/viewvc?rev=802990&view=rev Log: Fix for re-entering DispatchHandle::processEvent more than once on disconnection Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp?rev=802990&r1=802989&r2=802990&view=diff == --- qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp Tue Aug 11 05:34:59 2009 @@ -284,10 +284,7 @@ readableCallback(*this); writableCallback(*this); break; -case Poller::DISCONNECTED: { -ScopedLock lock(stateLock); -poller->unmonitorHandle(*this, Poller::INOUT); -} +case Poller::DISCONNECTED: if (disconnectedCallback) { disconnectedCallback(*this); } Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=802990&r1=802989&r2=802990&view=diff == --- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue Aug 11 05:34:59 2009 @@ -575,6 +575,8 @@ // (just not writable), allow us to readable until we get here again if (epe.events & ::EPOLLHUP) { if (eh.isHungup()) { +// Don't set up last Handle so that we don't reset this handle +// when we get back in here return Event(handle, DISCONNECTED); } eh.setHungup(); - Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org
svn commit: r802991 - /qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
Author: astitcher Date: Tue Aug 11 05:35:05 2009 New Revision: 802991 URL: http://svn.apache.org/viewvc?rev=802991&view=rev Log: Ensure that a PollerHandle that has had a disconnected event can never be returned with any more events Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=802991&r1=802990&r2=802991&view=diff == --- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue Aug 11 05:35:05 2009 @@ -575,8 +575,11 @@ // (just not writable), allow us to readable until we get here again if (epe.events & ::EPOLLHUP) { if (eh.isHungup()) { +eh.setInactive(); // Don't set up last Handle so that we don't reset this handle -// when we get back in here +// on re-entering Poller::wait. This means that we will never +// be set active again once we've returned disconnected, and so +// can never be returned again. return Event(handle, DISCONNECTED); } eh.setHungup(); - Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org
svn commit: r802992 - /qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
Author: astitcher Date: Tue Aug 11 05:35:10 2009 New Revision: 802992 URL: http://svn.apache.org/viewvc?rev=802992&view=rev Log: Add tests to check for correct Poller DISCONNECT behaviour Modified: qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp Modified: qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp?rev=802992&r1=802991&r2=802992&view=diff == --- qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp Tue Aug 11 05:35:10 2009 @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -50,7 +50,7 @@ int lastWrite = ::write(fd, s.c_str(), s.size()); if ( lastWrite >= 0) { bytesWritten += lastWrite; -} +} } while (errno != EAGAIN); return bytesWritten; } @@ -58,32 +58,32 @@ int readALot(int fd) { int bytesRead = 0; char buf[1024]; - + do { errno = 0; int lastRead = ::read(fd, buf, sizeof(buf)); if ( lastRead >= 0) { bytesRead += lastRead; -} +} } while (errno != EAGAIN); return bytesRead; } int main(int /*argc*/, char** /*argv*/) { -try +try { int sv[2]; int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv); assert(rc >= 0); - + // Set non-blocking rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK); assert(rc >= 0); rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK); assert(rc >= 0); - + // Make up a large string string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;"; for (int i = 0; i < 6; i++) @@ -97,32 +97,32 @@ // Write as much as we can to socket 0 int bytesWritten = writeALot(sv[0], testString); cout << "Wrote(0): " << bytesWritten << " bytes\n"; - + // Read as much as we can from socket 1 bytesRead = readALot(sv[1]); assert(bytesRead == bytesWritten); cout << "Read(1): " << bytesRead << " bytes\n"; auto_ptr poller(new Poller); - + PosixIOHandle f0(sv[0]); PosixIOHandle f1(sv[1]); PollerHandle h0(f0); PollerHandle h1(f1); - + poller->registerHandle(h0); poller->monitorHandle(h0, Poller::INOUT); - + // h0 should be writable Poller::Event event = poller->wait(); assert(event.handle == &h0); assert(event.type == Poller::WRITABLE); - + // Write as much as we can to socket 0 bytesWritten = writeALot(sv[0], testString); cout << "Wrote(0): " << bytesWritten << " bytes\n"; - + // Wait for 500ms - h0 no longer writable event = poller->wait(5); assert(event.handle == 0); @@ -133,7 +133,7 @@ event = poller->wait(); assert(event.handle == &h1); assert(event.type == Poller::READ_WRITABLE); - + bytesRead = readALot(sv[1]); assert(bytesRead == bytesWritten); cout << "Read(1): " << bytesRead << " bytes\n"; @@ -147,11 +147,11 @@ // Test multiple interrupts assert(poller->interrupt(h0) == true); assert(poller->interrupt(h1) == true); - + // Make sure we can interrupt them again assert(poller->interrupt(h0) == true); assert(poller->interrupt(h1) == true); - + // Make sure that they both come out event = poller->wait(); assert(event.type == Poller::INTERRUPTED); @@ -170,25 +170,44 @@ event = poller->wait(); assert(event.handle == &h0); -assert(event.type == Poller::WRITABLE); +assert(event.type == Poller::WRITABLE); // We didn't write anything so it should still be writable event = poller->wait(); assert(event.handle == &h0); -assert(event.type == Poller::WRITABLE); +assert(event.type == Poller::WRITABLE); poller->unmonitorHandle(h0, Poller::INOUT); event = poller->wait(5); assert(event.handle == 0); - + poller->unregisterHandle(h1); +assert(poller->interrupt(h1) == false); + +// close the other end to force a disconnect +::close(sv[1]); + +// Now make sure that we are readable followed by disconnected +// and after that we never return again +poller
svn commit: r802993 - /qpid/trunk/qpid/cpp/src/tests/Makefile.am
Author: astitcher Date: Tue Aug 11 05:35:14 2009 New Revision: 802993 URL: http://svn.apache.org/viewvc?rev=802993&view=rev Log: Reinstate reliable_replication_test Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=802993&r1=802992&r2=802993&view=diff == --- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original) +++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Aug 11 05:35:14 2009 @@ -317,9 +317,8 @@ # Not run under valgrind, too slow LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \ - run_failover_soak \ + run_failover_soak reliable_replication_test \ federated_cluster_test_with_node_failure -# TODO: renable the temporarily disabled the failing reliable_replication_test when QPID-1984 is resolved. EXTRA_DIST+=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak reliable_replication_test \ federated_cluster_test_with_node_failure \ - Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org