Author: aconway
Date: Tue Dec  6 15:56:40 2011
New Revision: 1210989

URL: http://svn.apache.org/viewvc?rev=1210989&view=rev
Log:
QPID-3652: Fix cluster authentication.

Only allow brokers that authenticate as the cluster-username to join a cluster.

A new broker first connects to a cluster broker, authenticates as the
cluster-username, and sends its CPG member ID to the
qpid.cluster-credentials exchange. The cluster broker that subsequently
acts as updater verifies that the credentials are valid before connecting
to give the update.

NOTE 1: If you are using an ACL, the cluster-username must be allowed to
publish to the qpid.cluster-credentials exchange. E.g. in your ACL file:

acl allow foo@QPID publish exchange name=qpid.cluster-credentials

NOTE 2: This changes the cluster initialization protocol, so you will
need to restart the cluster with every broker running the new version.

Added:
    qpid/trunk/qpid/cpp/src/qpid/UrlArray.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/UrlArray.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/rubygen/amqpgen.rb
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
    qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/cluster_authentication_soak.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/rubygen/amqpgen.rb
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/rubygen/amqpgen.rb (original)
+++ qpid/trunk/qpid/cpp/rubygen/amqpgen.rb Tue Dec  6 15:56:40 2011
@@ -191,6 +191,7 @@ class AmqpElement
     "command-fragments" => "session.command-fragment",
     "in-doubt" => "dtx.xid",
     "tx-publish" => "str-8",
+    "urls" => "str-16",
     "queues" => "str-8",
     "prepared" => "str-8"
   }

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Tue Dec  6 15:56:40 2011
@@ -360,6 +360,8 @@ libqpidcommon_la_SOURCES +=                 \
   qpid/StringUtils.cpp                         \
   qpid/StringUtils.h                           \
   qpid/Url.cpp                                 \
+  qpid/UrlArray.cpp                            \
+  qpid/UrlArray.h                              \
   qpid/Version.h                               \
   qpid/amqp_0_10/Exception.h                   \
   qpid/amqp_0_10/SessionHandler.cpp            \

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Tue Dec  6 15:56:40 2011
@@ -55,6 +55,8 @@ cluster_la_SOURCES =                          \
   qpid/cluster/ConnectionCodec.h               \
   qpid/cluster/Cpg.cpp                         \
   qpid/cluster/Cpg.h                           \
+  qpid/cluster/CredentialsExchange.cpp         \
+  qpid/cluster/CredentialsExchange.h           \
   qpid/cluster/Dispatchable.h                  \
   qpid/cluster/UpdateClient.cpp                        \
   qpid/cluster/UpdateClient.h                  \

Added: qpid/trunk/qpid/cpp/src/qpid/UrlArray.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/UrlArray.cpp?rev=1210989&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/UrlArray.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/UrlArray.cpp Tue Dec  6 15:56:40 2011
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "UrlArray.h"
+
+namespace qpid {
+
+/**
+ * Decode a framing::Array of string values into a vector of Urls.
+ * Each element is read as a std::string and handed to the Url constructor;
+ * the order of the array is preserved.
+ */
+std::vector<Url> urlArrayToVector(const framing::Array& array) {
+    typedef framing::Array::ValueVector::const_iterator ValueIter;
+    std::vector<Url> result;
+    ValueIter pos = array.begin();
+    const ValueIter last = array.end();
+    while (pos != last) {
+        const std::string text = (*pos)->get<std::string>();
+        result.push_back(Url(text));
+        ++pos;
+    }
+    return result;
+}
+
+/**
+ * Encode a vector of Urls as a framing::Array of Str16Value strings,
+ * one element per Url, in the same order as the vector.
+ */
+framing::Array vectorToUrlArray(const std::vector<Url>& urls) {
+    // 0x95 is the element type code passed to the Array constructor;
+    // presumably the AMQP 0-10 code for str16 -- TODO confirm.
+    framing::Array encoded(0x95);
+    typedef std::vector<Url>::const_iterator UrlIter;
+    for (UrlIter url = urls.begin(), end = urls.end(); url != end; ++url) {
+        boost::shared_ptr<framing::Str16Value> value(new framing::Str16Value(url->str()));
+        encoded.add(value);
+    }
+    return encoded;
+}
+
+} // namespace qpid

Propchange: qpid/trunk/qpid/cpp/src/qpid/UrlArray.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/UrlArray.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/UrlArray.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/UrlArray.h?rev=1210989&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/UrlArray.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/UrlArray.h Tue Dec  6 15:56:40 2011
@@ -0,0 +1,37 @@
+#ifndef QPID_CLUSTER_URLARRAY_H
+#define QPID_CLUSTER_URLARRAY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/Array.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/Url.h"
+#include <vector>
+
+namespace qpid {
+
+/** @file Functions to encode/decode an array of URLs. */
+
+/** Decode an Array of string values into a vector of Urls, in array order. */
+std::vector<Url> urlArrayToVector(const framing::Array& array);
+
+/** Encode a vector of Urls as an Array of Str16Value strings, in vector order. */
+framing::Array vectorToUrlArray(const std::vector<Url>& urls);
+} // namespace qpid
+
+#endif  /*!QPID_CLUSTER_URLARRAY_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/UrlArray.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/UrlArray.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Tue Dec  6 15:56:40 
2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -49,7 +49,8 @@ class ConnectionState : public Connectio
         userProxyAuth(false), // Can proxy msgs with non-matching auth ids 
when true (used by federation links & clustering)
         federationLink(true),
         clientSupportsThrottling(false),
-        clusterOrderOut(0)
+        clusterOrderOut(0),
+        isDefaultRealm(false)
     {}
 
     virtual ~ConnectionState () {}
@@ -62,7 +63,15 @@ class ConnectionState : public Connectio
     void setHeartbeat(uint16_t hb) { heartbeat = hb; }
     void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
 
-    virtual void setUserId(const std::string& uid) {  userId = uid; }
+    /**
+     * Record the user id for this connection.
+     * Also caches the part of the id before '@' as the plain user name, and
+     * whether the part after '@' equals the broker's configured realm.
+     */
+    virtual void setUserId(const std::string& uid) {
+        userId = uid;
+        const std::string::size_type separator = userId.find('@');
+        // substr(0, npos) yields the whole id when there is no '@'.
+        userName = userId.substr(0, separator);
+        if (separator == std::string::npos) {
+            isDefaultRealm = false;
+        } else {
+            const std::string idRealm = userId.substr(separator + 1);
+            isDefaultRealm = (getBroker().getOptions().realm == idRealm);
+        }
+    }
+
     const std::string& getUserId() const { return userId; }
 
     void setUrl(const std::string& _url) { url = _url; }
@@ -75,7 +84,14 @@ class ConnectionState : public Connectio
     void setFederationPeerTag(const std::string& tag) { federationPeerTag = 
std::string(tag); }
     const std::string& getFederationPeerTag() const { return 
federationPeerTag; }
     std::vector<Url>& getKnownHosts() { return knownHosts; }
-    
+
+    /**
+     * Check whether id names the user recorded for this connection.
+     * @param id user id to test, with or without an "@realm" suffix.
+     * @return true if id equals the full user id set by setUserId(), or,
+     * when that user id is in the broker's default realm, if id equals the
+     * plain user name (the part of the user id before '@').
+     */
+    bool isAuthenticatedUser(const std::string& id) const {
+        return (id == userId || (isDefaultRealm && id == userName));
+    }
+
     void setClientThrottling(bool set=true) { clientSupportsThrottling = set; }
     bool getClientThrottling() const { return clientSupportsThrottling; }
 
@@ -114,6 +130,8 @@ class ConnectionState : public Connectio
     std::vector<Url> knownHosts;
     bool clientSupportsThrottling;
     framing::FrameHandler* clusterOrderOut;
+    std::string userName;
+    bool isDefaultRealm;
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Dec  6 15:56:40 
2011
@@ -72,8 +72,6 @@ SemanticState::SemanticState(DeliveryAda
       dtxSelected(false),
       authMsg(getSession().getBroker().getOptions().auth && 
!getSession().getConnection().isUserProxyAuth()),
       userID(getSession().getConnection().getUserId()),
-      
userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))),
-      isDefaultRealm(userID.find('@') != std::string::npos && 
getSession().getBroker().getOptions().realm == 
userID.substr(userID.find('@')+1,userID.size())),
       closeComplete(false)
 {}
 
@@ -467,7 +465,7 @@ void SemanticState::route(intrusive_ptr<
     /* verify the userid if specified: */
     std::string id =
        msg->hasProperties<MessageProperties>() ? 
msg->getProperties<MessageProperties>()->getUserId() : nullstring;
-    if (authMsg &&  !id.empty() && !(id == userID || (isDefaultRealm && id == 
userName)))
+    if (authMsg &&  !id.empty() && 
!session.getConnection().isAuthenticatedUser(id))
     {
         QPID_LOG(debug, "authorised user id : " << userID << " but user id in 
message declared as " << id);
         throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << 
userID << " but user id in message declared as " << id));

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Dec  6 15:56:40 2011
@@ -164,8 +164,6 @@ class SemanticState : private boost::non
     boost::shared_ptr<Exchange> cacheExchange;
     const bool authMsg;
     const std::string userID;
-    const std::string userName;
-    const bool isDefaultRealm;
     bool closeComplete;
 
     void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp Tue Dec  6 
15:56:40 2011
@@ -23,6 +23,7 @@
 #include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
 #include "qpid/log/Helpers.h"
+#include "qpid/UrlArray.h"
 
 namespace qpid {
 namespace client {
@@ -83,14 +84,9 @@ std::vector<Url> FailoverListener::getKn
 }
 
 std::vector<Url> FailoverListener::getKnownBrokers(const Message& msg) {
-    std::vector<Url> knownBrokers;
     framing::Array urlArray;
     msg.getHeaders().getArray("amq.failover", urlArray);
-    for (framing::Array::ValueVector::const_iterator i = urlArray.begin();
-         i != urlArray.end();
-         ++i ) 
-        knownBrokers.push_back(Url((*i)->get<std::string>()));
-    return knownBrokers;
+    return urlArrayToVector(urlArray);
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Dec  6 15:56:40 2011
@@ -130,6 +130,7 @@
 #include "qpid/cluster/UpdateDataExchange.h"
 #include "qpid/cluster/UpdateExchange.h"
 #include "qpid/cluster/ClusterTimer.h"
+#include "qpid/cluster/CredentialsExchange.h"
 
 #include "qpid/assert.h"
 #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
@@ -162,6 +163,7 @@
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Helpers.h"
 #include "qpid/log/Statement.h"
+#include "qpid/UrlArray.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/memory.h"
 #include "qpid/sys/Thread.h"
@@ -189,6 +191,7 @@ using management::ManagementObject;
 using management::Manageable;
 using management::Args;
 namespace _qmf = ::qmf::org::apache::qpid::cluster;
+namespace arg=client::arg;
 
 /**
  * NOTE: must increment this number whenever any incompatible changes in
@@ -199,7 +202,7 @@ namespace _qmf = ::qmf::org::apache::qpi
  * Currently use SVN revision to avoid clashes with versions from
  * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 1159330;
+const uint32_t Cluster::CLUSTER_VERSION = 1207877;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;
@@ -211,12 +214,12 @@ struct ClusterDispatcher : public framin
 
     void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
                        uint8_t storeState, const Uuid& shutdownId,
-                       const std::string& firstConfig)
+                       const std::string& firstConfig, const framing::Array& 
urls)
     {
         cluster.initialStatus(
             member, version, active, clusterId,
             framing::cluster::StoreState(storeState), shutdownId,
-            firstConfig, l);
+            firstConfig, urls, l);
     }
     void ready(const std::string& url) {
         cluster.ready(member, url, l);
@@ -267,6 +270,7 @@ Cluster::Cluster(const ClusterSettings& 
                       poller),
     failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
     updateDataExchange(new UpdateDataExchange(*this)),
+    credentialsExchange(new CredentialsExchange(*this)),
     quorum(boost::bind(&Cluster::leave, this)),
     decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
     discarding(true),
@@ -300,6 +304,9 @@ Cluster::Cluster(const ClusterSettings& 
     // for single control frame.
     broker.getExchanges().registerExchange(updateDataExchange);
 
+    // CredentialsExchange is used to authenticate new cluster members
+    broker.getExchanges().registerExchange(credentialsExchange);
+
     // Load my store status before we go into initialization
     if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
         store.load();
@@ -661,6 +668,7 @@ void Cluster::initMapCompleted(Lock& l) 
         setClusterId(initMap.getClusterId(), l);
 
         if (initMap.isUpdateNeeded())  { // Joining established cluster.
+            authenticate();
             broker.setRecovery(false); // Ditch my current store.
             broker.setClusterUpdatee(true);
             if (mAgent) mAgent->suppress(true); // Suppress mgmt output during 
update.
@@ -711,7 +719,8 @@ void Cluster::configChange(const MemberI
             ClusterInitialStatusBody(
                 ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
                 store.getState(), store.getShutdownId(),
-                initMap.getFirstConfigStr()
+                initMap.getFirstConfigStr(),
+                vectorToUrlArray(getUrls(l))
             ),
             self);
     }
@@ -803,6 +812,7 @@ void Cluster::initialStatus(const Member
                             framing::cluster::StoreState store,
                             const framing::Uuid& shutdownId,
                             const std::string& firstConfig,
+                            const framing::Array& urls,
                             Lock& l)
 {
     if (version != CLUSTER_VERSION) {
@@ -816,7 +826,7 @@ void Cluster::initialStatus(const Member
     initMap.received(
         member,
         ClusterInitialStatusBody(ProtocolVersion(), version, active, id,
-                                 store, shutdownId, firstConfig)
+                                 store, shutdownId, firstConfig, urls)
     );
     if (initMap.transitionToComplete()) initMapCompleted(l);
 }
@@ -903,6 +913,11 @@ void Cluster::retractOffer(const MemberI
 }
 
 void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
+    // Check for credentials if authentication is enabled.
+    if (broker.getOptions().auth && !credentialsExchange->check(updatee)) {
+        QPID_LOG(error, "Un-authenticated attempt to join the cluster");
+        return;
+    }
     // NOTE: deliverEventQueue is already stopped at the stall point by 
deliveredEvent.
     if (state == LEFT) return;
     assert(state == OFFER);
@@ -1115,6 +1130,35 @@ void Cluster::updateMgmtMembership(Lock&
     mgmtObject->set_memberIDs(idstr);
 }
 
+namespace {
+/**
+ * Scope guard: stores a copy of the object it is given and calls close()
+ * on that copy when the guard is destroyed.
+ * NOTE(review): close() runs inside a destructor; if T::close() can throw
+ * while another exception is already unwinding, the program will terminate.
+ * Confirm that is acceptable for the types this is used with.
+ */
+template <class T> struct AutoClose {
+    T closeme;
+    AutoClose(T t) : closeme(t) {}
+    ~AutoClose() { closeme.close(); }
+};
+}
+
+// Updatee connects to established member and stores credentials
+// in the qpid.cluster-credentials exchange to prove it
+// is safe for updater to connect and give an update.
+//
+// Does nothing when broker authentication is disabled. Otherwise, for each
+// non-empty Url returned by initMap.getUrls(), opens a client connection
+// with the cluster's connection settings and publishes one message to the
+// CredentialsExchange::NAME destination whose header carries getId() under
+// the key CredentialsExchange::NAME.
+// Any exception from the client calls propagates to the caller.
+void Cluster::authenticate() {
+    if (!broker.getOptions().auth) return;
+    std::vector<Url> urls = initMap.getUrls();
+    for (std::vector<Url>::iterator i = urls.begin(); i != urls.end(); ++i) {
+        if (!i->empty()) {
+            client::Connection c;
+            c.open(*i, connectionSettings(settings));
+            // Guards are declared after the objects they close, so on scope
+            // exit the session guard runs first, then the connection guard.
+            AutoClose<client::Connection> closeConnection(c);
+            client::Session s = c.newSession(CredentialsExchange::NAME);
+            AutoClose<client::Session> closeSession(s);
+            client::Message credentials;
+            credentials.getHeaders().setUInt64(CredentialsExchange::NAME, 
getId());
+            s.messageTransfer(arg::content=credentials, 
arg::destination=CredentialsExchange::NAME);
+            // Presumably blocks until the broker has processed the transfer,
+            // surfacing any rejection before the session is closed -- TODO confirm.
+            s.sync();
+        }
+    }
+}
+
 std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
     static const char* STATE[] = {
         "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP",

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Dec  6 15:56:40 2011
@@ -77,6 +77,7 @@ class Connection;
 struct EventFrame;
 class ClusterTimer;
 class UpdateDataExchange;
+class CredentialsExchange;
 
 /**
  * Connection to the cluster
@@ -187,6 +188,7 @@ class Cluster : private Cpg::Handler, pu
                        framing::cluster::StoreState,
                        const framing::Uuid& shutdownId,
                        const std::string& firstConfig,
+                       const framing::Array& urls,
                        Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&,
@@ -215,6 +217,7 @@ class Cluster : private Cpg::Handler, pu
     void becomeElder(Lock&);
     void setMgmtStatus(Lock&);
     void updateMgmtMembership(Lock&);
+    void authenticate();
 
     // == Called in CPG dispatch thread
     void deliver( // CPG deliver callback.
@@ -271,6 +274,7 @@ class Cluster : private Cpg::Handler, pu
     PollableFrameQueue deliverFrameQueue;
     boost::shared_ptr<FailoverExchange> failoverExchange;
     boost::shared_ptr<UpdateDataExchange> updateDataExchange;
+    boost::shared_ptr<CredentialsExchange> credentialsExchange;
     Quorum quorum;
     LockedConnectionMap localConnections;
 

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp?rev=1210989&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp Tue Dec  6 
15:56:40 2011
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "CredentialsExchange.h"
+#include "Cluster.h"
+#include "qpid/broker/ConnectionState.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace std;
+
+const string CredentialsExchange::NAME=("qpid.cluster-credentials");
+
+namespace {
+// Mechanism name for which an empty user name is replaced, and the user id
+// substituted in that case.
+const string ANONYMOUS_MECH("ANONYMOUS");
+const string ANONYMOUS_USER("anonymous");
+
+/**
+ * Return the user id to use for the given user name and mechanism:
+ * ANONYMOUS_USER when the mechanism is ANONYMOUS_MECH and the user name is
+ * empty, otherwise the user name unchanged.
+ */
+string effectiveUserId(const string& username, const string& mechanism) {
+    const bool anonymousLogin = (mechanism == ANONYMOUS_MECH) && username.empty();
+    return anonymousLogin ? ANONYMOUS_USER : username;
+}
+}
+
+/**
+ * Create the credentials exchange for cluster.
+ * The expected user id is derived from the cluster settings' username and
+ * mechanism; whether publishers are checked against it follows the broker's
+ * auth option as read at construction time.
+ */
+CredentialsExchange::CredentialsExchange(Cluster& cluster)
+    : broker::Exchange(NAME, &cluster),
+      username(effectiveUserId(cluster.getSettings().username,
+                               cluster.getSettings().mechanism)),
+      // Stored credentials older than this are treated as invalid by check().
+      timeout(120*sys::TIME_SEC),
+      authenticate(cluster.getBroker().getOptions().auth)
+{}
+
+/**
+ * Consume the credentials stored for member, if any.
+ * @param member id of the cluster member to look up.
+ * @return true only if an entry exists for member and was recorded less
+ * than timeout ago. The entry is erased whether or not it is still valid,
+ * so a stored credential can be checked at most once.
+ */
+bool CredentialsExchange::check(MemberId member) {
+    sys::Mutex::ScopedLock l(lock);
+    Map::iterator i = map.find(member);
+    if (i == map.end()) return false;
+    const bool valid = (sys::Duration(i->second, sys::AbsTime::now()) < timeout);
+    map.erase(i);
+    return valid;
+}
+
+/**
+ * Record credentials published to this exchange.
+ * When authentication is enabled the publishing connection must be
+ * authenticated as the expected cluster user; the message arguments must
+ * carry the member id under the key NAME. On success the member id is
+ * stored with the current time for a later check().
+ * @throws framing::UnauthorizedAccessException if authentication is enabled
+ * and the message has no publisher or the publisher is not the cluster user.
+ * @throws framing::InvalidArgumentException if args is null or lacks NAME.
+ */
+void CredentialsExchange::route(broker::Deliverable& msg, const string& /*routingKey*/, const framing::FieldTable* args) {
+    sys::Mutex::ScopedLock l(lock);
+    const broker::ConnectionState* connection =
+        static_cast<const broker::ConnectionState*>(msg.getMessage().getPublisher());
+    if (authenticate) {
+        // A message without a publisher cannot be attributed to any user:
+        // reject it instead of dereferencing a null connection.
+        if (!connection)
+            throw framing::UnauthorizedAccessException(
+                QPID_MSG("Unauthorized message with no publisher for " << NAME
+                         << ", should be " << username));
+        if (!connection->isAuthenticatedUser(username))
+            throw framing::UnauthorizedAccessException(
+                QPID_MSG("Unauthorized user " << connection->getUserId() << " for " << NAME
+                         << ", should be " << username));
+    }
+    if (!args || !args->isSet(NAME))
+        throw framing::InvalidArgumentException(
+            QPID_MSG("Invalid message received by " << NAME));
+    MemberId member(args->getAsUInt64(NAME));
+    map[member] = sys::AbsTime::now();
+}
+
+/** @return the exchange type string, which is the same as the exchange name NAME. */
+string CredentialsExchange::getType() const { return NAME; }
+
+namespace {
+// Reject an operation this exchange does not support: always throws
+// framing::NotAllowedException naming the exchange.
+void throwIllegal() {
+    throw framing::NotAllowedException(
+        QPID_MSG("Illegal use of " << CredentialsExchange::NAME+" exchange"));
+}
+}
+
+// Queues cannot be bound to this exchange: bind, unbind and isBound all
+// throw via throwIllegal(). The trailing return is never reached and only
+// satisfies the bool return type.
+bool CredentialsExchange::bind(boost::shared_ptr<broker::Queue> /*queue*/,
+                               const string& /*routingKey*/,
+                               const framing::FieldTable* /*args*/)
+{
+    throwIllegal();
+    return false;
+}
+
+bool CredentialsExchange::unbind(boost::shared_ptr<broker::Queue> /*queue*/,
+                                 const string& /*routingKey*/,
+                                 const framing::FieldTable* /*args*/)
+{
+    throwIllegal();
+    return false;
+}
+
+bool CredentialsExchange::isBound(boost::shared_ptr<broker::Queue> /*queue*/,
+                                  const string* const /*routingKey*/,
+                                  const framing::FieldTable* const /*args*/)
+{
+    throwIllegal();
+    return false;
+}
+
+
+}} // Namespace qpid::cluster

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h?rev=1210989&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h Tue Dec  6 
15:56:40 2011
@@ -0,0 +1,72 @@
+#ifndef QPID_CLUSTER_CREDENTIALSEXCHANGE_H
+#define QPID_CLUSTER_CREDENTIALSEXCHANGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include <qpid/broker/Exchange.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Time.h>
+#include <string>
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+class Cluster;
+
+/**
+ * New members joining the cluster send their identity information to this
+ * exchange to prove they are authenticated as the cluster user.
+ * The exchange rejects messages that are not properly authenticated.
+ * Accepted member ids are remembered with their arrival time until they
+ * are consumed by check().
+ */
+class CredentialsExchange : public broker::Exchange
+{
+  public:
+    /** Name of this exchange; also used as the header key carrying the member id. */
+    static const std::string NAME;
+
+    CredentialsExchange(Cluster&);
+
+    /**
+     * Check if this member has credentials that have not timed out.
+     * The stored credentials are deleted whether or not they are still valid.
+     */
+    bool check(MemberId member);
+
+    /**
+     * Throw an exception if authentication is enabled and the publishing
+     * connection is not the cluster user, or if args does not carry a member
+     * id under NAME. Otherwise record that member id with the current time.
+     */
+    void route(broker::Deliverable& msg, const std::string& routingKey, const 
framing::FieldTable* args);
+
+    // Exchange overrides. bind, unbind and isBound are not supported and throw.
+    std::string getType() const;
+    bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& 
routingKey, const framing::FieldTable* args);
+    bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& 
routingKey, const framing::FieldTable* args);
+    bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* 
const routingKey, const framing::FieldTable* const args);
+
+  private:
+    // Member id -> time its credentials were received.
+    typedef std::map<MemberId, sys::AbsTime> Map;
+    sys::Mutex lock;            // Held by check() and route() while they use map.
+    Map map;
+    std::string username;       // User id publishers must be authenticated as.
+    sys::Duration timeout;      // Maximum age of credentials accepted by check().
+    bool authenticate;          // Whether route() verifies the publisher's user id.
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_CREDENTIALSEXCHANGE_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp Tue Dec  6 
15:56:40 2011
@@ -28,6 +28,7 @@
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/Array.h"
+#include "qpid/UrlArray.h"
 #include <boost/bind.hpp>
 #include <algorithm>
 
@@ -86,9 +87,7 @@ void FailoverExchange::route(Deliverable
 void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
     // Called with lock held.
     if (urls.empty()) return;
-    framing::Array array(0x95);
-    for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
-        array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
+    framing::Array array = vectorToUrlArray(urls);
     const ProtocolVersion v;
     boost::intrusive_ptr<Message> msg(new Message);
     AMQFrame command(MessageTransferBody(v, typeName, 1, 0));

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp Tue Dec  6 
15:56:40 2011
@@ -21,6 +21,7 @@
 #include "InitialStatusMap.h"
 #include "StoreStatus.h"
 #include "qpid/log/Statement.h"
+#include "qpid/UrlArray.h"
 #include <algorithm>
 #include <vector>
 #include <boost/bind.hpp>
@@ -218,6 +219,17 @@ void InitialStatusMap::checkConsistent()
     }
 }
 
+std::vector<Url> InitialStatusMap::getUrls() const {
+    // Return the first non-empty URL list found; entries with no status are skipped.
+    for (Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+        if (i->second) {
+            std::vector<Url> urls = urlArrayToVector(i->second->getUrls());
+            if (!urls.empty()) return urls;
+        }
+    }
+    return std::vector<Url>();  // No entry carried any URLs.
+}
+
 std::string InitialStatusMap::getFirstConfigStr() const {
     assert(!firstConfig.empty());
     return encodeMemberSet(firstConfig);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h Tue Dec  6 15:56:40 
2011
@@ -23,8 +23,10 @@
  */
 
 #include "MemberSet.h"
+#include "qpid/Url.h"
 #include <qpid/framing/ClusterInitialStatusBody.h>
 #include <boost/optional.hpp>
+#include <vector>
 
 namespace qpid {
 namespace cluster {
@@ -69,6 +71,8 @@ class InitialStatusMap
     framing::Uuid getClusterId();
     /**@pre isComplete(). @throw Exception if there are any inconsistencies. */
     void checkConsistent();
+    /**@return URLs from the first member status with a non-empty URL list, else an empty vector. */
+    std::vector<Url> getUrls() const;
 
     /** Get first config-change for this member, encoded as a string.
      *@pre configChange has been called at least once.

Modified: qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp Tue Dec  6 15:56:40 2011
@@ -36,21 +36,25 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTes
 
 typedef InitialStatusMap::Status Status;
 
-Status activeStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) {
+Status activeStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet(),
+                    const framing::Array& urls=framing::Array())
+{
     return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid(),
-                  encodeMemberSet(ms));
+                  encodeMemberSet(ms), urls);
 }
 
-Status newcomerStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) {
+Status newcomerStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet(),
+                      const framing::Array& urls=framing::Array())
+{
     return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, 
Uuid(),
-                  encodeMemberSet(ms));
+                  encodeMemberSet(ms), urls);
 }
 
 Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid 
stop=Uuid(),
-                   const MemberSet& ms=MemberSet())
+                   const MemberSet& ms=MemberSet(), const framing::Array& 
urls=framing::Array())
 {
-    return Status(ProtocolVersion(), 0, active, start, state, stop, 
-                  encodeMemberSet(ms));
+    return Status(ProtocolVersion(), 0, active, start, state, stop,
+                  encodeMemberSet(ms), urls);
 }
 
 QPID_AUTO_TEST_CASE(testFirstInCluster) {

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Tue Dec  6 15:56:40 2011
@@ -363,16 +363,20 @@ class Broker(Popen):
 
     def host_port(self): return "%s:%s" % (self.host(), self.port())
 
+    def log_contains(self, str, timeout=1):
+        """Retry until str is found in this broker's log file or timeout expires. Return true if found."""
+        return retry(lambda: find_in_file(str, self.log), timeout)
+
     def log_ready(self):
         """Return true if the log file exists and contains a broker ready 
message"""
         if not self._log_ready:
             self._log_ready = find_in_file("notice Broker running", self.log)
         return self._log_ready
 
-    def ready(self, **kwargs):
+    def ready(self, timeout=5, **kwargs):
         """Wait till broker is ready to serve clients"""
         # First make sure the broker is listening by checking the log.
-        if not retry(self.log_ready, timeout=60):
+        if not retry(self.log_ready, timeout=timeout):
             raise Exception(
                 "Timed out waiting for broker %s%s"%(self.name, 
error_line(self.log,5)))
         # Create a connection and a session. For a cluster broker this will

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_authentication_soak.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_authentication_soak.cpp?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_authentication_soak.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_authentication_soak.cpp Tue Dec  6 
15:56:40 2011
@@ -96,7 +96,7 @@ startBroker ( brokerVector & brokers , i
     argv.push_back (clusterArg.str());
     argv.push_back ("--cluster-username=zig");
     argv.push_back ("--cluster-password=zig");
-    argv.push_back ("--cluster-mechanism=ANONYMOUS");
+    argv.push_back ("--cluster-mechanism=PLAIN");
     argv.push_back ("--sasl-config=./sasl_config");
     argv.push_back ("--auth=yes");
     argv.push_back ("--mgmt-enable=yes");

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Dec  6 15:56:40 2011
@@ -114,7 +114,9 @@ class ShortTests(BrokerTest):
         sasl_config=os.path.join(self.rootdir, "sasl_config")
         acl=os.path.join(os.getcwd(), "policy.acl")
         aclf=file(acl,"w")
+        # Must allow cluster-user (zag) access to credentials exchange.
         aclf.write("""
+acl allow zag@QPID publish exchange name=qpid.cluster-credentials
 acl allow zig@QPID all all
 acl deny all all
 """)
@@ -122,7 +124,11 @@ acl deny all all
         cluster = self.cluster(1, args=["--auth", "yes",
                                         "--sasl-config", sasl_config,
                                         "--load-module", os.getenv("ACL_LIB"),
-                                        "--acl-file", acl])
+                                        "--acl-file", acl,
+                                        "--cluster-username=zag",
+                                        "--cluster-password=zag",
+                                        "--cluster-mechanism=PLAIN"
+                                        ])
 
         # Valid user/password, ensure queue is created.
         c = cluster[0].connect(username="zig", password="zig")
@@ -167,39 +173,51 @@ acl deny all all
             self.fail("Expected exception")
         except messaging.exceptions.NotFound: pass
 
-    def test_sasl_join(self):
+    def test_sasl_join_good(self):
         """Verify SASL authentication between brokers when joining a 
cluster."""
         sasl_config=os.path.join(self.rootdir, "sasl_config")
         # Test with a valid username/password
         cluster = self.cluster(1, args=["--auth", "yes",
                                         "--sasl-config", sasl_config,
-                                        "--load-module", os.getenv("ACL_LIB"),
                                         "--cluster-username=zig",
                                         "--cluster-password=zig",
                                         "--cluster-mechanism=PLAIN"
                                         ])
         cluster.start()
-        cluster.ready()
-        c = cluster[1].connect(username="zag", password="zag")
+        c = cluster[1].connect(username="zag", password="zag", 
mechanism="PLAIN")
 
-        # Test with an invalid username/password
+    def test_sasl_join_bad_password(self):
+        # Test with an invalid password
         cluster = self.cluster(1, args=["--auth", "yes",
-                                        "--sasl-config", sasl_config,
-                                        "--load-module", os.getenv("ACL_LIB"),
-                                        "--cluster-username=x",
-                                        "--cluster-password=y",
+                                        "--sasl-config", 
os.path.join(self.rootdir, "sasl_config"),
+                                        "--cluster-username=zig",
+                                        "--cluster-password=bad",
                                         "--cluster-mechanism=PLAIN"
                                         ])
-        try:
-            cluster.start(expect=EXPECT_EXIT_OK)
-            cluster[1].ready()
-            self.fail("Expected exception")
-        except: pass
+        cluster.start(wait=False, expect=EXPECT_EXIT_FAIL)
+        assert cluster[1].log_contains("critical Unexpected error: 
connection-forced: Authentication failed")
+
+    def test_sasl_join_wrong_user(self):
+        # Test with a valid user that is not the cluster user.
+        cluster = self.cluster(0, args=["--auth", "yes",
+                                        "--sasl-config", 
os.path.join(self.rootdir, "sasl_config")])
+        cluster.start(args=["--cluster-username=zig",
+                            "--cluster-password=zig",
+                            "--cluster-mechanism=PLAIN"
+                            ])
+
+        cluster.start(wait=False, expect=EXPECT_EXIT_FAIL,
+                      args=["--cluster-username=zag",
+                            "--cluster-password=zag",
+                            "--cluster-mechanism=PLAIN"
+                            ])
+        assert cluster[1].log_contains("critical Unexpected error: 
unauthorized-access: unauthorized-access: Unauthorized user zag@QPID for 
qpid.cluster-credentials, should be zig")
 
     def test_user_id_update(self):
         """Ensure that user-id of an open session is updated to new cluster 
members"""
         sasl_config=os.path.join(self.rootdir, "sasl_config")
-        cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", 
sasl_config,])
+        cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", 
sasl_config,
+                                        "--cluster-mechanism=ANONYMOUS"])
         c = cluster[0].connect(username="zig", password="zig")
         s = c.session().sender("q;{create:always}")
         s.send(Message("x", user_id="zig")) # Message sent before start new 
broker

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1210989&r1=1210988&r2=1210989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Dec  6 15:56:40 2011
@@ -65,6 +65,7 @@
       <field name="store-state" type="store-state"/>
       <field name="shutdown-id" type="uuid"/>
       <field name="first-config" type="str16"/>
+      <field name="urls" type="array"/>               <!-- Array of str16 -->
     </control>
 
     <!-- New member or updater is ready as an active member. -->



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to