Hello,

We have been struggling mightily to get qpid to compile on AIX 6.1 with gcc.
We have tried both gcc 4.8.2 and 4.8.3, and the error we have run into is:

/home/dbapi/enquesta_5_0/qpid-cpp-0.30/src/qpid/sys/posix/PosixPoller.cpp:796:2:
 internal compiler error: in function_and_variable_visibility, at ipa.c:868
}}
  ^
libbacktrace could not find executable to open
Please submit a full bug report,
with preprocessed source if appropriate.
See <http://gcc.gnu.org/bugs.html> for instructions.
make: 1254-004 The error code from the last command is 1.

This appears to be a problem with gcc itself rather than with the qpid code.
However, since we are already using the latest version of gcc I could find for
AIX and it still does not work, any help in providing a workaround would be
greatly appreciated.  My coworker was able to identify the problematic lines:
two of them use PollerHandleDeletionManager (lines 701 and 712) and one uses
Event (line 729):

PollerHandleDeletionManager.destroyThreadState();  (2 occurrences)
Event event = impl->eventStream.next(timeout);  (1 occurrence)

When he commented out those lines (and additional affected code in the case of 
the Event line), the compilation of PosixPoller.cpp succeeded.  We surmised 
that it has something to do with gcc handling of namespaces, but could not come 
up with a way to get around this issue.  We are planning on trying IBM's xlc 
compiler, but we have invested a lot of time in this already and would like to 
avoid xlc licensing costs if possible.

Thanks in advance,

Regards,

Chris Whelan

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "qpid/sys/Poller.h"
#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/AtomicCount.h"
#include "qpid/sys/DeletionManager.h"
#include "qpid/sys/posix/check.h"
#include "qpid/sys/posix/PrivatePosix.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Condition.h"

#include <poll.h>
#include <errno.h>
#include <signal.h>

#include <assert.h>
#include <queue>
#include <set>
#include <exception>

/*
 *
 * This is a qpid::sys::Poller implementation for Posix systems.
 *
 * This module follows the structure of the Linux EpollPoller as closely as
 * possible to simplify maintainability.  Noteworthy differences:
 *
 * The Linux epoll_xxx() calls present one event at a time to multiple callers
 * whereas poll() returns one or more events to a single caller.  The
 * EventStream class layers a "one event per call" view of the poll() result
 * to multiple threads.
 *
 * The HandleSet is the master set of in-use PollerHandles.  The EventStream
 * maintains a snapshot copy taken just before the call to poll() that remains
 * static until all flagged events have been processed.
 *
 * There is an additional window where the PollerHandlePrivate class may
 * survive the parent PollerHandle destructor, i.e. between snapshots.
 *
 * Safe interrupting of the Poller is implemented using the "self-pipe trick".
 *
 */

namespace qpid {
namespace sys {

// Deletion manager to handle deferring deletion of PollerHandles to when they
// definitely aren't being used.  Handles are handed over with
// markForDeletion(); each polling thread reports via
// markAllUnusedInThisThread() and calls destroyThreadState() before exiting.
DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager;

//  Instantiate (and define) class static for DeletionManager
template <>
DeletionManager<PollerHandlePrivate>::AllThreadsStatuses
DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0);

/**
 * Private state behind a PollerHandle.
 *
 * Holds the IOHandle being polled, a back pointer to the owning PollerHandle,
 * the poll() event mask and the current FDStat.  Everything is private;
 * Poller, PollerPrivate, PollerHandle and HandleSet get access as friends.
 * Callers in this file take `lock` before reading or changing the state.
 */
class PollerHandlePrivate {
    friend class Poller;
    friend class PollerPrivate;
    friend class PollerHandle;
    friend class HandleSet;

    // Monitoring state; only examined and changed through the
    // isXxx()/setXxx() helpers below.
    enum FDStat {
        ABSENT,
        MONITORED,
        INACTIVE,
        HUNGUP,
        MONITORED_HUNGUP,
        INTERRUPTED,
        INTERRUPTED_HUNGUP,
        DELETED
    };

    short events;               // requested poll() event bits
    const IOHandle* ioHandle;   // source of the file descriptor
    PollerHandle* pollerHandle; // owning handle (zeroed by ~PollerHandle)
    FDStat stat;
    Mutex lock;

    // A freshly created handle starts ABSENT with an empty event mask.
    PollerHandlePrivate(const IOHandle* io, PollerHandle* owner) :
      events(0),
      ioHandle(io),
      pollerHandle(owner),
      stat(ABSENT) {
    }

    // File descriptor of the underlying IOHandle.
    int fd() const {
        return ioHandle->fd;
    }

    // True in either of the two MONITORED states.
    bool isActive() const {
        switch (stat) {
            case MONITORED:
            case MONITORED_HUNGUP:
                return true;
            default:
                return false;
        }
    }

    // Start monitoring, remembering an earlier hangup if there was one.
    void setActive() {
        if (stat == HUNGUP || stat == INTERRUPTED_HUNGUP) {
            stat = MONITORED_HUNGUP;
        } else {
            stat = MONITORED;
        }
    }

    // True when INACTIVE or HUNGUP.
    bool isInactive() const {
        switch (stat) {
            case INACTIVE:
            case HUNGUP:
                return true;
            default:
                return false;
        }
    }

    void setInactive() {
        stat = INACTIVE;
    }

    // True when the handle is ABSENT.
    bool isIdle() const {
        return ABSENT == stat;
    }

    void setIdle() {
        stat = ABSENT;
    }

    // True in any of the three *HUNGUP states.
    bool isHungup() const {
        switch (stat) {
            case MONITORED_HUNGUP:
            case HUNGUP:
            case INTERRUPTED_HUNGUP:
                return true;
            default:
                return false;
        }
    }

    // Record a hangup; only expected while plainly MONITORED.
    void setHungup() {
        assert(stat == MONITORED);
        stat = HUNGUP;
    }

    // True in either of the two INTERRUPTED states.
    bool isInterrupted() const {
        switch (stat) {
            case INTERRUPTED:
            case INTERRUPTED_HUNGUP:
                return true;
            default:
                return false;
        }
    }

    // Mark as interrupted, remembering an earlier hangup if there was one.
    void setInterrupted() {
        if (stat == MONITORED_HUNGUP || stat == HUNGUP) {
            stat = INTERRUPTED_HUNGUP;
        } else {
            stat = INTERRUPTED;
        }
    }

    bool isDeleted() const {
        return DELETED == stat;
    }

    void setDeleted() {
        stat = DELETED;
    }
};

// Create the private state for this handle, bound to IOHandle h and pointing
// back at this PollerHandle.
PollerHandle::PollerHandle(const IOHandle& h)
    : impl(new PollerHandlePrivate(&h, this))
{
}

// Detach from the private state and mark it DELETED.  The private object is
// handed to the deletion manager here unless it was already deleted or an
// interrupt is still outstanding for it (in which case nothing more is done
// in this destructor).
PollerHandle::~PollerHandle() {
    {
        ScopedLock<Mutex> guard(impl->lock);
        if (impl->isDeleted()) {
            return;
        }
        impl->pollerHandle = 0;

        const bool interruptOutstanding = impl->isInterrupted();
        if (!interruptOutstanding) {
            assert(impl->isIdle());
        }
        impl->setDeleted();
        if (interruptOutstanding) {
            return;
        }
    }
    PollerHandleDeletionManager.markForDeletion(impl);
}

class HandleSet
{
    Mutex lock;
    bool stale;
    std::set<PollerHandlePrivate*> handles;
  public:
    HandleSet() : stale(true) {}
    void add(PollerHandlePrivate*);
    void remove(PollerHandlePrivate*);
    void cleanup();
    bool snapshot(std::vector<PollerHandlePrivate *>& , std::vector<struct 
::pollfd>&);
    void setStale();
};

// Add h to the registered set (under the set lock).
void HandleSet::add(PollerHandlePrivate* h)
{
    ScopedLock<Mutex> guard(lock);
    (void) handles.insert(h);
}
// Remove h from the registered set (under the set lock).
void HandleSet::remove(PollerHandlePrivate* h)
{
    ScopedLock<Mutex> guard(lock);
    (void) handles.erase(h);
}
void HandleSet::cleanup()
{
    // Inform all registered handles of disconnection
    std::set<PollerHandlePrivate*> copy;
    handles.swap(copy);
    for (std::set<PollerHandlePrivate*>::const_iterator i = copy.begin(); i != 
copy.end(); ++i) {
        PollerHandlePrivate& eh = **i;
        {
            ScopedLock<Mutex> l(eh.lock);
            if (!eh.isDeleted()) {
                Poller::Event event((*i)->pollerHandle, Poller::DISCONNECTED);
                event.process();
            }
        }
    }
}
void HandleSet::setStale()
{
    // invalidate cached pollfds for next snapshot
    ScopedLock<Mutex> l(lock);
    stale = true;
}

/**
 * Concrete implementation of Poller to use Posix poll()
 * interface
 */
class PollerPrivate {
    friend class Poller;
    friend class EventStream;
    friend class HandleSet;

    class SignalPipe {
        /**
         * Used to wakeup a thread in ::poll()
         */
        int fds[2];
        bool signaled;
        bool permanent;
        Mutex lock;
    public:
        SignalPipe() : signaled(false), permanent(false) {
            QPID_POSIX_CHECK(::pipe(fds));
        }

        ~SignalPipe() {
            ::close(fds[0]);
            ::close(fds[1]);
        }

        int getFD() {
            return fds[0];
        }

        bool isSet() {
            return signaled;
        }

        void set() {
            ScopedLock<Mutex> l(lock);
            if (signaled)
                return;
            signaled = true;
            QPID_POSIX_CHECK(::write(fds[1], " ", 1));
        }

        void reset() {
            if (permanent)
                return;
            ScopedLock<Mutex> l(lock);
            if (signaled) {
                char ignore;
                QPID_POSIX_CHECK(::read(fds[0], &ignore, 1));
                signaled = false;
            }
        }

        void setPermanently() {
            // async signal safe calls only.  No locking.
            permanent = true;
            signaled = true;
            QPID_POSIX_CHECK(::write(fds[1], "  ", 2));
            // poll() should never block now
        }
    };

    // Collect pending events and serialize access.  Maintain array of pollfd 
structs.
    class EventStream {
        typedef Poller::Event Event;
        PollerPrivate& pollerPrivate;
        SignalPipe& signalPipe;
        std::queue<PollerHandlePrivate*> interruptedHandles;
        std::vector<struct ::pollfd> pollfds;
        std::vector<PollerHandlePrivate*> pollHandles;
        Mutex streamLock;
        Mutex serializeLock;
        Condition serializer;
        bool busy;
        int currentPollfd;
        int pollCount;
        int waiters;

    public:

        EventStream(PollerPrivate* p) : pollerPrivate(*p), 
signalPipe(p->signalPipe), busy(false),
                                        currentPollfd(0), pollCount(0), 
waiters(0) {
            // The signal pipe is the first element of pollfds and pollHandles
            pollfds.reserve(8);
            pollfds.resize(1);
            pollfds[0].fd = pollerPrivate.signalPipe.getFD();
            pollfds[0].events = POLLIN;
            pollfds[0].revents = 0;

            pollHandles.reserve(8);
            pollHandles.resize(1);
            pollHandles[0] = 0;
        }

        void addInterrupt(PollerHandle& handle) {
            ScopedLock<Mutex> l(streamLock);
            interruptedHandles.push(handle.impl);
        }

        // Serialize access to the stream.
        Event next(Duration timeout) {
            AbsTime targetTimeout =
                (timeout == TIME_INFINITE) ?
                FAR_FUTURE :
                AbsTime(now(), timeout);


            ScopedLock<Mutex> l(serializeLock);
            Event event(0, Poller::INVALID);
            while (busy) {
                waiters++;
                bool timedout = !serializer.wait(serializeLock, targetTimeout);
                waiters--;

                if (busy && timedout) {
                    return Event(0, Poller::TIMEOUT);
                }
            }
            busy = true;
            {
                ScopedUnlock<Mutex> ul(serializeLock);
                event = getEvent(targetTimeout);
            }
            busy = false;

            if (waiters > 0)
                serializer.notify();
            return event;
        }

        Event getEvent(AbsTime targetTimeout) {
            bool timeoutPending = false;

            ScopedLock<Mutex> l(streamLock); // hold lock except for poll()

            // loop until poll event, async interrupt, or timeout
            while (true) {

                // first check for any interrupts
                while (interruptedHandles.size() > 0) {
                    PollerHandlePrivate& eh = *interruptedHandles.front();
                    interruptedHandles.pop();
                    {
                        ScopedLock<Mutex> lk(eh.lock);
                        if (!eh.isDeleted()) {
                            if (!eh.isIdle()) {
                                eh.setInactive();
                            }

                            // nullify the corresponding pollfd event, if any
                            int ehfd = eh.fd();
                            std::vector<struct ::pollfd>::iterator i = 
pollfds.begin() + 1; // skip self pipe at front
                            for (; i != pollfds.end(); i++) {
                                if (i->fd == ehfd) {
                                    i->events = 0;
                                    if (i->revents) {
                                        i->revents = 0;
                                        pollCount--;
                                    }
                                    break;
                                }
                            }
                            return Event(eh.pollerHandle, Poller::INTERRUPTED);
                        }
                    }
                    PollerHandleDeletionManager.markForDeletion(&eh);
                }

                // Check for shutdown
                if (pollerPrivate.isShutdown) {
                    PollerHandleDeletionManager.markAllUnusedInThisThread();
                    return Event(0, Poller::SHUTDOWN);
                }

                // search for any remaining events from earlier poll()
                int nfds = pollfds.size();
                while ((pollCount > 0) && (currentPollfd < nfds)) {
                    int index = currentPollfd++;
                    short evt = pollfds[index].revents;
                    if (evt != 0) {
                        pollCount--;
                        PollerHandlePrivate& eh = *pollHandles[index];
                        ScopedLock<Mutex> l(eh.lock);
                        // stop polling this handle until resetMode()
                        pollfds[index].events = 0;

                        // the handle could have gone inactive since snapshot 
taken
                        if (eh.isActive()) {
                            PollerHandle* handle = eh.pollerHandle;
                            assert(handle);

                            // If the connection has been hungup we could still 
be readable
                            // (just not writable), allow us to readable until 
we get here again
                            if (evt & POLLHUP) {
                                if (eh.isHungup()) {
                                    eh.setInactive();
                                    // Don't set up last Handle so that we 
don't reset this handle
                                    // on re-entering Poller::wait. This means 
that we will never
                                    // be set active again once we've returned 
disconnected, and so
                                    // can never be returned again.
                                    return Event(handle, Poller::DISCONNECTED);
                                }
                                eh.setHungup();
                            } else {
                                eh.setInactive();
                            }
                            return Event(handle, 
PollerPrivate::epollToDirection(evt));
                        }
                    }
                }

                if (timeoutPending) {
                    return Event(0, Poller::TIMEOUT);
                }

                // no outstanding events, poll() for more
                {
                    ScopedUnlock<Mutex> ul(streamLock);

                    bool refreshed = 
pollerPrivate.registeredHandles.snapshot(pollHandles, pollfds);
                    if (refreshed) {
                        // we just drained all interruptedHandles and got a 
fresh snapshot
                        PollerHandleDeletionManager.markAllUnusedInThisThread();
                    }

                    if (!signalPipe.isSet()) {
                        int timeoutMs = -1;
                        if (!(targetTimeout == FAR_FUTURE)) {
                            timeoutMs = Duration(now(), targetTimeout) / 
TIME_MSEC;
                            if (timeoutMs < 0)
                                timeoutMs = 0;
                        }

                        pollCount = ::poll(&pollfds[0], pollfds.size(), 
timeoutMs);

                        if (pollCount ==-1 && errno != EINTR) {
                            QPID_POSIX_CHECK(pollCount);
                        }
                        else if (pollCount == 0) {
                            // timeout, unless shutdown or interrupt arrives in 
another thread
                            timeoutPending = true;
                        }
                        else {
                            if (pollfds[0].revents) {
                                pollCount--; // signal pipe doesn't count
                            }
                        }
                    }
                    else
                        pollCount = 0;
                    signalPipe.reset();
                }
                currentPollfd = 1;
            }
        }
    };

    bool isShutdown;
    HandleSet registeredHandles;
    AtomicCount threadCount;
    SignalPipe signalPipe;
    EventStream eventStream;

    static short directionToEpollEvent(Poller::Direction dir) {
        switch (dir) {
            case Poller::INPUT:  return POLLIN;
            case Poller::OUTPUT: return POLLOUT;
            case Poller::INOUT:  return POLLIN | POLLOUT;
            default: return 0;
        }
    }

    static Poller::EventType epollToDirection(short events) {
        // POLLOUT & POLLHUP are mutually exclusive really, but at least 
socketpairs
        // can give you both!
        events = (events & POLLHUP) ? events & ~POLLOUT : events;
        short e = events & (POLLIN | POLLOUT);
        switch (e) {
            case POLLIN: return Poller::READABLE;
            case POLLOUT: return Poller::WRITABLE;
            case POLLIN | POLLOUT: return Poller::READ_WRITABLE;
            default:
              return (events & (POLLHUP | POLLERR)) ?
                    Poller::DISCONNECTED : Poller::INVALID;
        }
    }

    PollerPrivate() :
        isShutdown(false), eventStream(this) {
    }

    ~PollerPrivate() {}

    void resetMode(PollerHandlePrivate& handle);

    void interrupt() {
        signalPipe.set();
    }

    void interruptAll() {
        // be async signal safe
        signalPipe.setPermanently();
    }
};


// Register an idle handle with this poller: mark it active and add it to the
// registered set.
void Poller::registerHandle(PollerHandle& handle) {
    PollerHandlePrivate* const priv = handle.impl;
    ScopedLock<Mutex> guard(priv->lock);
    assert(priv->isIdle());

    priv->setActive();
    impl->registeredHandles.add(priv);
    // not stale until monitored
}

// Remove a registered handle from this poller: mark it idle, drop it from the
// registered set, invalidate the snapshot and wake the polling thread.
void Poller::unregisterHandle(PollerHandle& handle) {
    PollerHandlePrivate* const priv = handle.impl;
    ScopedLock<Mutex> guard(priv->lock);
    assert(!priv->isIdle());

    priv->setIdle();

    HandleSet& registered = impl->registeredHandles;
    registered.remove(priv);
    registered.setStale();

    impl->interrupt();
}

// Called after an event has been processed for a handle (from Poller::wait,
// for the handle returned by the previous wait on this thread).
//
//  - idle or deleted handle: nothing to do
//  - empty event mask: mark active again, no need to wake the poller
//  - not interrupted: mark active, invalidate the snapshot and wake the poller
//  - interrupted: queue the interrupt on the event stream and wake the poller
void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
    PollerHandle* ph = 0;
    {
        ScopedLock<Mutex> l(eh.lock);
        assert(!eh.isActive());

        if (eh.isIdle() || eh.isDeleted()) {
            return;
        }

        if (eh.events==0) {
            eh.setActive();
            return;
        }

        if (!eh.isInterrupted()) {
            // Handle still in use, allow events to resume.
            eh.setActive();
            registeredHandles.setStale();
            // Ouch. This scales poorly for large handle sets.
            // TODO: avoid new snapshot, perhaps create an index to pollfds or a
            // pending reset queue to be processed before each poll().
            // However, the real scalable solution is to implement the
            // OS-specific epoll equivalent.
            interrupt();
            return;
        }
        ph = eh.pollerHandle;
    }

    // eh.lock is released before touching the event stream
    eventStream.addInterrupt(*ph);
    interrupt();
}

// Add the poll() event bits for dir to the handle's event mask.  The polling
// thread is only woken when the mask actually changed and the handle is
// currently active; otherwise the new mask is simply stored.
void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
    PollerHandlePrivate* const priv = handle.impl;
    ScopedLock<Mutex> guard(priv->lock);
    assert(!priv->isIdle());

    const short before = priv->events;
    priv->events |= PollerPrivate::directionToEpollEvent(dir);

    // No change, or not actually listening yet: nothing to tell the poller
    const bool changed = (before != priv->events);
    if (changed && priv->isActive()) {
        // tell polling thread to update its pollfds
        impl->registeredHandles.setStale();
        impl->interrupt();
    }
}

// Clear the poll() event bits for dir from the handle's event mask.  The
// polling thread is only woken when the mask actually changed and the handle
// is currently active; otherwise the new mask is simply stored.
void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
    PollerHandlePrivate* const priv = handle.impl;
    ScopedLock<Mutex> guard(priv->lock);
    assert(!priv->isIdle());

    const short before = priv->events;
    priv->events &= ~PollerPrivate::directionToEpollEvent(dir);

    // No change, or not actually listening yet: nothing to tell the poller
    const bool changed = (before != priv->events);
    if (changed && priv->isActive()) {
        impl->registeredHandles.setStale();
        impl->interrupt();
    }
}

// Flag the poller as shut down and wake every polling thread.
void Poller::shutdown() {
    // NB: this function must be async-signal safe, it must not
    // call any function that is not async-signal safe.

    // Allow sloppy code to shut us down more than once: only the first
    // call does anything.
    if (!impl->isShutdown) {
        // Don't use any locking here - isShutdown will be visible to all
        // after the write() anyway (it's a memory barrier)
        impl->isShutdown = true;

        impl->interruptAll();
    }
}

// Request an INTERRUPTED event for handle.
// Returns false when the handle is idle or deleted, true otherwise.  The
// interrupt is queued on the event stream right away unless the handle was
// already interrupted or is currently inactive; in those cases only the
// state is updated here.
bool Poller::interrupt(PollerHandle& handle) {
    {
        PollerHandlePrivate* const priv = handle.impl;
        ScopedLock<Mutex> guard(priv->lock);

        if (priv->isIdle() || priv->isDeleted()) {
            return false;
        }
        if (priv->isInterrupted()) {
            return true;
        }

        const bool wasInactive = priv->isInactive();
        priv->setInterrupted();
        if (wasInactive) {
            return true;
        }
        priv->events = 0;
    }

    // handle lock released: refresh the snapshot, queue the interrupt and
    // wake the polling thread
    impl->registeredHandles.setStale();
    impl->eventStream.addInterrupt(handle);
    impl->interrupt();
    return true;
}

void Poller::run() {
    // Ensure that we exit thread responsibly under all circumstances
    try {
        // Make sure we can't be interrupted by signals at a bad time
        ::sigset_t ss;
        ::sigfillset(&ss);
        ::pthread_sigmask(SIG_SETMASK, &ss, 0);

        ++(impl->threadCount);
        do {
            Event event = wait();

            // If can read/write then dispatch appropriate callbacks
            if (event.handle) {
                event.process();
            } else {
                // Handle shutdown
                switch (event.type) {
                case SHUTDOWN:
                    //last thread to respond to shutdown cleans up:
                    if (--(impl->threadCount) == 0) 
impl->registeredHandles.cleanup();
                    PollerHandleDeletionManager.destroyThreadState();
                    return;
                default:
                    // This should be impossible
                    assert(false);
                }
            }
        } while (true);
    } catch (const std::exception& e) {
        QPID_LOG(error, "IO worker thread exiting with unhandled exception: " 
<< e.what());
    }
    PollerHandleDeletionManager.destroyThreadState();
    --(impl->threadCount);
}

// Report whether shutdown() has been called on this poller.
bool Poller::hasShutdown()
{
    const bool shutdownRequested = impl->isShutdown;
    return shutdownRequested;
}

// Wait up to timeout for the next event.
// Each thread remembers the handle of the last INTERRUPTED / READABLE /
// WRITABLE / READ_WRITABLE event it returned and calls resetMode() on it at
// the start of its next wait().
Poller::Event Poller::wait(Duration timeout) {
    static __thread PollerHandlePrivate* lastReturnedHandle = 0;

    if (lastReturnedHandle != 0) {
        impl->resetMode(*lastReturnedHandle);
        lastReturnedHandle = 0;
    }

    Event result = impl->eventStream.next(timeout);

    const bool needsReset =
        result.type == INTERRUPTED ||
        result.type == READABLE ||
        result.type == WRITABLE ||
        result.type == READ_WRITABLE;
    if (needsReset) {
        lastReturnedHandle = result.handle->impl;
    }

    return result;
}

// Concrete constructors
// Create the poll()-based implementation object; released in ~Poller().
Poller::Poller()
    : impl(new PollerPrivate())
{
}

// Release the implementation object created by the constructor.
Poller::~Poller()
{
    delete impl;
}


// Refresh the caller's handle and pollfd vectors from the registered set.
//
// hs  - receives the registered handles in elements 1..n
// fds - resized to match hs; element j describes handle hs[j]
//
// Returns false (leaving both vectors untouched) when the set has not been
// marked stale since the last refresh, true when a refresh was done.
bool HandleSet::snapshot(std::vector<PollerHandlePrivate *>& hs ,
                         std::vector<struct ::pollfd>& fds)
{
    // Element 0 of the vectors is always the signal pipe, leave undisturbed
    {
        ScopedLock<Mutex> l(lock);
        if (!stale)
            return false;       // no refresh done

        hs.resize(1);
        hs.insert(hs.end(), handles.begin(), handles.end());
        stale = false;
        // have copy of handle set (in vector form), drop the lock and build
        // the pollfds
    }

    // sync pollfds to same sizing as the handles
    typedef std::vector<PollerHandlePrivate *>::size_type Index;
    const Index sz = hs.size();
    fds.resize(sz);

    for (Index j = 1; j < sz; ++j) {
        // create a pollfd entry for each handle
        struct ::pollfd& pollfd = fds[j];
        PollerHandlePrivate& eh = *hs[j];
        ScopedLock<Mutex> lk(eh.lock);

        if (!eh.isInactive() && !eh.isDeleted()) {
            pollfd.fd = eh.fd();
            pollfd.events = eh.events;
        } else {
            pollfd.fd = -1;     // tell poll() to ignore this fd
            pollfd.events = 0;
        }
    }
    return true;
}


}}
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
For additional commands, e-mail: users-h...@qpid.apache.org

Reply via email to