Hello,
Has anyone run into a circumstance where gcc consistently generates bogus debug
information - or, ddd can't read a source file correctly?
I am debugging some additions I made to the ACL framework. There are two
problems I am having:
1. I overloaded the Acl::authorise method, and the call is not being
dispatched to the correct overload. Previously there were two overloads
of Acl::authorise; for each I added a new overload that takes a
framing::FieldTable& argument directly before the last argument. The old
methods still appear to be called, even though the call I step out of
clearly passes the FieldTable argument.
2. In two source files, the code execution does not follow what ddd shows
on-screen. ddd appears to be 3 lines behind what gdb indicates, and it
appears to jump around randomly at times.
I've modified the Makefiles to remove optimization and hard-code -ggdb for
CFLAGS and CXXFLAGS. The linker is not stripping debug info. What
other things should I consider?
On the attached files - the call to acl->authorise on SessionAdapter.cpp line
339 ought to be calling the Acl::authorise on line 102 of Acl.cpp.
Any ideas?
Cheers,
-Josh
--
-----
http://www.globalherald.net/jb01
GlobalHerald.NET, the Smarter Social Network! (tm)
/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "SessionAdapter.h"
#include "Connection.h"
#include "Queue.h"
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/enum.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/agent/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
#include "qmf/org/apache/qpid/broker/EventUnsubscribe.h"
#include <boost/format.hpp>
#include <boost/cast.hpp>
#include <boost/bind.hpp>
namespace qpid {
namespace broker {
using namespace qpid;
using namespace qpid::framing;
using namespace qpid::framing::dtx;
using namespace qpid::management;
namespace _qmf = qmf::org::apache::qpid::broker;
typedef std::vector<Queue::shared_ptr> QueueVector;
/**
 * Constructs the adapter: the base HandlerImpl and each per-class handler
 * member are all initialised from the same SemanticState.
 */
SessionAdapter::SessionAdapter(SemanticState& s)
    : HandlerImpl(s),
      exchangeImpl(s),
      queueImpl(s),
      messageImpl(s),
      executionImpl(s),
      txImpl(s),
      dtxImpl(s)
{
}
// JPK added the args call to acl->authorse below.
void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
string& type,
const string&
alternateExchange,
bool passive, bool durable,
bool /*autoDelete*/, const FieldTable& args){
AclModule* acl = getBroker().getAcl();
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_TYPE, type));
params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? "true"
: "false") ));
params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? "true"
: "false")));
if
(!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,args,¶ms)
)
throw NotAllowedException(QPID_MSG("ACL denied exhange declare
request from " << getConnection().getUserId()));
}
//TODO: implement autoDelete
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
alternate = getBroker().getExchanges().get(alternateExchange);
}
if(passive){
Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
checkType(actual, type);
checkAlternate(actual, alternate);
}else{
try{
std::pair<Exchange::shared_ptr, bool> response =
getBroker().getExchanges().declare(exchange, type, durable, args);
if (response.second) {
if (durable) {
getBroker().getStore().create(*response.first, args);
}
if (alternate) {
response.first->setAlternate(alternate);
alternate->incAlternateUsers();
}
} else {
checkType(response.first, type);
checkAlternate(response.first, alternate);
}
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent)
agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(),
getConnection().getUserId(), exchange, type,
alternateExchange,
durable, false, args,
response.second ?
"created" : "existing"));
}catch(UnknownExchangeTypeException& /*e*/){
throw CommandInvalidException(QPID_MSG("Exchange type not
implemented: " << type));
}
}
}
/**
 * Verifies that an existing exchange has the requested type.
 *
 * An empty requested type is accepted without comparison.
 *
 * @throws NotAllowedException when a non-empty type differs from exchange->getType().
 */
void SessionAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange,
                                                    const std::string& type)
{
    if (type.empty()) return;
    if (exchange->getType() == type) return;

    throw NotAllowedException(QPID_MSG("Exchange declared to be of type "
                                       << exchange->getType() << ", requested " << type));
}
/**
 * Verifies that an existing exchange has the requested alternate exchange.
 *
 * No check is made when no alternate was requested.
 *
 * @throws NotAllowedException when a requested alternate differs from
 *         exchange->getAlternate().
 */
void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange,
                                                         Exchange::shared_ptr alternate)
{
    if (!alternate) return;

    Exchange::shared_ptr declared = exchange->getAlternate();
    if (alternate == declared) return;

    // The existing exchange may have no alternate at all; do not dereference
    // a null pointer while building the error text.
    throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange "
                                       << (declared ? declared->getName() : std::string("<none>"))
                                       << ", requested "
                                       << alternate->getName()));
}
/**
 * Handles an exchange delete request.
 *
 * Authorises against the ACL module when present, refuses to delete an
 * exchange that reports inUseAsAlternate(), removes a durable exchange from
 * the store, decrements the alternate's user count, destroys the exchange in
 * the registry and raises a delete event if a management agent is available.
 *
 * @throws NotAllowedException on ACL denial or when the exchange is in use as
 *         an alternate exchange.
 */
void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
{
    AclModule* acl = getBroker().getAcl();
    if (acl) {
        if (!acl->authorise(getConnection().getUserId(), acl::ACT_DELETE, acl::OBJ_EXCHANGE,
                            name, NULL))
            throw NotAllowedException(QPID_MSG("ACL denied exhange delete request from "
                                               << getConnection().getUserId()));
    }

    //TODO: implement unused
    Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
    if (exchange->inUseAsAlternate())
        throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
    if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
    if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
    getBroker().getExchanges().destroy(name);

    ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
    if (agent)
        agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(),
                                                    getConnection().getUserId(), name));
}
/**
 * Handles an exchange query request.
 *
 * Authorises against the ACL module when present, then returns the type,
 * durability and arguments of the named exchange. If the registry lookup
 * throws NotFoundException, a result with an empty type and the third field
 * set to true is returned instead.
 *
 * @throws NotAllowedException on ACL denial.
 */
ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
{
    AclModule* acl = getBroker().getAcl();
    if (acl) {
        if (!acl->authorise(getConnection().getUserId(), acl::ACT_ACCESS, acl::OBJ_EXCHANGE,
                            name, NULL))
            throw NotAllowedException(QPID_MSG("ACL denied exhange query request from "
                                               << getConnection().getUserId()));
    }

    try {
        Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
        return ExchangeQueryResult(exchange->getType(), exchange->isDurable(),
                                   false, exchange->getArgs());
    } catch (const NotFoundException& /*e*/) {
        return ExchangeQueryResult("", false, true, FieldTable());
    }
}
// JPK added 'arguments' to the acl->authorize call below.
void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
const string& exchangeName, const
string& routingKey,
const FieldTable& arguments)
{
AclModule* acl = getBroker().getAcl();
if (acl) {
if
(!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,arguments,routingKey)
)
throw NotAllowedException(QPID_MSG("ACL denied exhange bind request
from " << getConnection().getUserId()));
}
Queue::shared_ptr queue = getQueue(queueName);
Exchange::shared_ptr exchange =
getBroker().getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ?
queue->getName() : routingKey;
if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
queue->bound(exchangeName, routingKey, arguments);
if (exchange->isDurable() && queue->isDurable()) {
getBroker().getStore().bind(*exchange, *queue, routingKey,
arguments);
}
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent)
agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(),
getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey,
arguments));
}
}else{
throw NotFoundException("Bind failed. No such exchange: " +
exchangeName);
}
}
void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
const string& exchangeName,
const string& routingKey)
{
AclModule* acl = getBroker().getAcl();
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
if
(!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,¶ms)
)
throw NotAllowedException(QPID_MSG("ACL denied exchange unbind
request from " << getConnection().getUserId()));
}
Queue::shared_ptr queue = getQueue(queueName);
if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange:
" + exchangeName);
Exchange::shared_ptr exchange =
getBroker().getExchanges().get(exchangeName);
if (!exchange.get()) throw NotFoundException("Unbind failed. No such
exchange: " + exchangeName);
//TODO: revise unbind to rely solely on binding key (not args)
if (exchange->unbind(queue, routingKey, 0)) {
if (exchange->isDurable() && queue->isDurable())
getBroker().getStore().unbind(*exchange, *queue, routingKey,
FieldTable());
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent)
agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(),
getConnection().getUserId(), exchangeName, queueName, routingKey));
}
}
/**
 * Handles an exchange bound query.
 *
 * Authorises against the ACL module when present, then reports, via the five
 * flags of ExchangeBoundResult, whether the exchange was not found, the queue
 * was not found, or which of queue / key / args failed to match a binding.
 * All flags false means the combined isBound() check succeeded.
 *
 * @throws NotAllowedException on ACL denial.
 */
ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
                                                               const std::string& queueName,
                                                               const std::string& key,
                                                               const framing::FieldTable& args)
{
    AclModule* acl = getBroker().getAcl();
    if (acl) {
        std::map<acl::Property, std::string> params;
        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
        params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
        if (!acl->authorise(getConnection().getUserId(), acl::ACT_CREATE, acl::OBJ_EXCHANGE,
                            exchangeName, &params))
            throw NotAllowedException(QPID_MSG("ACL denied exhange bound request from "
                                               << getConnection().getUserId()));
    }

    // A missing exchange is reported through the result, not as an exception.
    Exchange::shared_ptr exchange;
    try {
        exchange = getBroker().getExchanges().get(exchangeName);
    } catch (const NotFoundException&) {}

    Queue::shared_ptr queue;
    if (!queueName.empty()) {
        queue = getBroker().getQueues().find(queueName);
    }

    if (!exchange) {
        return ExchangeBoundResult(true, false, false, false, false);
    } else if (!queueName.empty() && !queue) {
        return ExchangeBoundResult(false, true, false, false, false);
    // NOTE(review): the original passed "args.count() > 0 ? &args : &args",
    // i.e. always &args; possibly 0 was intended for an empty table - confirm.
    } else if (exchange->isBound(queue, key.empty() ? 0 : &key, &args)) {
        return ExchangeBoundResult(false, false, false, false, false);
    } else {
        //need to test each specified option individually
        bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
        bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
        bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);

        return ExchangeBoundResult(false, false, !queueMatched, !keyMatched, !argsMatched);
    }
}
/**
 * Initialises the helper base with the session and stores the result of
 * getBroker() in the 'broker' member.
 */
SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session)
    : HandlerHelper(session),
      broker(getBroker())
{
}
/**
 * Releases the exclusive queues tracked by this handler. A std::exception
 * thrown while doing so is logged at error level instead of propagating out
 * of the destructor.
 */
SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
{
    try
    {
        destroyExclusiveQueues();
    }
    catch (std::exception& e)
    {
        QPID_LOG(error, e.what());
    }
}
void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues()
{
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
if (q->canAutoDelete()) {
Queue::tryAutoDelete(broker, q);
}
exclusiveQueues.erase(exclusiveQueues.begin());
}
}
/** Forwards the locality check for token @p t to the session. */
bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
{
    const bool local = session.isLocal(t);
    return local;
}
/**
 * Handles a queue query request.
 *
 * Authorises against the ACL module when present, then looks the queue up by
 * name. A found queue is described by its name, alternate exchange name (empty
 * if none), durability, exclusive-owner and auto-delete flags, settings,
 * message count and consumer count. A default-constructed result is returned
 * when the queue is not found.
 *
 * @throws NotAllowedException on ACL denial.
 */
QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
    AclModule* acl = getBroker().getAcl();
    if (acl) {
        if (!acl->authorise(getConnection().getUserId(), acl::ACT_ACCESS, acl::OBJ_QUEUE,
                            name, NULL))
            throw NotAllowedException(QPID_MSG("ACL denied queue query request from "
                                               << getConnection().getUserId()));
    }

    Queue::shared_ptr queue = session.getBroker().getQueues().find(name);
    if (queue) {
        Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();

        return QueueQueryResult(queue->getName(),
                                alternateExchange ? alternateExchange->getName() : "",
                                queue->isDurable(),
                                queue->hasExclusiveOwner(),
                                queue->isAutoDelete(),
                                queue->getSettings(),
                                queue->getMessageCount(),
                                queue->getConsumerCount());
    } else {
        return QueueQueryResult();
    }
}
/**
 * Handles a queue declare request.
 *
 * Authorises against the ACL module when present, passing the boolean options
 * as string properties and forwarding the declare arguments (JPK addition).
 * A passive declare with a non-empty name only looks the queue up. Otherwise
 * the queue is declared in the registry; a newly created queue gets its
 * alternate exchange, settings, default-exchange binding, optional queue-event
 * manager and (if exclusive) is tracked for cleanup, while an existing queue
 * is only given an exclusive owner when requested. A declare event is raised
 * if a management agent is available.
 *
 * @throws NotAllowedException     on ACL denial.
 * @throws ResourceLockedException when exclusive access was requested but the
 *         session is not the queue's exclusive owner.
 */
void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
                                               bool passive, bool durable, bool exclusive,
                                               bool autoDelete,
                                               const qpid::framing::FieldTable& arguments)
{
    AclModule* acl = getBroker().getAcl();
    if (acl) {
        std::map<acl::Property, std::string> params;
        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
        params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? "true" : "false")));
        params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? "true" : "false")));
        params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? "true" : "false")));
        params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? "true" : "false")));

        // JPK: 'arguments' selects the FieldTable-taking authorise overload.
        if (!acl->authorise(getConnection().getUserId(), acl::ACT_CREATE, acl::OBJ_QUEUE,
                            name, arguments, &params))
            throw NotAllowedException(QPID_MSG("ACL denied queue create request from "
                                               << getConnection().getUserId()));
    }

    Exchange::shared_ptr alternate;
    if (!alternateExchange.empty()) {
        alternate = getBroker().getExchanges().get(alternateExchange);
    }
    Queue::shared_ptr queue;
    if (passive && !name.empty()) {
        queue = getQueue(name);
        //TODO: check alternate-exchange is as expected
    } else {
        std::pair<Queue::shared_ptr, bool> queue_created =
            getBroker().getQueues().declare(name, durable,
                                            autoDelete,
                                            exclusive ? &session : 0);
        queue = queue_created.first;
        assert(queue);

        if (queue_created.second) { // This is a new queue
            if (alternate) {
                queue->setAlternateExchange(alternate);
                alternate->incAlternateUsers();
            }

            //apply settings & create persistent record if required
            queue_created.first->create(arguments);

            //add default binding:
            getBroker().getExchanges().getDefault()->bind(queue, name, 0);
            queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);

            //if event generation is turned on, pass in a pointer to
            //the QueueEvents instance to use
            if (queue->getEventMode())
                queue->setQueueEventManager(getBroker().getQueueEvents());

            //handle automatic cleanup:
            if (exclusive) {
                exclusiveQueues.push_back(queue);
            }
        } else {
            if (exclusive && queue->setExclusiveOwner(&session)) {
                exclusiveQueues.push_back(queue);
            }
        }

        ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
        if (agent)
            agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(),
                                                      getConnection().getUserId(),
                                                      name, durable, exclusive,
                                                      autoDelete, arguments,
                                                      queue_created.second ? "created" : "existing"));
    }

    if (exclusive && !queue->isExclusiveOwner(&session))
        throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue "
                                               << queue->getName()));
}
void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
AclModule* acl = getBroker().getAcl();
if (acl)
{
if
(!acl->authorise(getConnection().getUserId(),acl::ACT_PURGE,acl::OBJ_QUEUE,queue,NULL)
)
throw NotAllowedException(QPID_MSG("ACL denied queue purge request
from " << getConnection().getUserId()));
}
getQueue(queue)->purge();
}
void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool
ifUnused, bool ifEmpty){
AclModule* acl = getBroker().getAcl();
if (acl)
{
if
(!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL)
)
throw NotAllowedException(QPID_MSG("ACL denied queue delete
request from " << getConnection().getUserId()));
}
Queue::shared_ptr q = getQueue(queue);
if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session))
throw ResourceLockedException(QPID_MSG("Cannot delete queue "
<< queue << "; it is exclusive
to another session"));
if(ifEmpty && q->getMessageCount() > 0){
throw PreconditionFailedException("Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
throw PreconditionFailedException("Queue in use.");
}else{
//remove the queue from the list of exclusive queues if necessary
if(q->isExclusiveOwner(&getConnection())){
QueueVector::iterator i =
std::find(getConnection().exclusiveQueues.begin(),
getConnection().exclusiveQueues.end(), q);
if(i < getConnection().exclusiveQueues.end())
getConnection().exclusiveQueues.erase(i);
}
q->destroy();
getBroker().getQueues().destroy(queue);
q->unbind(getBroker().getExchanges(), q);
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent)
agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(),
getConnection().getUserId(), queue));
}
}
/**
 * Pre-binds the ranged operations used by release(), reject() and accept()
 * to the corresponding SemanticState member functions on 'state'. The two
 * release variants differ only in the trailing boolean passed to
 * SemanticState::release.
 */
SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s)
    : HandlerHelper(s),
      releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)),
      releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
      rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
      acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
{
}
//
// Message class method handlers
//
/**
 * Placeholder for message.transfer: all arguments are ignored and a trace
 * line is written to standard output.
 */
void SessionAdapter::MessageHandlerImpl::transfer(const string& /*destination*/,
                                                  uint8_t /*acceptMode*/,
                                                  uint8_t /*acquireMode*/)
{
    // Not yet used (content-containing assemblies are treated differently at present).
    std::cout << "SessionAdapter::MessageHandlerImpl::transfer() called" << std::endl;
}
/**
 * Applies the release operation to every range in @p transfers, choosing the
 * redelivered-marking variant when @p setRedelivered is true.
 */
void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers,
                                                 bool setRedelivered)
{
    if (setRedelivered)
        transfers.for_each(releaseRedeliveredOp);
    else
        transfers.for_each(releaseOp);
}
/**
 * Handles a message subscribe request.
 *
 * Authorises against the ACL module when present, rejects a non-empty
 * destination that already exists in the session state, refuses queues
 * exclusively owned by another session, then registers the consumer and
 * raises a subscribe event if a management agent is available. The accept and
 * acquire modes are passed on as booleans (mode == 0).
 *
 * @throws NotAllowedException     on ACL denial or duplicate destination.
 * @throws ResourceLockedException when another session owns the queue exclusively.
 */
void
SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
                                              const string& destination,
                                              uint8_t acceptMode,
                                              uint8_t acquireMode,
                                              bool exclusive,
                                              const string& resumeId,
                                              uint64_t resumeTtl,
                                              const FieldTable& arguments)
{
    AclModule* acl = getBroker().getAcl();
    if (acl) {
        // add flags as needed
        if (!acl->authorise(getConnection().getUserId(), acl::ACT_CONSUME, acl::OBJ_QUEUE,
                            queueName, NULL))
            throw NotAllowedException(QPID_MSG("ACL denied Queue subscribe request from "
                                               << getConnection().getUserId()));
    }

    Queue::shared_ptr queue = getQueue(queueName);
    if (!destination.empty() && state.exists(destination))
        throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
    if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session))
        throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue "
                                               << queue->getName()));

    state.consume(destination, queue,
                  acceptMode == 0, acquireMode == 0, exclusive,
                  resumeId, resumeTtl, arguments);

    ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
    if (agent)
        agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(),
                                               getConnection().getUserId(),
                                               queueName, destination,
                                               exclusive, arguments));
}
void
SessionAdapter::MessageHandlerImpl::cancel(const string& destination )
{
state.cancel(destination);
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent)
agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(),
getConnection().getUserId(), destination));
}
/**
 * Applies the pre-bound reject operation to every range in @p transfers.
 * The reject code and text are ignored.
 */
void
SessionAdapter::MessageHandlerImpl::reject(const SequenceSet& transfers,
                                           uint16_t /*code*/,
                                           const string& /*text*/)
{
    transfers.for_each(rejectOp);
}
void SessionAdapter::MessageHandlerImpl::flow(const std::string& destination,
uint8_t unit, uint32_t value)
{
if (unit == 0) {
//message
state.addMessageCredit(destination, value);
} else if (unit == 1) {
//bytes
state.addByteCredit(destination, value);
} else {
//unknown
throw InvalidArgumentException(QPID_MSG("Invalid value for unit " <<
unit));
}
}
void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string&
destination, uint8_t mode)
{
if (mode == 0) {
//credit
state.setCreditMode(destination);
} else if (mode == 1) {
//window
state.setWindowMode(destination);
} else{
throw InvalidArgumentException(QPID_MSG("Invalid value for mode " <<
mode));
}
}
/** Forwards a flush request for @p destination to the session state. */
void SessionAdapter::MessageHandlerImpl::flush(const std::string& destination)
{
    state.flush(destination);
}
/** Forwards a stop request for @p destination to the session state. */
void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination)
{
    state.stop(destination);
}
/** Applies the pre-bound accept operation to every range in @p commands. */
void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands)
{
    commands.for_each(acceptOp);
}
/**
 * Attempts to acquire the given transfers.
 *
 * SemanticState::acquire is applied to each range in @p transfers, collecting
 * into a SequenceNumberSet; that set is condensed and its ranges are copied
 * into the SequenceSet returned inside the result.
 */
framing::MessageAcquireResult
SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
{
    // FIXME aconway 2008-05-12: create SequenceSet directly, no need for
    // intermediate results vector.
    SequenceNumberSet results;
    RangedOperation f = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results));
    transfers.for_each(f);

    results = results.condense();
    SequenceSet acquisitions;
    RangedOperation g = boost::bind(&SequenceSet::add, &acquisitions, _1, _2);
    results.processRanges(g);

    return MessageAcquireResult(acquisitions);
}
/**
 * Resuming transfers is unsupported; both arguments are ignored.
 *
 * @throws NotImplementedException always.
 */
framing::MessageResumeResult
SessionAdapter::MessageHandlerImpl::resume(const std::string& /*destination*/,
                                           const std::string& /*resumeId*/)
{
    throw NotImplementedException("resuming transfers not yet supported");
}
/** Essentially a no-op. */
void SessionAdapter::ExecutionHandlerImpl::sync()
{
}
/** Ignores an execution result; both arguments are unused. */
void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*commandId*/,
                                                  const string& /*value*/)
{
    //TODO: but currently never used client->server
}
/** Ignores an execution exception notification; every argument is unused. */
void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/,
                                                     const SequenceNumber& /*commandId*/,
                                                     uint8_t /*classCode*/,
                                                     uint8_t /*commandCode*/,
                                                     uint8_t /*fieldIndex*/,
                                                     const std::string& /*description*/,
                                                     const framing::FieldTable& /*errorInfo*/)
{
    //TODO: again, not really used client->server but may be important
    //for inter-broker links
}
/** Forwards tx.select to state.startTx(). */
void SessionAdapter::TxHandlerImpl::select()
{
    state.startTx();
}
/** Forwards tx.commit to state.commit(), handing it the broker's store. */
void SessionAdapter::TxHandlerImpl::commit()
{
    state.commit(&getBroker().getStore());
}
/** Forwards tx.rollback to state.rollback(). */
void SessionAdapter::TxHandlerImpl::rollback()
{
    state.rollback();
}
/** Returns the string produced by encode() for @p xid. */
std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid& xid)
{
    std::string xidAsString;
    encode(xid, xidAsString);
    return xidAsString;
}
/** Forwards dtx.select to state.selectDtx(). */
void SessionAdapter::DtxHandlerImpl::select()
{
    state.selectDtx();
}
/**
 * Handles dtx.end for the branch identified by @p xid.
 *
 * With @p fail set the branch is ended as failed and XA_RBROLLBACK is
 * returned; otherwise the branch is suspended or ended according to
 * @p suspend and XA_OK is returned. A DtxTimeoutException from the state is
 * translated into XA_RBTIMEOUT.
 *
 * @throws CommandInvalidException when both @p fail and @p suspend are set.
 */
XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
                                             bool fail,
                                             bool suspend)
{
    try {
        if (fail) {
            // NOTE(review): the branch is ended before the fail+suspend
            // combination is rejected - confirm this ordering is intended.
            state.endDtx(convert(xid), true);
            if (suspend) {
                throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
            } else {
                return XaResult(XA_STATUS_XA_RBROLLBACK);
            }
        } else {
            if (suspend) {
                state.suspendDtx(convert(xid));
            } else {
                state.endDtx(convert(xid), false);
            }
            return XaResult(XA_STATUS_XA_OK);
        }
    } catch (const DtxTimeoutException& /*e*/) {
        return XaResult(XA_STATUS_XA_RBTIMEOUT);
    }
}
/**
 * Handles dtx.start for the branch identified by @p xid.
 *
 * Resumes the branch when @p resume is set, otherwise starts it with the
 * broker's DtxManager (passing @p join). Returns XA_OK on success; a
 * DtxTimeoutException is translated into XA_RBTIMEOUT.
 *
 * @throws CommandInvalidException when both @p join and @p resume are set.
 */
XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
                                               bool join,
                                               bool resume)
{
    if (join && resume) {
        throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set."));
    }
    try {
        if (resume) {
            state.resumeDtx(convert(xid));
        } else {
            state.startDtx(convert(xid), getBroker().getDtxManager(), join);
        }
        return XaResult(XA_STATUS_XA_OK);
    } catch (const DtxTimeoutException& /*e*/) {
        return XaResult(XA_STATUS_XA_RBTIMEOUT);
    }
}
/**
 * Asks the DtxManager to prepare the branch for @p xid. Returns XA_OK when the
 * manager reports success, XA_RBROLLBACK otherwise, and XA_RBTIMEOUT when a
 * DtxTimeoutException is caught.
 */
XaResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid)
{
    try
    {
        const bool prepared = getBroker().getDtxManager().prepare(convert(xid));
        if (prepared)
            return XaResult(XA_STATUS_XA_OK);
        return XaResult(XA_STATUS_XA_RBROLLBACK);
    }
    catch (const DtxTimeoutException& /*e*/)
    {
        return XaResult(XA_STATUS_XA_RBTIMEOUT);
    }
}
/**
 * Asks the DtxManager to commit the branch for @p xid (passing @p onePhase).
 * Returns XA_OK when the manager reports success, XA_RBROLLBACK otherwise,
 * and XA_RBTIMEOUT when a DtxTimeoutException is caught.
 */
XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid,
                                                bool onePhase)
{
    try
    {
        const bool committed = getBroker().getDtxManager().commit(convert(xid), onePhase);
        if (committed)
            return XaResult(XA_STATUS_XA_OK);
        return XaResult(XA_STATUS_XA_RBROLLBACK);
    }
    catch (const DtxTimeoutException& /*e*/)
    {
        return XaResult(XA_STATUS_XA_RBTIMEOUT);
    }
}
/**
 * Asks the DtxManager to roll back the branch for @p xid. Returns XA_OK, or
 * XA_RBTIMEOUT when a DtxTimeoutException is caught.
 */
XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid)
{
    try
    {
        getBroker().getDtxManager().rollback(convert(xid));
    }
    catch (const DtxTimeoutException& /*e*/)
    {
        return XaResult(XA_STATUS_XA_RBTIMEOUT);
    }
    return XaResult(XA_STATUS_XA_OK);
}
/**
 * Collects the prepared xids from the store and returns them as an Array of
 * Struct32Value entries wrapped in a DtxRecoverResult.
 */
DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover()
{
    typedef std::set<std::string> XidSet;

    XidSet prepared;
    getBroker().getStore().collectPreparedXids(prepared);

    // Create the array of long structs.
    Array indoubt(0xAB);
    for (XidSet::iterator it = prepared.begin(); it != prepared.end(); ++it)
    {
        boost::shared_ptr<FieldValue> entry(new Struct32Value(*it));
        indoubt.add(entry);
    }
    return DtxRecoverResult(indoubt);
}
void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid)
{
//Currently no heuristic completion is supported, so this should never be
used.
throw NotImplementedException(QPID_MSG("Forget not implemented. Branch with
xid " << xid << " not heuristically completed!"));
}
/** Returns the timeout the DtxManager reports for the branch @p xid. */
DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid)
{
    const uint32_t current = getBroker().getDtxManager().getTimeout(convert(xid));
    return DtxGetTimeoutResult(current);
}
/** Passes @p timeout for the branch @p xid on to the DtxManager. */
void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid,
                                                uint32_t timeout)
{
    getBroker().getDtxManager().setTimeout(convert(xid), timeout);
}
/**
 * Looks up a queue by name in the broker's queue registry.
 *
 * @return the queue; never null.
 * @throws framing::IllegalArgumentException when @p name is empty.
 * @throws framing::NotFoundException        when no such queue is registered.
 */
Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const
{
    if (name.empty())
        throw framing::IllegalArgumentException(QPID_MSG("No queue name specified."));

    Queue::shared_ptr queue = session.getBroker().getQueues().find(name);
    if (!queue)
        throw framing::NotFoundException(QPID_MSG("Queue not found: " << name));
    return queue;
}
}} // namespace qpid::broker
/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "qpid/acl/Acl.h"
#include "qpid/acl/AclData.h"
#include "qpid/broker/Broker.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/shared_ptr.h"
#include "qpid/log/Logger.h"
#include "qmf/org/apache/qpid/acl/Package.h"
#include "qmf/org/apache/qpid/acl/EventAllow.h"
#include "qmf/org/apache/qpid/acl/EventDeny.h"
#include "qmf/org/apache/qpid/acl/EventFileLoaded.h"
#include "qmf/org/apache/qpid/acl/EventFileLoadFailed.h"
#include <map>
#include <boost/utility/in_place_factory.hpp>
using namespace std;
using namespace qpid::acl;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::acl;
/**
 * Constructs the ACL plugin object.
 *
 * Registers a management object when a ManagementAgent exists, then loads the
 * ACL file named in the AclValues. On success the management object (if any)
 * is marked as enforcing.
 *
 * @throws Exception when the ACL file cannot be read; the management object
 *         (if any) is marked as not enforcing first.
 */
Acl::Acl (AclValues& av, broker::Broker& b): aclValues(av), broker(&b), transferAcl(false)
{
    // Start from a known null: the "mgmtObject != 0" checks below and in the
    // other members are otherwise made on an uninitialised pointer whenever
    // no management agent is available.
    mgmtObject = 0;

    agent = ManagementAgent::Singleton::getInstance();
    if (agent != 0){
        _qmf::Package packageInit(agent);
        mgmtObject = new _qmf::Acl (agent, this, broker);
        agent->addObject (mgmtObject);
    }

    std::string errorString;
    if (!readAclFile(errorString)){
        // Must precede the throw; placed after it the statement was unreachable.
        if (mgmtObject!=0) mgmtObject->set_enforcingAcl(0);
        throw Exception("Could not read ACL file " + errorString);
    }
    QPID_LOG(info, "ACL Plugin loaded");
    if (mgmtObject!=0) mgmtObject->set_enforcingAcl(1);
}
/**
 * Authorises @p action on the object @p name of type @p objType for user
 * @p id, using the optional property map @p params.
 *
 * @param params may be null; several callers pass NULL.
 * @return the verdict computed by result() from the AclData lookup.
 */
bool Acl::authorise(const std::string& id, const Action& action, const ObjectType& objType,
                    const std::string& name, std::map<Property, std::string>* params)
{
    boost::shared_ptr<AclData> dataLocal = data;  //rcu copy

    // add real ACL check here...
    AclResult aclreslt = dataLocal->lookup(id,action,objType,name,params);

    // JPK: debug trace of the supplied properties. Guarded because callers may
    // pass a null map; iterating it unconditionally dereferenced a null pointer.
    if (params != 0) {
        std::map<Property, std::string>::const_iterator iter;
        for (iter = params->begin(); iter != params->end(); ++iter) {
            cout << "property: " << iter->first << ", text: " << iter->second << endl;
        }
    }

    return result(aclreslt, id, action, objType, name);
}
bool Acl::authorise(const std::string& id, const Action& action, const
ObjectType& objType, const std::string& ExchangeName, const std::string&
RoutingKey)
{
boost::shared_ptr<AclData> dataLocal = data; //rcu copy
// only use dataLocal here...
AclResult aclreslt =
dataLocal->lookup(id,action,objType,ExchangeName,RoutingKey);
// JPK: update
cout << "You are here!";
return result(aclreslt, id, action, objType, ExchangeName);
}
/**
 * JPK addition: property-map authorise overload that also receives the
 * request's FieldTable.
 *
 * The lookup is the same as in the overload without @p arguments; the table
 * is accepted but not consulted yet.
 *
 * NOTE(review): SessionAdapter calls this through an AclModule*; presumably
 * the overload must also be declared (virtual) in AclModule, and dependants
 * rebuilt, for those calls to reach this definition - confirm.
 *
 * @param params may be null.
 * @return the verdict computed by result() from the AclData lookup.
 */
bool Acl::authorise(const std::string& id, const Action& action, const ObjectType& objType,
                    const std::string& name, const framing::FieldTable& arguments,
                    std::map<Property, std::string>* params)
{
    // TODO(JPK): feed 'arguments' into the lookup. Referenced here only so the
    // parameter is not reported as unused; no per-call copy of the table is made.
    (void) arguments;

    boost::shared_ptr<AclData> dataLocal = data;  //rcu copy

    // add real ACL check here...
    AclResult aclreslt = dataLocal->lookup(id,action,objType,name,params);

    return result(aclreslt, id, action, objType, name);
}
/**
 * JPK addition: routing-key authorise overload that also receives the
 * request's FieldTable.
 *
 * The lookup is the same as in the overload without @p arguments; the table
 * is accepted but not consulted yet.
 *
 * NOTE(review): SessionAdapter calls this through an AclModule*; presumably
 * the overload must also be declared (virtual) in AclModule, and dependants
 * rebuilt, for those calls to reach this definition - confirm.
 *
 * @return the verdict computed by result() from the AclData lookup.
 */
bool Acl::authorise(const std::string& id, const Action& action, const ObjectType& objType,
                    const std::string& ExchangeName, const framing::FieldTable& arguments,
                    const std::string& RoutingKey)
{
    // TODO(JPK): feed 'arguments' into the lookup. Referenced here only so the
    // parameter is not reported as unused; no per-call copy of the table is made.
    (void) arguments;

    boost::shared_ptr<AclData> dataLocal = data;  //rcu copy

    // only use dataLocal here...
    AclResult aclreslt = dataLocal->lookup(id,action,objType,ExchangeName,RoutingKey);

    // JPK: update
    cout << "You are here!";

    return result(aclreslt, id, action, objType, ExchangeName);
}
// JPK - end
bool Acl::result(const AclResult& aclreslt, const std::string& id, const
Action& action, const ObjectType& objType, const std::string& name)
{
switch (aclreslt)
{
case ALLOWLOG:
QPID_LOG(info, "ACL Allow id:" << id <<" action:" <<
AclHelper::getActionStr(action) <<
" ObjectType:" << AclHelper::getObjectTypeStr(objType) << "
Name:" << name );
agent->raiseEvent(_qmf::EventAllow(id,
AclHelper::getActionStr(action),
AclHelper::getObjectTypeStr(objType),
name, framing::FieldTable()));
case ALLOW:
return true;
case DENY:
if (mgmtObject!=0) mgmtObject->inc_aclDenyCount();
return false;
case DENYLOG:
if (mgmtObject!=0) mgmtObject->inc_aclDenyCount();
default:
QPID_LOG(info, "ACL Deny id:" << id << " action:" <<
AclHelper::getActionStr(action) << " ObjectType:" <<
AclHelper::getObjectTypeStr(objType) << " Name:" << name);
agent->raiseEvent(_qmf::EventDeny(id, AclHelper::getActionStr(action),
AclHelper::getObjectTypeStr(objType),
name, framing::FieldTable()));
return false;
}
return false;
}
bool Acl::readAclFile(std::string& errorText)
{
// only set transferAcl = true if a rule implies the use of ACL on
transfer, else keep false for permormance reasons.
return readAclFile(aclValues.aclFile, errorText);
}
/**
 * Reads @p aclFile into a fresh AclData and, on success, installs it as the
 * active data set.
 *
 * A non-zero return from AclReader::read is treated as failure: the reader's
 * error is logged, copied to @p errorText and reported through a
 * file-load-failed event (when an agent exists), and the active data is left
 * untouched. On success transferAcl is refreshed from the new data and the
 * management object (if any) is updated with the policy file name and load
 * time.
 *
 * @return true on success, false on failure.
 */
bool Acl::readAclFile(std::string& aclFile, std::string& errorText) {
    boost::shared_ptr<AclData> d(new AclData);
    AclReader ar;
    if (ar.read(aclFile, d)){
        // The constructor tolerates a missing agent, so check before use.
        if (agent != 0)
            agent->raiseEvent(_qmf::EventFileLoadFailed("", ar.getError()));
        errorText = ar.getError();
        QPID_LOG(error,ar.getError());
        return false;
    }

    data = d;
    transferAcl = data->transferAcl; // any transfer ACL

    if (mgmtObject!=0){
        mgmtObject->set_transferAcl(transferAcl?1:0);
        mgmtObject->set_policyFile(aclFile);
        sys::AbsTime now = sys::AbsTime::now();
        int64_t ns = sys::Duration(now);
        mgmtObject->set_lastAclLoad(ns);
        agent->raiseEvent(_qmf::EventFileLoaded(""));
    }
    return true;
}
/** Destructor; the body performs no work. */
Acl::~Acl()
{
}
/** Returns the ACL management object cast to the generic ManagementObject type. */
ManagementObject* Acl::GetManagementObject(void) const
{
    ManagementObject* asBase = (ManagementObject*) mgmtObject;
    return asBase;
}
/**
 * Dispatches a management method invoked on the ACL object.
 *
 * METHOD_RELOADACLFILE re-reads the ACL file, passing @p text as the error
 * output, and reports STATUS_USER; any other id reports STATUS_UNKNOWN_METHOD.
 *
 * NOTE(review): the reload outcome is ignored and STATUS_USER is returned
 * either way - confirm that is the intended status for a successful reload.
 */
Manageable::status_t Acl::ManagementMethod (uint32_t methodId, Args& /*args*/, string& text)
{
    Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
    // Label corrected: this is the Acl handler, not Queue.
    QPID_LOG (debug, "Acl::ManagementMethod [id=" << methodId << "]");

    switch (methodId)
    {
      case _qmf::Acl::METHOD_RELOADACLFILE :
        readAclFile(text);
        status = Manageable::STATUS_USER;
        break;
    }

    return status;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]