Hello,

Has anyone run into a circumstance where gcc consistently generates bogus debug information - or, ddd can't read a source file correctly?

I am debugging some additions I made to the ACL framework. There are two problems I am having:

1. I overloaded the Acl::authorise method, and the method call is not dispatching to the correct instance. Previously there were two instances of Acl::authorise, and I overloaded each by creating a Framing::FieldTable & argument directly before the last argument. It appears to still be using the old methods even though the call I step out of clearly has the FieldTable argument.

2. In two source files, the code execution does not follow what ddd shows on-screen. ddd appears to be 3 lines behind what gdb indicates, and it appears to jump around randomly at times.

I've modified the Makefiles to remove optimization and hard-code -ggdb for CFLAGS and CXXFLAGS. The linker is not stripping debug info. What other things should I consider?

On the attached files - the call to acl->authorise on SessionAdapter.cpp line 339 ought to be calling the Acl::authorise on line 102 of Acl.cpp.

Any ideas?

Cheers,
-Josh

--

-----
http://www.globalherald.net/jb01
GlobalHerald.NET, the Smarter Social Network! (tm)
/*
 *
 * Copyright (c) 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
#include "SessionAdapter.h"
#include "Connection.h"
#include "Queue.h"
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/enum.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/agent/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
#include "qmf/org/apache/qpid/broker/EventUnsubscribe.h"
#include <boost/format.hpp>
#include <boost/cast.hpp>
#include <boost/bind.hpp>

namespace qpid {
namespace broker {

using namespace qpid;
using namespace qpid::framing;
using namespace qpid::framing::dtx;
using namespace qpid::management;
namespace _qmf = qmf::org::apache::qpid::broker;

typedef std::vector<Queue::shared_ptr> QueueVector;

SessionAdapter::SessionAdapter(SemanticState& s) :
    HandlerImpl(s),
    exchangeImpl(s),
    queueImpl(s),
    messageImpl(s),
    executionImpl(s),
    txImpl(s),
    dtxImpl(s)
{}

// JPK added the args call to acl->authorse below.

void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const 
string& type, 
                                                  const string& 
alternateExchange, 
                                                  bool passive, bool durable, 
bool /*autoDelete*/, const FieldTable& args){

    AclModule* acl = getBroker().getAcl();
    if (acl) {
        std::map<acl::Property, std::string> params;
        params.insert(make_pair(acl::PROP_TYPE, type));
        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
        params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? "true" 
: "false") ));
        params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? "true" 
: "false")));
        if 
(!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,args,&params)
 )
            throw NotAllowedException(QPID_MSG("ACL denied exhange declare 
request from " << getConnection().getUserId()));
    }
    
    //TODO: implement autoDelete
    Exchange::shared_ptr alternate;
    if (!alternateExchange.empty()) {
        alternate = getBroker().getExchanges().get(alternateExchange);
    }
    if(passive){
        Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
        checkType(actual, type);
        checkAlternate(actual, alternate);
    }else{        
        try{
            std::pair<Exchange::shared_ptr, bool> response = 
getBroker().getExchanges().declare(exchange, type, durable, args);
            if (response.second) {
                if (durable) {
                    getBroker().getStore().create(*response.first, args);
                }
                if (alternate) {
                    response.first->setAlternate(alternate);
                    alternate->incAlternateUsers();
                }
            } else {
                checkType(response.first, type);
                checkAlternate(response.first, alternate);
            }

            ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
            if (agent)
                
agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), 
getConnection().getUserId(), exchange, type,
                                                             alternateExchange, 
durable, false, args,
                                                             response.second ? 
"created" : "existing"));

        }catch(UnknownExchangeTypeException& /*e*/){
            throw CommandInvalidException(QPID_MSG("Exchange type not 
implemented: " << type));
        }
    }
}

void SessionAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr 
exchange, const std::string& type)
{
    if (!type.empty() && exchange->getType() != type) {
        throw NotAllowedException(QPID_MSG("Exchange declared to be of type " 
<< exchange->getType() << ", requested " << type));
    }
}

void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr 
exchange, Exchange::shared_ptr alternate)
{
    if (alternate && alternate != exchange->getAlternate()) 
        throw NotAllowedException(QPID_MSG("Exchange declared with 
alternate-exchange "
                                           << 
exchange->getAlternate()->getName() << ", requested " 
                                           << alternate->getName()));
}
                
void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool 
/*ifUnused*/)
{
    AclModule* acl = getBroker().getAcl();
    if (acl) {
        if 
(!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL)
 )
            throw NotAllowedException(QPID_MSG("ACL denied exhange delete 
request from " << getConnection().getUserId()));
    }

    //TODO: implement unused
    Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
    if (exchange->inUseAsAlternate()) throw 
NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
    if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
    if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
    getBroker().getExchanges().destroy(name);

    ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
    if (agent)
        agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), 
getConnection().getUserId(), name));
}

ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& 
name)
{
    AclModule* acl = getBroker().getAcl();
    if (acl) {
        if 
(!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,name,NULL)
 )
            throw NotAllowedException(QPID_MSG("ACL denied exhange query 
request from " << getConnection().getUserId()));
    }

    try {
        Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
        return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), 
false, exchange->getArgs());
    } catch (const NotFoundException& /*e*/) {
        return ExchangeQueryResult("", false, true, FieldTable());        
    }
}

// JPK added 'arguments' to the acl->authorize call below.

void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, 
                                           const string& exchangeName, const 
string& routingKey, 
                                           const FieldTable& arguments)
{
    AclModule* acl = getBroker().getAcl();
    if (acl) {
        if 
(!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,arguments,routingKey)
 )
            throw NotAllowedException(QPID_MSG("ACL denied exhange bind request 
from " << getConnection().getUserId()));
    }

    Queue::shared_ptr queue = getQueue(queueName);
    Exchange::shared_ptr exchange = 
getBroker().getExchanges().get(exchangeName);
    if(exchange){
        string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? 
queue->getName() : routingKey;
        if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
            queue->bound(exchangeName, routingKey, arguments);
            if (exchange->isDurable() && queue->isDurable()) {
                getBroker().getStore().bind(*exchange, *queue, routingKey, 
arguments);
            }

            ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
            if (agent)
                agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), 
getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, 
arguments));
        }
    }else{
        throw NotFoundException("Bind failed. No such exchange: " + 
exchangeName);
    }
}
 
void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
                                                 const string& exchangeName,
                                                 const string& routingKey)
{
    AclModule* acl = getBroker().getAcl();
    if (acl) {
        std::map<acl::Property, std::string> params;
        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
        params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
        if 
(!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params)
 )
            throw NotAllowedException(QPID_MSG("ACL denied exchange unbind 
request from " << getConnection().getUserId()));
    }

    Queue::shared_ptr queue = getQueue(queueName);
    if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: 
" + exchangeName);

    Exchange::shared_ptr exchange = 
getBroker().getExchanges().get(exchangeName);
    if (!exchange.get()) throw NotFoundException("Unbind failed. No such 
exchange: " + exchangeName);

    //TODO: revise unbind to rely solely on binding key (not args)
    if (exchange->unbind(queue, routingKey, 0)) {
        if (exchange->isDurable() && queue->isDurable())
            getBroker().getStore().unbind(*exchange, *queue, routingKey, 
FieldTable());

        ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
        if (agent)
            agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), 
getConnection().getUserId(), exchangeName, queueName, routingKey));
    }
}

ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const 
std::string& exchangeName,
                                                                  const 
std::string& queueName,
                                                                  const 
std::string& key,
                                                                  const 
framing::FieldTable& args)
{
    AclModule* acl = getBroker().getAcl();
    if (acl) {
        std::map<acl::Property, std::string> params;
        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
        params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
        if 
(!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchangeName,&params)
 )
            throw NotAllowedException(QPID_MSG("ACL denied exhange bound 
request from " << getConnection().getUserId()));
    }
    
    Exchange::shared_ptr exchange;
    try {
        exchange = getBroker().getExchanges().get(exchangeName);
    } catch (const NotFoundException&) {}

    Queue::shared_ptr queue;
    if (!queueName.empty()) {
        queue = getBroker().getQueues().find(queueName);
    }

    if (!exchange) {
        return ExchangeBoundResult(true, false, false, false, false);
    } else if (!queueName.empty() && !queue) {
        return ExchangeBoundResult(false, true, false, false, false);
    } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 
0 ? &args : &args)) {
        return ExchangeBoundResult(false, false, false, false, false);
    } else {
        //need to test each specified option individually
        bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
        bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), 
&key, 0);
        bool argsMatched = args.count() == 0 || 
exchange->isBound(Queue::shared_ptr(), 0, &args);

        return ExchangeBoundResult(false, false, !queueMatched, !keyMatched, 
!argsMatched);
    }
}

SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : 
HandlerHelper(session), broker(getBroker())
{}


SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
{
    try {
        destroyExclusiveQueues();
    } catch (std::exception& e) {
        QPID_LOG(error, e.what());
    }
}

void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues()
{
    while (!exclusiveQueues.empty()) {
        Queue::shared_ptr q(exclusiveQueues.front());
        q->releaseExclusiveOwnership();
        if (q->canAutoDelete()) {
            Queue::tryAutoDelete(broker, q);
        }
        exclusiveQueues.erase(exclusiveQueues.begin());
    }
}

    
bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const 
{ 
    return session.isLocal(t); 
}


QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
    AclModule* acl = getBroker().getAcl();
    if (acl) {
        if 
(!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,NULL)
 )
            throw NotAllowedException(QPID_MSG("ACL denied queue query request 
from " << getConnection().getUserId()));
    }
    
    Queue::shared_ptr queue = session.getBroker().getQueues().find(name);
    if (queue) {

        Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
        
        return QueueQueryResult(queue->getName(), 
                                alternateExchange ? 
alternateExchange->getName() : "", 
                                queue->isDurable(), 
                                queue->hasExclusiveOwner(),
                                queue->isAutoDelete(),
                                queue->getSettings(),
                                queue->getMessageCount(),
                                queue->getConsumerCount());
    } else {
        return QueueQueryResult();
    }
}

void SessionAdapter::QueueHandlerImpl::declare(const string& name, const 
string& alternateExchange,
                                               bool passive, bool durable, bool 
exclusive, 
                                               bool autoDelete, const 
qpid::framing::FieldTable& arguments)
{ 
    AclModule* acl = getBroker().getAcl();
    bool myVar;
    if (acl) {
        std::map<acl::Property, std::string> params;
        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
        params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? "true" 
: "false") ));
        params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? "true" 
: "false")));
        params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? 
"true" : "false")));
        params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? 
"true" : "false")));
        // JPK - added arguments below.
        
        myVar = 
acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,arguments,&params);
        if (!myVar )
            throw NotAllowedException(QPID_MSG("ACL denied queue create request 
from " << getConnection().getUserId()));
    }

    Exchange::shared_ptr alternate;
    if (!alternateExchange.empty()) {
        alternate = getBroker().getExchanges().get(alternateExchange);
    }
    Queue::shared_ptr queue;
    if (passive && !name.empty()) {
    queue = getQueue(name);
        //TODO: check alternate-exchange is as expected
    } else {
        std::pair<Queue::shared_ptr, bool> queue_created =  
            getBroker().getQueues().declare(name, durable,
                                            autoDelete,
                                            exclusive ? &session : 0);
        queue = queue_created.first;
        assert(queue);
        if (queue_created.second) { // This is a new queue
            if (alternate) {
                queue->setAlternateExchange(alternate);
                alternate->incAlternateUsers();
            }

            //apply settings & create persistent record if required
            queue_created.first->create(arguments);

            //add default binding:
            getBroker().getExchanges().getDefault()->bind(queue, name, 0);
            queue->bound(getBroker().getExchanges().getDefault()->getName(), 
name, arguments);

            //if event generation is turned on, pass in a pointer to
            //the QueueEvents instance to use
            if (queue->getEventMode()) 
queue->setQueueEventManager(getBroker().getQueueEvents());

            //handle automatic cleanup:
            if (exclusive) {
                exclusiveQueues.push_back(queue);
            }
        } else {
            if (exclusive && queue->setExclusiveOwner(&session)) {
                exclusiveQueues.push_back(queue);
            }
        }

        ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
        if (agent)
            agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), 
getConnection().getUserId(),
                                                      name, durable, exclusive, 
autoDelete, arguments,
                                                      queue_created.second ? 
"created" : "existing"));
    }

    if (exclusive && !queue->isExclusiveOwner(&session)) 
        throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access 
to queue "
                                               << queue->getName()));
} 
        
        
void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
    AclModule* acl = getBroker().getAcl();
    if (acl)
    {
         if 
(!acl->authorise(getConnection().getUserId(),acl::ACT_PURGE,acl::OBJ_QUEUE,queue,NULL)
 )
             throw NotAllowedException(QPID_MSG("ACL denied queue purge request 
from " << getConnection().getUserId()));
    }
    getQueue(queue)->purge();
} 
        
void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool 
ifUnused, bool ifEmpty){

    AclModule* acl = getBroker().getAcl();
    if (acl)
    {
         if 
(!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL)
 )
             throw NotAllowedException(QPID_MSG("ACL denied queue delete 
request from " << getConnection().getUserId()));
    }

    Queue::shared_ptr q = getQueue(queue);
    if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session)) 
        throw ResourceLockedException(QPID_MSG("Cannot delete queue "
                                               << queue << "; it is exclusive 
to another session"));
    if(ifEmpty && q->getMessageCount() > 0){
        throw PreconditionFailedException("Queue not empty.");
    }else if(ifUnused && q->getConsumerCount() > 0){
        throw PreconditionFailedException("Queue in use.");
    }else{
        //remove the queue from the list of exclusive queues if necessary
        if(q->isExclusiveOwner(&getConnection())){
            QueueVector::iterator i = 
std::find(getConnection().exclusiveQueues.begin(), 
getConnection().exclusiveQueues.end(), q);
            if(i < getConnection().exclusiveQueues.end()) 
getConnection().exclusiveQueues.erase(i);
        }
        q->destroy();
        getBroker().getQueues().destroy(queue);
        q->unbind(getBroker().getExchanges(), q);

        ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
        if (agent)
            agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), 
getConnection().getUserId(), queue));
    }
} 


SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : 
    HandlerHelper(s),
    releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, 
true)),
    releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
    rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
    acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
 {}

//
// Message class method handlers
//

void SessionAdapter::MessageHandlerImpl::transfer(const string& /*destination*/,
                                  uint8_t /*acceptMode*/,
                                  uint8_t /*acquireMode*/)
{
    //not yet used (content containing assemblies treated differently at present
    std::cout << "SessionAdapter::MessageHandlerImpl::transfer() called" << 
std::endl;
}

void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, 
bool setRedelivered)
{
    transfers.for_each(setRedelivered ? releaseRedeliveredOp : releaseOp);
}

void
SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
                                              const string& destination,
                                              uint8_t acceptMode,
                                              uint8_t acquireMode,
                                              bool exclusive,
                                              const string& resumeId,
                                              uint64_t resumeTtl,
                                              const FieldTable& arguments)
{

    AclModule* acl = getBroker().getAcl();
    if (acl)
    {
        // add flags as needed
         if 
(!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL)
 )
             throw NotAllowedException(QPID_MSG("ACL denied Queue subscribe 
request from " << getConnection().getUserId()));
    }

    Queue::shared_ptr queue = getQueue(queueName);
    if(!destination.empty() && state.exists(destination))
        throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
    if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session)) 
        throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive 
queue "
                                               << queue->getName()));

    state.consume(destination, queue, 
                  acceptMode == 0, acquireMode == 0, exclusive, 
                  resumeId, resumeTtl, arguments);

    ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
    if (agent)
        agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), 
getConnection().getUserId(),
                                               queueName, destination, 
exclusive, arguments));
}

void
SessionAdapter::MessageHandlerImpl::cancel(const string& destination )
{
    state.cancel(destination);

    ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
    if (agent)
        agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), 
getConnection().getUserId(), destination));
}

void
SessionAdapter::MessageHandlerImpl::reject(const SequenceSet& transfers, 
uint16_t /*code*/, const string& /*text*/ )
{
    transfers.for_each(rejectOp);
}

void SessionAdapter::MessageHandlerImpl::flow(const std::string& destination, 
uint8_t unit, uint32_t value)
{
    if (unit == 0) {
        //message
        state.addMessageCredit(destination, value);
    } else if (unit == 1) {
        //bytes
        state.addByteCredit(destination, value);
    } else {
        //unknown
        throw InvalidArgumentException(QPID_MSG("Invalid value for unit " << 
unit));
    }
    
}
    
void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& 
destination, uint8_t mode)
{
    if (mode == 0) {
        //credit
        state.setCreditMode(destination);
    } else if (mode == 1) {
        //window
        state.setWindowMode(destination);
    } else{
        throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << 
mode));        
    }
}
    
void SessionAdapter::MessageHandlerImpl::flush(const std::string& destination)
{
    state.flush(destination);        
}

void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination)
{
    state.stop(destination);        
}

void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& 
commands)
{

    commands.for_each(acceptOp);
}

framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const 
framing::SequenceSet& transfers)
{
    // FIXME aconway 2008-05-12: create SequenceSet directly, no need for 
intermediate results vector.
    SequenceNumberSet results;
    RangedOperation f = boost::bind(&SemanticState::acquire, &state, _1, _2, 
boost::ref(results));
    transfers.for_each(f);

    results = results.condense();
    SequenceSet acquisitions;
    RangedOperation g = boost::bind(&SequenceSet::add, &acquisitions, _1, _2);
    results.processRanges(g);

    return MessageAcquireResult(acquisitions);
}

framing::MessageResumeResult SessionAdapter::MessageHandlerImpl::resume(const 
std::string& /*destination*/,
                                                                        const 
std::string& /*resumeId*/)
{
    throw NotImplementedException("resuming transfers not yet supported");
}
    


void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op

void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& 
/*commandId*/, const string& /*value*/)
{
    //TODO: but currently never used client->server
}

void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/,
                                                     const SequenceNumber& 
/*commandId*/,
                                                     uint8_t /*classCode*/,
                                                     uint8_t /*commandCode*/,
                                                     uint8_t /*fieldIndex*/,
                                                     const std::string& 
/*description*/,
                                                     const framing::FieldTable& 
/*errorInfo*/)
{
    //TODO: again, not really used client->server but may be important
    //for inter-broker links
}



void SessionAdapter::TxHandlerImpl::select()
{
    state.startTx();
}

void SessionAdapter::TxHandlerImpl::commit()
{
    state.commit(&getBroker().getStore());
}

void SessionAdapter::TxHandlerImpl::rollback()
{    
    state.rollback();
}

std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid& xid)
{
    std::string encoded;
    encode(xid, encoded);
    return encoded;
}

void SessionAdapter::DtxHandlerImpl::select()
{
    state.selectDtx();
}

XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
                                                            bool fail,
                                                            bool suspend)
{
    try {
        if (fail) {
            state.endDtx(convert(xid), true);
            if (suspend) {
                throw CommandInvalidException(QPID_MSG("End and suspend cannot 
both be set."));
            } else {
                return XaResult(XA_STATUS_XA_RBROLLBACK);
            }
        } else {
            if (suspend) {
                state.suspendDtx(convert(xid));
            } else {
                state.endDtx(convert(xid), false);
            }
            return XaResult(XA_STATUS_XA_OK);
        }
    } catch (const DtxTimeoutException& /*e*/) {
        return XaResult(XA_STATUS_XA_RBTIMEOUT);        
    }
}

XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
                                                                bool join,
                                                                bool resume)
{
    if (join && resume) {
        throw CommandInvalidException(QPID_MSG("Join and resume cannot both be 
set."));
    }
    try {
        if (resume) {
            state.resumeDtx(convert(xid));
        } else {
            state.startDtx(convert(xid), getBroker().getDtxManager(), join);
        }
        return XaResult(XA_STATUS_XA_OK);
    } catch (const DtxTimeoutException& /*e*/) {
        return XaResult(XA_STATUS_XA_RBTIMEOUT);        
    }
}

XaResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid)
{
    try {
        bool ok = getBroker().getDtxManager().prepare(convert(xid));
        return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
    } catch (const DtxTimeoutException& /*e*/) {
        return XaResult(XA_STATUS_XA_RBTIMEOUT);        
    }
}

XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid,
                            bool onePhase)
{
    try {
        bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
        return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
    } catch (const DtxTimeoutException& /*e*/) {
        return XaResult(XA_STATUS_XA_RBTIMEOUT);        
    }
}


XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid)
{
    try {
        getBroker().getDtxManager().rollback(convert(xid));
        return XaResult(XA_STATUS_XA_OK);
    } catch (const DtxTimeoutException& /*e*/) {
        return XaResult(XA_STATUS_XA_RBTIMEOUT);        
    }
}

DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover()
{
    std::set<std::string> xids;
    getBroker().getStore().collectPreparedXids(xids);        
    /*
     * create array of long structs
     */
    Array indoubt(0xAB);
    for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); 
i++) {
        boost::shared_ptr<FieldValue> xid(new Struct32Value(*i));
        indoubt.add(xid);
    }
    return DtxRecoverResult(indoubt);
}

void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid)
{
    //Currently no heuristic completion is supported, so this should never be 
used.
    throw NotImplementedException(QPID_MSG("Forget not implemented. Branch with 
xid "  << xid << " not heuristically completed!"));
}

DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid)
{
    uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid));
    return DtxGetTimeoutResult(timeout);    
}


void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid,
                                                uint32_t timeout)
{
    getBroker().getDtxManager().setTimeout(convert(xid), timeout);
}


Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) 
const {
    Queue::shared_ptr queue;
    if (name.empty()) {
        throw framing::IllegalArgumentException(QPID_MSG("No queue name 
specified."));
    } else {
        queue = session.getBroker().getQueues().find(name);
        if (!queue)
            throw framing::NotFoundException(QPID_MSG("Queue not found: 
"<<name));
    }
    return queue;
}

}} // namespace qpid::broker


/*
 *
 * Copyright (c) 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "qpid/acl/Acl.h"
#include "qpid/acl/AclData.h"

#include "qpid/broker/Broker.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/shared_ptr.h"
#include "qpid/log/Logger.h"
#include "qmf/org/apache/qpid/acl/Package.h"
#include "qmf/org/apache/qpid/acl/EventAllow.h"
#include "qmf/org/apache/qpid/acl/EventDeny.h"
#include "qmf/org/apache/qpid/acl/EventFileLoaded.h"
#include "qmf/org/apache/qpid/acl/EventFileLoadFailed.h"

#include <map>

#include <boost/utility/in_place_factory.hpp>

using namespace std;
using namespace qpid::acl;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::acl;

Acl::Acl (AclValues& av, broker::Broker& b): aclValues(av), broker(&b), 
transferAcl(false)
{
           
    agent = ManagementAgent::Singleton::getInstance();

    if (agent != 0){
        _qmf::Package  packageInit(agent);
        mgmtObject = new _qmf::Acl (agent, this, broker);
        agent->addObject (mgmtObject);
    }

    std::string errorString;
    if (!readAclFile(errorString)){
        throw Exception("Could not read ACL file " + errorString);
        if (mgmtObject!=0) mgmtObject->set_enforcingAcl(0);
    }
    QPID_LOG(info, "ACL Plugin loaded");
           if (mgmtObject!=0) mgmtObject->set_enforcingAcl(1);
}

   bool Acl::authorise(const std::string& id, const Action& action, const 
ObjectType& objType, const std::string& name, std::map<Property, std::string>* 
params)
   {
      boost::shared_ptr<AclData> dataLocal = data;  //rcu copy

      // add real ACL check here...
      AclResult aclreslt = dataLocal->lookup(id,action,objType,name,params);
      
      // JPK:
      
      std::map<Property, std::string>::iterator iter;
      for( iter = params->begin(); iter != params->end(); ++iter )
      {
            cout << "property: " << iter->first << ", text: " << iter->second 
<< endl;
      }


          return result(aclreslt, id, action, objType, name);
   }

   bool Acl::authorise(const std::string& id, const Action& action, const 
ObjectType& objType, const std::string& ExchangeName, const std::string& 
RoutingKey)
   {
      boost::shared_ptr<AclData> dataLocal = data;  //rcu copy

      // only use dataLocal here...
      AclResult aclreslt = 
dataLocal->lookup(id,action,objType,ExchangeName,RoutingKey);

      // JPK: update
      
      cout << "You are here!";



          return result(aclreslt, id, action, objType, ExchangeName);
   }

// JPK - adding these modified authorise's:

   bool Acl::authorise(const std::string& id, const Action& action, const 
ObjectType& objType, const std::string& name, const framing::FieldTable 
&arguments, std::map<Property, std::string>* params)
   {
      boost::shared_ptr<AclData> dataLocal = data;  //rcu copy
      framing::FieldTable myArgs = arguments;

      // add real ACL check here...
      AclResult aclreslt = dataLocal->lookup(id,action,objType,name,params);
      
      // JPK:
      
      std::map<Property, std::string>::iterator iter;
      /*for( iter = arguments->begin(); iter != arguments->end(); ++iter )
      {
            cout << "argument: " << iter->first << ", text: " << iter->second 
<< endl;
      }*/


          return result(aclreslt, id, action, objType, name);
   }

   bool Acl::authorise(const std::string& id, const Action& action, const 
ObjectType& objType, const std::string& ExchangeName, const framing::FieldTable 
&arguments, const std::string& RoutingKey)
   {
      boost::shared_ptr<AclData> dataLocal = data;  //rcu copy
      framing::FieldTable myArgs = arguments;

      // only use dataLocal here...
      AclResult aclreslt = 
dataLocal->lookup(id,action,objType,ExchangeName,RoutingKey);

      // JPK: update
      
      cout << "You are here!";

      std::map<Property, std::string>::iterator iter;
      /*for( iter = arguments->begin(); iter != arguments->end(); ++iter )
      {
            cout << "argument: " << iter->first << ", text: " << iter->second 
<< endl;
      }*/


          return result(aclreslt, id, action, objType, ExchangeName);
   }



// JPK - end



   bool Acl::result(const AclResult& aclreslt, const std::string& id, const 
Action& action, const ObjectType& objType, const std::string& name)
   {
          switch (aclreslt)
          {
          case ALLOWLOG:
          QPID_LOG(info, "ACL Allow id:" << id <<" action:" << 
AclHelper::getActionStr(action) <<
                   " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " 
Name:" << name );
          agent->raiseEvent(_qmf::EventAllow(id,  
AclHelper::getActionStr(action),
                                             
AclHelper::getObjectTypeStr(objType),
                                             name, framing::FieldTable()));
          case ALLOW:
              return true;
          case DENY:
              if (mgmtObject!=0) mgmtObject->inc_aclDenyCount();
              return false;
          case DENYLOG:
              if (mgmtObject!=0) mgmtObject->inc_aclDenyCount();
      default:
          QPID_LOG(info, "ACL Deny id:" << id << " action:" << 
AclHelper::getActionStr(action) << " ObjectType:" << 
AclHelper::getObjectTypeStr(objType) << " Name:" << name);
          agent->raiseEvent(_qmf::EventDeny(id, AclHelper::getActionStr(action),
                                            
AclHelper::getObjectTypeStr(objType),
                                            name, framing::FieldTable()));
          return false;
          }
      return false;
   }

   bool Acl::readAclFile(std::string& errorText)
   {
      // only set transferAcl = true if a rule implies the use of ACL on 
transfer, else keep false for permormance reasons.
      return readAclFile(aclValues.aclFile, errorText);
   }

   bool Acl::readAclFile(std::string& aclFile, std::string& errorText) {
      boost::shared_ptr<AclData> d(new AclData);
      AclReader ar;
      if (ar.read(aclFile, d)){
          agent->raiseEvent(_qmf::EventFileLoadFailed("", ar.getError()));
          errorText = ar.getError();
          QPID_LOG(error,ar.getError());
          return false;
      }

      data = d;
          transferAcl = data->transferAcl; // any transfer ACL
          if (mgmtObject!=0){
              mgmtObject->set_transferAcl(transferAcl?1:0);
                  mgmtObject->set_policyFile(aclFile);
                  sys::AbsTime now = sys::AbsTime::now();
          int64_t ns = sys::Duration(now);
                  mgmtObject->set_lastAclLoad(ns);
          agent->raiseEvent(_qmf::EventFileLoaded(""));
          }
      return true;
   }

   Acl::~Acl(){}

   ManagementObject* Acl::GetManagementObject(void) const
   {
       return (ManagementObject*) mgmtObject;
   }

   Manageable::status_t Acl::ManagementMethod (uint32_t methodId, Args& 
/*args*/, string& text)
   {
      Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
      QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");

      switch (methodId)
      {
      case _qmf::Acl::METHOD_RELOADACLFILE :
          readAclFile(text);
          status = Manageable::STATUS_USER;
          break;
      }

    return status;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to