svn commit: r893175 - in /qpid/trunk/qpid/cpp: src/qpid/cluster/Connection.cpp src/qpid/cluster/Connection.h src/qpid/cluster/UpdateClient.cpp src/tests/cluster_test.cpp xml/cluster.xml
Author: aconway Date: Tue Dec 22 14:22:17 2009 New Revision: 893175 URL: http://svn.apache.org/viewvc?rev=893175view=rev Log: QPID-2296: Cluster errors when using acquire-mode-not-acquired Replicate consumer's queue position to new cluster nodes. Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp qpid/trunk/qpid/cpp/xml/cluster.xml Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=893175r1=893174r2=893175view=diff == --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Dec 22 14:22:17 2009 @@ -269,9 +269,10 @@ return sessionState().getSemanticState(); } -void Connection::consumerState(const string name, bool blocked, bool notifyEnabled) +void Connection::consumerState(const string name, bool blocked, bool notifyEnabled, const SequenceNumber position) { broker::SemanticState::ConsumerImpl c = semanticState().find(name); +c.position = position; c.setBlocked(blocked); if (notifyEnabled) c.enableNotify(); else c.disableNotify(); consumerNumbering.add(c.shared_from_this()); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=893175r1=893174r2=893175view=diff == --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Dec 22 14:22:17 2009 @@ -108,7 +108,7 @@ // Called for data delivered from the cluster. void deliveredFrame(const EventFrame); -void consumerState(const std::string name, bool blocked, bool notifyEnabled); +void consumerState(const std::string name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber position); // Used in catch-up mode to build initial state. // Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=893175r1=893174r2=893175view=diff == --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Dec 22 14:22:17 2009 @@ -381,7 +381,8 @@ ClusterConnectionProxy(shadowSession).consumerState( ci-getName(), ci-isBlocked(), -ci-isNotifyEnabled() +ci-isNotifyEnabled(), +ci-position ); consumerNumbering.add(ci); Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=893175r1=893174r2=893175view=diff == --- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Dec 22 14:22:17 2009 @@ -1149,6 +1149,43 @@ BOOST_CHECK(!browseByteCredit(c1, q, size-1, m)); } +// Test that consumer positions are updated correctly. +// Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=541927 +// +QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) { +ClusterFixture::Args args; +prepareArgs(args, durableFlag); +ClusterFixture cluster(1, args, -1); +Client c0(cluster[0], c0); + +c0.session.queueDeclare(q, arg::durable=durableFlag); +SubscriptionSettings settings; +settings.autoAck = 0; +// Set the acquire mode to 'not-acquired' the consumer moves along the queue +// but does not acquire (remove) messages. +settings.acquireMode = ACQUIRE_MODE_NOT_ACQUIRED; +Subscription s = c0.subs.subscribe(c0.lq, q, settings); +c0.session.messageTransfer(arg::content=makeMessage(1, q, durableFlag)); +BOOST_CHECK_EQUAL(1, c0.lq.get(TIMEOUT).getData()); + +// Add another member, send/receive another message and acquire +// the messages. With the bug, this creates an inconsistency +// because the browse position was not updated to the new member. +cluster.add(); +c0.session.messageTransfer(arg::content=makeMessage(2, q, durableFlag)); +BOOST_CHECK_EQUAL(2, c0.lq.get(TIMEOUT).getData()); +s.acquire(s.getUnacquired()); +s.accept(s.getUnaccepted()); + +// In the bug we now have 0 messages on cluster[0] and 1 message on cluster[1] +// Subscribing on cluster[1] provokes an error that shuts down cluster[0] +Client c1(cluster[1], c1); +Subscription s1 = c1.subs.subscribe(c1.lq, q); // Default auto-ack=1 +Message m; +
[CONF] Apache Qpid QMF Map Message Protocol
QMF Map Message Protocol Page edited by Ted Ross QMFv2 Map Message Protocol Introduction This document describes the design of a proposed protocol for QMF based on map-messages (offered by the new C++ and Python APIs as well as the existing JMS API). If adopted, this new protocol will change the formats of the messages used by QMF components to communicate. It will also change some of the message exchange patterns. It will not significantly impact the console and agent APIs and is intended to operate with applications that use the current QMF APIs. Some highlights of the new design: Current QMF message bodies are in packed binary formats. While quite efficient, this style of formatting makes it difficult to make changes to the format and content for new features. The proposed format is based on encoded maps (a.k.a. dictionaries, field-tables) which are very easily extended and require less context to be useful. QMF currently requires the message broker to participate in the QMF protocol. The proposed protocol removes this requirement and will run properly on any AMQP message broker. QMF Agents currently publish periodic updates of their managed content to a globally accessible topic. This has security implications with regard to access to data. This is also inflexible in that updates to all data are sent at the same intervals. The proposed protocol removes the global publishing of data and introduces a subscription-query whereby a console may request that an agent publish certain data at a certain interval to an indicated target. Such requests can be subject to access control and may be focused on only the data that is needed for a particular application. The proposed protocol allows for more general use of data. For example: Free-form data, that has no object-identifier nor schema, can be transferred. This is useful for complex queries (joins, reports, etc.). Methods can be invoked against an agent in the absence of a managed object. QMF Protocol Use of Message Headers Standard Headers Application Headers Message Type Codes Type Code Name Description agent Agent Indication Indication sent from an agent periodically or immediately upon receiving an agent-locate with matching criteria. This message is used in agent discovery and as a keep-alive/heartbeat indication. agent-locate Agent Locate Indication Indication sent from a console when the console wishes to discover agents in the network. This message may contain a selector indicating which subset of all agents it wishes to discover. response Response General-purpose response to a request. A response message is always correlated to a specific request and is sent to the reply-to of the request. A response may optionally contain a structured exception or it may optionally contain a list of values (i.e. output arguments from a method call). schema-query Schema Query Request A request sent by a console to an agent to discover information about the schemata of data offered by the agent. A schema query may request a list of packages (i.e. namespaces), a list of schema keys, or full schema details from the agent. schema Schema Indication Indication sent from an agent either to announce newly added schema information or as a side effect of having received a schema query request from a console. get-query Get Query Request A request sent from a console to an agent to get data from the agent. Generally, a get query is used to access managed objects owned by the agent. This message is extensible such that an agent may offer advanced query capabilities (SQL, Reporting, etc.) to consoles that know how to use the capabilities. managed-object Managed Object Indication Indication sent from an agent as a side effect of having received a get-query or a create-subscription. Managed objects have object-identifiers, schema keys, and create/destroy timestamps in addition to their attributes. object Object Indication Indication sent from an agent as a side effect of having received a get-query. Unmanaged objects have schema keys but no object-identifiers or timestamps as they are not managed, can't be addressed, and have no lifecycle. data Data Indication Indication sent from an agent as a side effect of having received a get-query. Data has attributes only and can be used for any general purpose. event Event Indication Indication sent from an agent when the agent application raises an event. create-subscription Create Subscription Request Request sent from a console to an agent to create a subscription query. Such a query behaves like a get-query except that once the matching objects are sent, the query remains open and sends updates when matching objects are changed, added, or deleted. A subscription query has a target (not necessarily the address of the requesting console), a publishing interval to control the frequency of updates, and a
svn commit: r893301 - in /qpid/trunk: ./ qpid/java/ qpid/java/broker/ qpid/java/broker/etc/log4j.xml
Author: robbie Date: Tue Dec 22 20:11:56 2009 New Revision: 893301 URL: http://svn.apache.org/viewvc?rev=893301view=rev Log: QPID-2155: remove stray character causing parser warnings during broker startup merge from r892761 Modified: qpid/trunk/ (props changed) qpid/trunk/qpid/java/ (props changed) qpid/trunk/qpid/java/broker/ (props changed) qpid/trunk/qpid/java/broker/etc/log4j.xml Propchange: qpid/trunk/ -- --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Dec 22 20:11:56 2009 @@ -1 +1,2 @@ +/qpid/branches/0.5.x-dev:892761 /qpid/branches/java-network-refactor:805429-825319 Propchange: qpid/trunk/qpid/java/ -- --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Dec 22 20:11:56 2009 @@ -1,5 +1,5 @@ /qpid/branches/0.5.x-dev:886720-886722 -/qpid/branches/0.5.x-dev/qpid/java:886720-886722,887145 +/qpid/branches/0.5.x-dev/qpid/java:886720-886722,887145,892761 /qpid/branches/java-broker-0-10/qpid/java:795950-829653 /qpid/branches/java-network-refactor/qpid/java:805429-821809 /qpid/trunk/qpid:796646-796653 Propchange: qpid/trunk/qpid/java/broker/ -- --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Dec 22 20:11:56 2009 @@ -1,4 +1,4 @@ -/qpid/branches/0.5.x-dev/qpid/java/broker:886720-886722,887145 +/qpid/branches/0.5.x-dev/qpid/java/broker:886720-886722,887145,892761 /qpid/branches/java-broker-0-10/qpid/java/broker:795950-829653 /qpid/branches/java-network-refactor/qpid/java/broker:805429-821809 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker:787599 Modified: qpid/trunk/qpid/java/broker/etc/log4j.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/etc/log4j.xml?rev=893301r1=893300r2=893301view=diff == --- qpid/trunk/qpid/java/broker/etc/log4j.xml (original) +++ qpid/trunk/qpid/java/broker/etc/log4j.xml Tue Dec 22 20:11:56 2009 @@ -58,7 +58,7 @@ layout class=org.apache.log4j.PatternLayout param name=ConversionPattern value=%d %-5p [%t] %C{2} (%F:%L) - %m%n/ -/layout` +/layout /appender appender class=org.apache.log4j.FileAppender name=FileAppender - Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org
svn commit: r893315 - in /qpid/trunk/qpid/dotnet/client-010: client/client/ client/transport/ client/transport/network/ client/transport/network/io/ test/ test/Helpers/ test/interop/
Author: aidan Date: Tue Dec 22 20:59:57 2009 New Revision: 893315 URL: http://svn.apache.org/viewvc?rev=893315view=rev Log: QPID-2239: make sure the close() does not hand. Also handle authentication failures properly, QPID-2240. Patch by julien.lavigne Added: qpid/trunk/qpid/dotnet/client-010/test/Helpers/ qpid/trunk/qpid/dotnet/client-010/test/Helpers/ConfigHelpers.cs qpid/trunk/qpid/dotnet/client-010/test/interop/ConnectionTests.cs Modified: qpid/trunk/qpid/dotnet/client-010/client/client/Client.cs qpid/trunk/qpid/dotnet/client-010/client/client/ClientConnectionDelegate.cs qpid/trunk/qpid/dotnet/client-010/client/client/IClient.cs qpid/trunk/qpid/dotnet/client-010/client/transport/Channel.cs qpid/trunk/qpid/dotnet/client-010/client/transport/ClientDelegate.cs qpid/trunk/qpid/dotnet/client-010/client/transport/Connection.cs qpid/trunk/qpid/dotnet/client-010/client/transport/ConnectionDelegate.cs qpid/trunk/qpid/dotnet/client-010/client/transport/network/Assembler.cs qpid/trunk/qpid/dotnet/client-010/client/transport/network/InputHandler.cs qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoSSLTransport.cs qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoTransport.cs qpid/trunk/qpid/dotnet/client-010/test/Test.csproj qpid/trunk/qpid/dotnet/client-010/test/interop/TestCase.cs Modified: qpid/trunk/qpid/dotnet/client-010/client/client/Client.cs URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/client-010/client/client/Client.cs?rev=893315r1=893314r2=893315view=diff == --- qpid/trunk/qpid/dotnet/client-010/client/client/Client.cs (original) +++ qpid/trunk/qpid/dotnet/client-010/client/client/Client.cs Tue Dec 22 20:59:57 2009 @@ -34,11 +34,18 @@ private readonly Object _closeOK; private IClosedListener _closedListner; +public event EventHandlerExceptionArgs ExceptionRaised; +public event EventHandler ConnectionLost; public bool IsClosed { get { return _isClosed; } -set { _isClosed = value; } +set +{ +_isClosed = value; +if(_isClosed ConnectionLost != null) +ConnectionLost(this, EventArgs.Empty); +} } public Object CloseOk @@ -67,14 +74,20 @@ { _log.Debug(String.Format(Client Connecting to host {0}; port {1}; virtualHost {2}; username {3}, host, port, virtualHost, username)); -ConnectionDelegate connectionDelegate = new ClientConnectionDelegate(this, username, password); +ClientConnectionDelegate connectionDelegate = new ClientConnectionDelegate(this, username, password); ManualResetEvent negotiationComplete = new ManualResetEvent(false); -connectionDelegate.setCondition(negotiationComplete); +connectionDelegate.SetCondition(negotiationComplete); connectionDelegate.VirtualHost = virtualHost; _conn = IoTransport.Connect(host, port, connectionDelegate); _conn.Send(new ProtocolHeader(1, 0, 10)); negotiationComplete.WaitOne(); + +if (connectionDelegate.Exception != null) +throw connectionDelegate.Exception; + +connectionDelegate.SetCondition(null); + } /// summary @@ -93,15 +106,20 @@ { _log.Debug(String.Format(Client Connecting to host {0}; port {1}; virtualHost {2}; username {3}, host, port, virtualHost, username)); -_log.Debug(String.Format(SSL paramters: serverName: {0}; certPath: {1}; rejectUntrusted: {2}, serverName, certPath, rejectUntrusted)); -ConnectionDelegate connectionDelegate = new ClientConnectionDelegate(this, username, password); +_log.Debug(String.Format(SSL paramters: serverName: {0}; certPath: {1}; rejectUntrusted: {2}, serverName, certPath, rejectUntrusted)); +ClientConnectionDelegate connectionDelegate = new ClientConnectionDelegate(this, username, password); ManualResetEvent negotiationComplete = new ManualResetEvent(false); -connectionDelegate.setCondition(negotiationComplete); +connectionDelegate.SetCondition(negotiationComplete); connectionDelegate.VirtualHost = virtualHost; _conn = IoSSLTransport.Connect(host, port, serverName, certPath, rejectUntrusted, connectionDelegate); _conn.Send(new ProtocolHeader(1, 0, 10)); negotiationComplete.WaitOne(); + +if (connectionDelegate.Exception != null) +throw connectionDelegate.Exception; + +