Author: aconway Date: Tue Dec 6 15:56:40 2011 New Revision: 1210989 URL: http://svn.apache.org/viewvc?rev=1210989&view=rev Log: QPID-3652: Fix cluster authentication.
Only allow brokers that authenticate as the cluster-username to join a cluster. A new broker first connects to a cluster broker, authenticates as the cluster-username, and sends its CPG member ID to the qpid.cluster-credentials exchange. The cluster broker that subsequently acts as updater verifies that the credentials are valid before connecting to give the update. NOTE 1: If you are using an ACL, the cluster-username must be allowed to publish to the qpid.cluster-credentials exchange. E.g. in your ACL file: acl allow foo@QPID publish exchange name=qpid.cluster-credentials NOTE 2: This changes the cluster initialization protocol, so you will need to restart the cluster with all new version brokers. Added: qpid/trunk/qpid/cpp/src/qpid/UrlArray.cpp (with props) qpid/trunk/qpid/cpp/src/qpid/UrlArray.h (with props) qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp (with props) qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h (with props) Modified: qpid/trunk/qpid/cpp/rubygen/amqpgen.rb qpid/trunk/qpid/cpp/src/Makefile.am qpid/trunk/qpid/cpp/src/cluster.mk qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp qpid/trunk/qpid/cpp/src/tests/brokertest.py qpid/trunk/qpid/cpp/src/tests/cluster_authentication_soak.cpp qpid/trunk/qpid/cpp/src/tests/cluster_tests.py qpid/trunk/qpid/cpp/xml/cluster.xml Modified: qpid/trunk/qpid/cpp/rubygen/amqpgen.rb URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb?rev=1210989&r1=1210988&r2=1210989&view=diff 
============================================================================== --- qpid/trunk/qpid/cpp/rubygen/amqpgen.rb (original) +++ qpid/trunk/qpid/cpp/rubygen/amqpgen.rb Tue Dec 6 15:56:40 2011 @@ -191,6 +191,7 @@ class AmqpElement "command-fragments" => "session.command-fragment", "in-doubt" => "dtx.xid", "tx-publish" => "str-8", + "urls" => "str-16", "queues" => "str-8", "prepared" => "str-8" } Modified: qpid/trunk/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/Makefile.am (original) +++ qpid/trunk/qpid/cpp/src/Makefile.am Tue Dec 6 15:56:40 2011 @@ -360,6 +360,8 @@ libqpidcommon_la_SOURCES += \ qpid/StringUtils.cpp \ qpid/StringUtils.h \ qpid/Url.cpp \ + qpid/UrlArray.cpp \ + qpid/UrlArray.h \ qpid/Version.h \ qpid/amqp_0_10/Exception.h \ qpid/amqp_0_10/SessionHandler.cpp \ Modified: qpid/trunk/qpid/cpp/src/cluster.mk URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/cluster.mk (original) +++ qpid/trunk/qpid/cpp/src/cluster.mk Tue Dec 6 15:56:40 2011 @@ -55,6 +55,8 @@ cluster_la_SOURCES = \ qpid/cluster/ConnectionCodec.h \ qpid/cluster/Cpg.cpp \ qpid/cluster/Cpg.h \ + qpid/cluster/CredentialsExchange.cpp \ + qpid/cluster/CredentialsExchange.h \ qpid/cluster/Dispatchable.h \ qpid/cluster/UpdateClient.cpp \ qpid/cluster/UpdateClient.h \ Added: qpid/trunk/qpid/cpp/src/qpid/UrlArray.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/UrlArray.cpp?rev=1210989&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/UrlArray.cpp (added) +++ qpid/trunk/qpid/cpp/src/qpid/UrlArray.cpp Tue Dec 6 15:56:40 2011 @@ -0,0 +1,41 @@ 
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "UrlArray.h" + +namespace qpid { + +std::vector<Url> urlArrayToVector(const framing::Array& array) { + std::vector<Url> urls; + for (framing::Array::ValueVector::const_iterator i = array.begin(); + i != array.end(); + ++i ) + urls.push_back(Url((*i)->get<std::string>())); + return urls; +} + +framing::Array vectorToUrlArray(const std::vector<Url>& urls) { + framing::Array array(0x95); + for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) + array.add(boost::shared_ptr<framing::Str16Value>(new framing::Str16Value(i->str()))); + return array; +} + +} // namespace qpid Propchange: qpid/trunk/qpid/cpp/src/qpid/UrlArray.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/qpid/UrlArray.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: qpid/trunk/qpid/cpp/src/qpid/UrlArray.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/UrlArray.h?rev=1210989&view=auto ============================================================================== --- 
qpid/trunk/qpid/cpp/src/qpid/UrlArray.h (added) +++ qpid/trunk/qpid/cpp/src/qpid/UrlArray.h Tue Dec 6 15:56:40 2011 @@ -0,0 +1,37 @@ +#ifndef QPID_CLUSTER_URLARRAY_H +#define QPID_CLUSTER_URLARRAY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/Array.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/Url.h" +#include <vector> + +namespace qpid { + +/** @file Functions to encode/decode an array of URLs. 
*/ +std::vector<Url> urlArrayToVector(const framing::Array& array); +framing::Array vectorToUrlArray(const std::vector<Url>& urls); +} // namespace qpid + +#endif /*!QPID_CLUSTER_URLARRAY_H*/ Propchange: qpid/trunk/qpid/cpp/src/qpid/UrlArray.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/qpid/UrlArray.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Tue Dec 6 15:56:40 2011 @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -49,7 +49,8 @@ class ConnectionState : public Connectio userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links & clustering) federationLink(true), clientSupportsThrottling(false), - clusterOrderOut(0) + clusterOrderOut(0), + isDefaultRealm(false) {} virtual ~ConnectionState () {} @@ -62,7 +63,15 @@ class ConnectionState : public Connectio void setHeartbeat(uint16_t hb) { heartbeat = hb; } void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; } - virtual void setUserId(const std::string& uid) { userId = uid; } + virtual void setUserId(const std::string& uid) { + userId = uid; + size_t at = userId.find('@'); + userName = userId.substr(0, at); + isDefaultRealm = ( + at!= std::string::npos && + getBroker().getOptions().realm == userId.substr(at+1,userId.size())); + } + const std::string& getUserId() const { return userId; } void setUrl(const std::string& _url) { url = _url; } @@ -75,7 +84,14 @@ class ConnectionState : public Connectio void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); } const std::string& getFederationPeerTag() const { return federationPeerTag; } std::vector<Url>& getKnownHosts() { return knownHosts; } - + + /**@return true if user is the authenticated user on this connection. + * If id has the default realm will also compare plain username. 
+ */ + bool isAuthenticatedUser(const std::string& id) const { + return (id == userId || (isDefaultRealm && id == userName)); + } + void setClientThrottling(bool set=true) { clientSupportsThrottling = set; } bool getClientThrottling() const { return clientSupportsThrottling; } @@ -114,6 +130,8 @@ class ConnectionState : public Connectio std::vector<Url> knownHosts; bool clientSupportsThrottling; framing::FrameHandler* clusterOrderOut; + std::string userName; + bool isDefaultRealm; }; }} Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Dec 6 15:56:40 2011 @@ -72,8 +72,6 @@ SemanticState::SemanticState(DeliveryAda dtxSelected(false), authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()), userID(getSession().getConnection().getUserId()), - userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))), - isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())), closeComplete(false) {} @@ -467,7 +465,7 @@ void SemanticState::route(intrusive_ptr< /* verify the userid if specified: */ std::string id = msg->hasProperties<MessageProperties>() ? 
msg->getProperties<MessageProperties>()->getUserId() : nullstring; - if (authMsg && !id.empty() && !(id == userID || (isDefaultRealm && id == userName))) + if (authMsg && !id.empty() && !session.getConnection().isAuthenticatedUser(id)) { QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id); throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << userID << " but user id in message declared as " << id)); Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Dec 6 15:56:40 2011 @@ -164,8 +164,6 @@ class SemanticState : private boost::non boost::shared_ptr<Exchange> cacheExchange; const bool authMsg; const std::string userID; - const std::string userName; - const bool isDefaultRealm; bool closeComplete; void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy); Modified: qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp Tue Dec 6 15:56:40 2011 @@ -23,6 +23,7 @@ #include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include "qpid/log/Helpers.h" +#include "qpid/UrlArray.h" namespace qpid { namespace client { @@ -83,14 +84,9 @@ std::vector<Url> FailoverListener::getKn } std::vector<Url> FailoverListener::getKnownBrokers(const Message& msg) { - std::vector<Url> knownBrokers; framing::Array urlArray; 
msg.getHeaders().getArray("amq.failover", urlArray); - for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); - i != urlArray.end(); - ++i ) - knownBrokers.push_back(Url((*i)->get<std::string>())); - return knownBrokers; + return urlArrayToVector(urlArray); } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Dec 6 15:56:40 2011 @@ -130,6 +130,7 @@ #include "qpid/cluster/UpdateDataExchange.h" #include "qpid/cluster/UpdateExchange.h" #include "qpid/cluster/ClusterTimer.h" +#include "qpid/cluster/CredentialsExchange.h" #include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" @@ -162,6 +163,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" +#include "qpid/UrlArray.h" #include "qpid/management/ManagementAgent.h" #include "qpid/memory.h" #include "qpid/sys/Thread.h" @@ -189,6 +191,7 @@ using management::ManagementObject; using management::Manageable; using management::Args; namespace _qmf = ::qmf::org::apache::qpid::cluster; +namespace arg=client::arg; /** * NOTE: must increment this number whenever any incompatible changes in @@ -199,7 +202,7 @@ namespace _qmf = ::qmf::org::apache::qpi * Currently use SVN revision to avoid clashes with versions from * different branches. 
*/ -const uint32_t Cluster::CLUSTER_VERSION = 1159330; +const uint32_t Cluster::CLUSTER_VERSION = 1207877; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -211,12 +214,12 @@ struct ClusterDispatcher : public framin void initialStatus(uint32_t version, bool active, const Uuid& clusterId, uint8_t storeState, const Uuid& shutdownId, - const std::string& firstConfig) + const std::string& firstConfig, const framing::Array& urls) { cluster.initialStatus( member, version, active, clusterId, framing::cluster::StoreState(storeState), shutdownId, - firstConfig, l); + firstConfig, urls, l); } void ready(const std::string& url) { cluster.ready(member, url, l); @@ -267,6 +270,7 @@ Cluster::Cluster(const ClusterSettings& poller), failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)), updateDataExchange(new UpdateDataExchange(*this)), + credentialsExchange(new CredentialsExchange(*this)), quorum(boost::bind(&Cluster::leave, this)), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), @@ -300,6 +304,9 @@ Cluster::Cluster(const ClusterSettings& // for single control frame. broker.getExchanges().registerExchange(updateDataExchange); + // CredentialsExchange is used to authenticate new cluster members + broker.getExchanges().registerExchange(credentialsExchange); + // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); @@ -661,6 +668,7 @@ void Cluster::initMapCompleted(Lock& l) setClusterId(initMap.getClusterId(), l); if (initMap.isUpdateNeeded()) { // Joining established cluster. + authenticate(); broker.setRecovery(false); // Ditch my current store. broker.setClusterUpdatee(true); if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update. 
@@ -711,7 +719,8 @@ void Cluster::configChange(const MemberI ClusterInitialStatusBody( ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, store.getState(), store.getShutdownId(), - initMap.getFirstConfigStr() + initMap.getFirstConfigStr(), + vectorToUrlArray(getUrls(l)) ), self); } @@ -803,6 +812,7 @@ void Cluster::initialStatus(const Member framing::cluster::StoreState store, const framing::Uuid& shutdownId, const std::string& firstConfig, + const framing::Array& urls, Lock& l) { if (version != CLUSTER_VERSION) { @@ -816,7 +826,7 @@ void Cluster::initialStatus(const Member initMap.received( member, ClusterInitialStatusBody(ProtocolVersion(), version, active, id, - store, shutdownId, firstConfig) + store, shutdownId, firstConfig, urls) ); if (initMap.transitionToComplete()) initMapCompleted(l); } @@ -903,6 +913,11 @@ void Cluster::retractOffer(const MemberI } void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { + // Check for credentials if authentication is enabled. + if (broker.getOptions().auth && !credentialsExchange->check(updatee)) { + QPID_LOG(error, "Un-authenticated attempt to join the cluster"); + return; + } // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent. if (state == LEFT) return; assert(state == OFFER); @@ -1115,6 +1130,35 @@ void Cluster::updateMgmtMembership(Lock& mgmtObject->set_memberIDs(idstr); } +namespace { +template <class T> struct AutoClose { + T closeme; + AutoClose(T t) : closeme(t) {} + ~AutoClose() { closeme.close(); } +}; +} + +// Updatee connects to established member and stores credentials +// in the qpid.cluster-credentials exchange to prove it +// is safe for updater to connect and give an update. 
+void Cluster::authenticate() { + if (!broker.getOptions().auth) return; + std::vector<Url> urls = initMap.getUrls(); + for (std::vector<Url>::iterator i = urls.begin(); i != urls.end(); ++i) { + if (!i->empty()) { + client::Connection c; + c.open(*i, connectionSettings(settings)); + AutoClose<client::Connection> closeConnection(c); + client::Session s = c.newSession(CredentialsExchange::NAME); + AutoClose<client::Session> closeSession(s); + client::Message credentials; + credentials.getHeaders().setUInt64(CredentialsExchange::NAME, getId()); + s.messageTransfer(arg::content=credentials, arg::destination=CredentialsExchange::NAME); + s.sync(); + } + } +} + std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { static const char* STATE[] = { "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP", Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Dec 6 15:56:40 2011 @@ -77,6 +77,7 @@ class Connection; struct EventFrame; class ClusterTimer; class UpdateDataExchange; +class CredentialsExchange; /** * Connection to the cluster @@ -187,6 +188,7 @@ class Cluster : private Cpg::Handler, pu framing::cluster::StoreState, const framing::Uuid& shutdownId, const std::string& firstConfig, + const framing::Array& urls, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, @@ -215,6 +217,7 @@ class Cluster : private Cpg::Handler, pu void becomeElder(Lock&); void setMgmtStatus(Lock&); void updateMgmtMembership(Lock&); + void authenticate(); // == Called in CPG dispatch thread void deliver( // CPG deliver callback. 
@@ -271,6 +274,7 @@ class Cluster : private Cpg::Handler, pu PollableFrameQueue deliverFrameQueue; boost::shared_ptr<FailoverExchange> failoverExchange; boost::shared_ptr<UpdateDataExchange> updateDataExchange; + boost::shared_ptr<CredentialsExchange> credentialsExchange; Quorum quorum; LockedConnectionMap localConnections; Added: qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp?rev=1210989&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp (added) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp Tue Dec 6 15:56:40 2011 @@ -0,0 +1,94 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +#include "CredentialsExchange.h" +#include "Cluster.h" +#include "qpid/broker/ConnectionState.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/sys/Time.h" + +namespace qpid { +namespace cluster { + +using namespace std; + +const string CredentialsExchange::NAME=("qpid.cluster-credentials"); + +namespace { +const string ANONYMOUS_MECH("ANONYMOUS"); +const string ANONYMOUS_USER("anonymous"); + +string effectiveUserId(const string& username, const string& mechanism) { + if (mechanism == ANONYMOUS_MECH && username.empty()) + return ANONYMOUS_USER; + else + return username; +} +} + +CredentialsExchange::CredentialsExchange(Cluster& cluster) + : broker::Exchange(NAME, &cluster), + username(effectiveUserId(cluster.getSettings().username, + cluster.getSettings().mechanism)), + timeout(120*sys::TIME_SEC), + authenticate(cluster.getBroker().getOptions().auth) +{} + +static const string anonymous("anonymous"); + +bool CredentialsExchange::check(MemberId member) { + sys::Mutex::ScopedLock l(lock); + Map::iterator i = map.find(member); + if (i == map.end()) return false; + bool valid = (sys::Duration(i->second, sys::AbsTime::now()) < timeout); + map.erase(i); + return valid; +} + +void CredentialsExchange::route(broker::Deliverable& msg, const string& /*routingKey*/, const framing::FieldTable* args) { + sys::Mutex::ScopedLock l(lock); + const broker::ConnectionState* connection = + static_cast<const broker::ConnectionState*>(msg.getMessage().getPublisher()); + if (authenticate && !connection->isAuthenticatedUser(username)) + throw framing::UnauthorizedAccessException( + QPID_MSG("Unauthorized user " << connection->getUserId() << " for " << NAME + << ", should be " << username)); + if (!args || !args->isSet(NAME)) + throw framing::InvalidArgumentException( + QPID_MSG("Invalid message received by " << NAME)); + MemberId member(args->getAsUInt64(NAME)); + map[member] = sys::AbsTime::now(); +} + +string CredentialsExchange::getType() const { return NAME; } + 
+namespace { +void throwIllegal() { + throw framing::NotAllowedException( + QPID_MSG("Illegal use of " << CredentialsExchange::NAME+" exchange")); +} +} + +bool CredentialsExchange::bind(boost::shared_ptr<broker::Queue> , const string& /*routingKey*/, const framing::FieldTable* ) { throwIllegal(); return false; } +bool CredentialsExchange::unbind(boost::shared_ptr<broker::Queue> , const string& /*routingKey*/, const framing::FieldTable* ) { throwIllegal(); return false; } +bool CredentialsExchange::isBound(boost::shared_ptr<broker::Queue>, const string* const /*routingKey*/, const framing::FieldTable* const ) { throwIllegal(); return false; } + + +}} // Namespace qpid::cluster Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h?rev=1210989&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h (added) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h Tue Dec 6 15:56:40 2011 @@ -0,0 +1,72 @@ +#ifndef QPID_CLUSTER_CREDENTIALSEXCHANGE_H +#define QPID_CLUSTER_CREDENTIALSEXCHANGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include <qpid/broker/Exchange.h> +#include <qpid/sys/Mutex.h> +#include <qpid/sys/Time.h> +#include <string> +#include <map> + +namespace qpid { +namespace cluster { + +class Cluster; + +/** + * New members joining the cluster send their identity information to this + * exchange to prove they are authenticated as the cluster user. + * The exchange rejects messages that are not properly authenticated + */ +class CredentialsExchange : public broker::Exchange +{ + public: + static const std::string NAME; + + CredentialsExchange(Cluster&); + + /** Check if this member has credentials. The credentials are deleted. */ + bool check(MemberId member); + + /** Throw an exception if the calling connection is not the cluster user. Store credentials in msg. 
*/ + void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); + + // Exchange overrides + std::string getType() const; + bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args); + bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args); + bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args); + + private: + typedef std::map<MemberId, sys::AbsTime> Map; + sys::Mutex lock; + Map map; + std::string username; + sys::Duration timeout; + bool authenticate; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CREDENTIALSEXCHANGE_H*/ Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp Tue Dec 6 15:56:40 2011 @@ -28,6 +28,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include "qpid/framing/Array.h" +#include "qpid/UrlArray.h" #include <boost/bind.hpp> #include <algorithm> @@ -86,9 +87,7 @@ void FailoverExchange::route(Deliverable void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { // Called with lock held. 
if (urls.empty()) return; - framing::Array array(0x95); - for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) - array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); + framing::Array array = vectorToUrlArray(urls); const ProtocolVersion v; boost::intrusive_ptr<Message> msg(new Message); AMQFrame command(MessageTransferBody(v, typeName, 1, 0)); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp Tue Dec 6 15:56:40 2011 @@ -21,6 +21,7 @@ #include "InitialStatusMap.h" #include "StoreStatus.h" #include "qpid/log/Statement.h" +#include "qpid/UrlArray.h" #include <algorithm> #include <vector> #include <boost/bind.hpp> @@ -218,6 +219,17 @@ void InitialStatusMap::checkConsistent() } } +std::vector<Url> InitialStatusMap::getUrls() const { + std::vector<Url> urls; + for (Map::const_iterator i = map.begin(); i != map.end(); ++i) { + if (i->second) { + std::vector<Url> urls = urlArrayToVector(i->second->getUrls()); + if (!urls.empty()) return urls; + } + } + return std::vector<Url>(); +} + std::string InitialStatusMap::getFirstConfigStr() const { assert(!firstConfig.empty()); return encodeMemberSet(firstConfig); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h Tue Dec 6 15:56:40 2011 @@ -23,8 +23,10 @@ */ #include "MemberSet.h" 
+#include "qpid/Url.h" #include <qpid/framing/ClusterInitialStatusBody.h> #include <boost/optional.hpp> +#include <vector> namespace qpid { namespace cluster { @@ -69,6 +71,8 @@ class InitialStatusMap framing::Uuid getClusterId(); /**@pre isComplete(). @throw Exception if there are any inconsistencies. */ void checkConsistent(); + /*@return cluster URLs */ + std::vector<Url> getUrls() const; /** Get first config-change for this member, encoded as a string. *@pre configChange has been called at least once. Modified: qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp Tue Dec 6 15:56:40 2011 @@ -36,21 +36,25 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTes typedef InitialStatusMap::Status Status; -Status activeStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) { +Status activeStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet(), + const framing::Array& urls=framing::Array()) +{ return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid(), - encodeMemberSet(ms)); + encodeMemberSet(ms), urls); } -Status newcomerStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) { +Status newcomerStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet(), + const framing::Array& urls=framing::Array()) +{ return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid(), - encodeMemberSet(ms)); + encodeMemberSet(ms), urls); } Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid(), - const MemberSet& ms=MemberSet()) + const MemberSet& ms=MemberSet(), const framing::Array& urls=framing::Array()) { - return Status(ProtocolVersion(), 0, active, start, state, stop, - 
encodeMemberSet(ms)); + return Status(ProtocolVersion(), 0, active, start, state, stop, + encodeMemberSet(ms), urls); } QPID_AUTO_TEST_CASE(testFirstInCluster) { Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original) +++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Tue Dec 6 15:56:40 2011 @@ -363,16 +363,20 @@ class Broker(Popen): def host_port(self): return "%s:%s" % (self.host(), self.port()) + def log_contains(self, str, timeout=1): + """Wait for str to appear in the log file up to timeout. Return true if found""" + return retry(lambda: find_in_file(str, self.log), timeout) + def log_ready(self): """Return true if the log file exists and contains a broker ready message""" if not self._log_ready: self._log_ready = find_in_file("notice Broker running", self.log) return self._log_ready - def ready(self, **kwargs): + def ready(self, timeout=5, **kwargs): """Wait till broker is ready to serve clients""" # First make sure the broker is listening by checking the log. - if not retry(self.log_ready, timeout=60): + if not retry(self.log_ready, timeout=timeout): raise Exception( "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5))) # Create a connection and a session. 
For a cluster broker this will Modified: qpid/trunk/qpid/cpp/src/tests/cluster_authentication_soak.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_authentication_soak.cpp?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/cluster_authentication_soak.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/cluster_authentication_soak.cpp Tue Dec 6 15:56:40 2011 @@ -96,7 +96,7 @@ startBroker ( brokerVector & brokers , i argv.push_back (clusterArg.str()); argv.push_back ("--cluster-username=zig"); argv.push_back ("--cluster-password=zig"); - argv.push_back ("--cluster-mechanism=ANONYMOUS"); + argv.push_back ("--cluster-mechanism=PLAIN"); argv.push_back ("--sasl-config=./sasl_config"); argv.push_back ("--auth=yes"); argv.push_back ("--mgmt-enable=yes"); Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original) +++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Dec 6 15:56:40 2011 @@ -114,7 +114,9 @@ class ShortTests(BrokerTest): sasl_config=os.path.join(self.rootdir, "sasl_config") acl=os.path.join(os.getcwd(), "policy.acl") aclf=file(acl,"w") + # Must allow cluster-user (zag) access to credentials exchange. aclf.write(""" +acl allow zag@QPID publish exchange name=qpid.cluster-credentials acl allow zig@QPID all all acl deny all all """) @@ -122,7 +124,11 @@ acl deny all all cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config, "--load-module", os.getenv("ACL_LIB"), - "--acl-file", acl]) + "--acl-file", acl, + "--cluster-username=zag", + "--cluster-password=zag", + "--cluster-mechanism=PLAIN" + ]) # Valid user/password, ensure queue is created. 
c = cluster[0].connect(username="zig", password="zig") @@ -167,39 +173,51 @@ acl deny all all self.fail("Expected exception") except messaging.exceptions.NotFound: pass - def test_sasl_join(self): + def test_sasl_join_good(self): """Verify SASL authentication between brokers when joining a cluster.""" sasl_config=os.path.join(self.rootdir, "sasl_config") # Test with a valid username/password cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config, - "--load-module", os.getenv("ACL_LIB"), "--cluster-username=zig", "--cluster-password=zig", "--cluster-mechanism=PLAIN" ]) cluster.start() - cluster.ready() - c = cluster[1].connect(username="zag", password="zag") + c = cluster[1].connect(username="zag", password="zag", mechanism="PLAIN") - # Test with an invalid username/password + def test_sasl_join_bad_password(self): + # Test with an invalid password cluster = self.cluster(1, args=["--auth", "yes", - "--sasl-config", sasl_config, - "--load-module", os.getenv("ACL_LIB"), - "--cluster-username=x", - "--cluster-password=y", + "--sasl-config", os.path.join(self.rootdir, "sasl_config"), + "--cluster-username=zig", + "--cluster-password=bad", "--cluster-mechanism=PLAIN" ]) - try: - cluster.start(expect=EXPECT_EXIT_OK) - cluster[1].ready() - self.fail("Expected exception") - except: pass + cluster.start(wait=False, expect=EXPECT_EXIT_FAIL) + assert cluster[1].log_contains("critical Unexpected error: connection-forced: Authentication failed") + + def test_sasl_join_wrong_user(self): + # Test with a valid user that is not the cluster user. 
+ cluster = self.cluster(0, args=["--auth", "yes", + "--sasl-config", os.path.join(self.rootdir, "sasl_config")]) + cluster.start(args=["--cluster-username=zig", + "--cluster-password=zig", + "--cluster-mechanism=PLAIN" + ]) + + cluster.start(wait=False, expect=EXPECT_EXIT_FAIL, + args=["--cluster-username=zag", + "--cluster-password=zag", + "--cluster-mechanism=PLAIN" + ]) + assert cluster[1].log_contains("critical Unexpected error: unauthorized-access: unauthorized-access: Unauthorized user zag@QPID for qpid.cluster-credentials, should be zig") def test_user_id_update(self): """Ensure that user-id of an open session is updated to new cluster members""" sasl_config=os.path.join(self.rootdir, "sasl_config") - cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config,]) + cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config, + "--cluster-mechanism=ANONYMOUS"]) c = cluster[0].connect(username="zig", password="zig") s = c.session().sender("q;{create:always}") s.send(Message("x", user_id="zig")) # Message sent before start new broker Modified: qpid/trunk/qpid/cpp/xml/cluster.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1210989&r1=1210988&r2=1210989&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/xml/cluster.xml (original) +++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Dec 6 15:56:40 2011 @@ -65,6 +65,7 @@ <field name="store-state" type="store-state"/> <field name="shutdown-id" type="uuid"/> <field name="first-config" type="str16"/> + <field name="urls" type="array"/> <!-- Array of str16 --> </control> <!-- New member or updater is ready as an active member. --> --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org