Hello, We have been struggling mightily to get qpid to compile on aix 6.1 with gcc. We have tried both 4.8.2 and 4.8.3 and the error we have run into is:
/home/dbapi/enquesta_5_0/qpid-cpp-0.30/src/qpid/sys/posix/PosixPoller.cpp:796:2: internal compiler error: in function_and_variable_visibility, at ipa.c:868 }} ^ libbacktrace could not find executable to open Please submit a full bug report, with preprocessed source if appropriate. See <http://gcc.gnu.org/bugs.html> for instructions. make: 1254-004 The error code from the last command is 1. This appears to be a problem with gcc itself rather than qpid code, but since we are utilizing the latest version of the gcc compiler I could find for aix and it still does not work, any help in providing a workaround would be greatly appreciated. My coworker was able to identify the problematic lines. Two of the errors appear to be related to the use of the PollerHandleDeletionManager (Line 701 and 712) and Event (Line 729) PollerHandleDeletionManager.destroyThreadState(); (2 occurrences) Event event = impl->eventStream.next(timeout); (1 occurrence) When he commented out those lines (and additional affected code in the case of the Event line), the compilation of PosixPoller.cpp succeeded. We surmised that it has something to do with gcc handling of namespaces, but could not come up with a way to get around this issue. We are planning on trying IBM's xlc compiler, but we have invested a lot of time in this already and would like to avoid xlc licensing costs if possible. Thanks in advance, Regards, Chris Whelan
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/sys/Poller.h" #include "qpid/sys/IOHandle.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/AtomicCount.h" #include "qpid/sys/DeletionManager.h" #include "qpid/sys/posix/check.h" #include "qpid/sys/posix/PrivatePosix.h" #include "qpid/log/Statement.h" #include "qpid/sys/Condition.h" #include <poll.h> #include <errno.h> #include <signal.h> #include <assert.h> #include <queue> #include <set> #include <exception> /* * * This is a qpid::sys::Poller implementation for Posix systems. * * This module follows the structure of the Linux EpollPoller as closely as possible * to simplify maintainability. Noteworthy differences: * * The Linux epoll_xxx() calls present one event at a time to multiple callers whereas poll() * returns one or more events to a single caller. The EventStream class layers a * "one event per call" view of the poll() result to multiple threads. * * The HandleSet is the master set of in-use PollerHandles. The EventStream * maintains a snapshot copy taken just before the call to poll() that remains static * until all flagged events have been processed. * * There is an additional window where the PollerHandlePrivate class may survive the * parent PollerHandle destructor, i.e. between snapshots. * * Safe interrupting of the Poller is implemented using the "self-pipe trick". * */ namespace qpid { namespace sys { // Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager; // Instantiate (and define) class static for DeletionManager template <> DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0); class PollerHandlePrivate { friend class Poller; friend class PollerPrivate; friend class PollerHandle; friend class HandleSet; enum FDStat { ABSENT, MONITORED, INACTIVE, HUNGUP, MONITORED_HUNGUP, INTERRUPTED, INTERRUPTED_HUNGUP, DELETED }; short events; const IOHandle* ioHandle; PollerHandle* pollerHandle; FDStat stat; Mutex lock; PollerHandlePrivate(const IOHandle* h, PollerHandle* p) : events(0), ioHandle(h), pollerHandle(p), stat(ABSENT) { } int fd() const { return ioHandle->fd; } bool isActive() const { return stat == MONITORED || stat == MONITORED_HUNGUP; } void setActive() { stat = (stat == HUNGUP || stat == INTERRUPTED_HUNGUP) ? MONITORED_HUNGUP : MONITORED; } bool isInactive() const { return stat == INACTIVE || stat == HUNGUP; } void setInactive() { stat = INACTIVE; } bool isIdle() const { return stat == ABSENT; } void setIdle() { stat = ABSENT; } bool isHungup() const { return stat == MONITORED_HUNGUP || stat == HUNGUP || stat == INTERRUPTED_HUNGUP; } void setHungup() { assert(stat == MONITORED); stat = HUNGUP; } bool isInterrupted() const { return stat == INTERRUPTED || stat == INTERRUPTED_HUNGUP; } void setInterrupted() { stat = (stat == MONITORED_HUNGUP || stat == HUNGUP) ? INTERRUPTED_HUNGUP : INTERRUPTED; } bool isDeleted() const { return stat == DELETED; } void setDeleted() { stat = DELETED; } }; PollerHandle::PollerHandle(const IOHandle& h) : impl(new PollerHandlePrivate(&h, this)) {} PollerHandle::~PollerHandle() { { ScopedLock<Mutex> l(impl->lock); if (impl->isDeleted()) { return; } impl->pollerHandle = 0; if (impl->isInterrupted()) { impl->setDeleted(); return; } assert(impl->isIdle()); impl->setDeleted(); } PollerHandleDeletionManager.markForDeletion(impl); } class HandleSet { Mutex lock; bool stale; std::set<PollerHandlePrivate*> handles; public: HandleSet() : stale(true) {} void add(PollerHandlePrivate*); void remove(PollerHandlePrivate*); void cleanup(); bool snapshot(std::vector<PollerHandlePrivate *>& , std::vector<struct ::pollfd>&); void setStale(); }; void HandleSet::add(PollerHandlePrivate* h) { ScopedLock<Mutex> l(lock); handles.insert(h); } void HandleSet::remove(PollerHandlePrivate* h) { ScopedLock<Mutex> l(lock); handles.erase(h); } void HandleSet::cleanup() { // Inform all registered handles of disconnection std::set<PollerHandlePrivate*> copy; handles.swap(copy); for (std::set<PollerHandlePrivate*>::const_iterator i = copy.begin(); i != copy.end(); ++i) { PollerHandlePrivate& eh = **i; { ScopedLock<Mutex> l(eh.lock); if (!eh.isDeleted()) { Poller::Event event((*i)->pollerHandle, Poller::DISCONNECTED); event.process(); } } } } void HandleSet::setStale() { // invalidate cached pollfds for next snapshot ScopedLock<Mutex> l(lock); stale = true; } /** * Concrete implementation of Poller to use Posix poll() * interface */ class PollerPrivate { friend class Poller; friend class EventStream; friend class HandleSet; class SignalPipe { /** * Used to wakeup a thread in ::poll() */ int fds[2]; bool signaled; bool permanent; Mutex lock; public: SignalPipe() : signaled(false), permanent(false) { QPID_POSIX_CHECK(::pipe(fds)); } ~SignalPipe() { ::close(fds[0]); ::close(fds[1]); } int getFD() { return fds[0]; } bool isSet() { return signaled; } void set() { ScopedLock<Mutex> l(lock); if (signaled) return; signaled = true; QPID_POSIX_CHECK(::write(fds[1], " ", 1)); } void reset() { if (permanent) return; ScopedLock<Mutex> l(lock); if (signaled) { char ignore; QPID_POSIX_CHECK(::read(fds[0], &ignore, 1)); signaled = false; } } void setPermanently() { // async signal safe calls only. No locking. permanent = true; signaled = true; QPID_POSIX_CHECK(::write(fds[1], " ", 2)); // poll() should never block now } }; // Collect pending events and serialize access. Maintain array of pollfd structs. class EventStream { typedef Poller::Event Event; PollerPrivate& pollerPrivate; SignalPipe& signalPipe; std::queue<PollerHandlePrivate*> interruptedHandles; std::vector<struct ::pollfd> pollfds; std::vector<PollerHandlePrivate*> pollHandles; Mutex streamLock; Mutex serializeLock; Condition serializer; bool busy; int currentPollfd; int pollCount; int waiters; public: EventStream(PollerPrivate* p) : pollerPrivate(*p), signalPipe(p->signalPipe), busy(false), currentPollfd(0), pollCount(0), waiters(0) { // The signal pipe is the first element of pollfds and pollHandles pollfds.reserve(8); pollfds.resize(1); pollfds[0].fd = pollerPrivate.signalPipe.getFD(); pollfds[0].events = POLLIN; pollfds[0].revents = 0; pollHandles.reserve(8); pollHandles.resize(1); pollHandles[0] = 0; } void addInterrupt(PollerHandle& handle) { ScopedLock<Mutex> l(streamLock); interruptedHandles.push(handle.impl); } // Serialize access to the stream. Event next(Duration timeout) { AbsTime targetTimeout = (timeout == TIME_INFINITE) ? FAR_FUTURE : AbsTime(now(), timeout); ScopedLock<Mutex> l(serializeLock); Event event(0, Poller::INVALID); while (busy) { waiters++; bool timedout = !serializer.wait(serializeLock, targetTimeout); waiters--; if (busy && timedout) { return Event(0, Poller::TIMEOUT); } } busy = true; { ScopedUnlock<Mutex> ul(serializeLock); event = getEvent(targetTimeout); } busy = false; if (waiters > 0) serializer.notify(); return event; } Event getEvent(AbsTime targetTimeout) { bool timeoutPending = false; ScopedLock<Mutex> l(streamLock); // hold lock except for poll() // loop until poll event, async interrupt, or timeout while (true) { // first check for any interrupts while (interruptedHandles.size() > 0) { PollerHandlePrivate& eh = *interruptedHandles.front(); interruptedHandles.pop(); { ScopedLock<Mutex> lk(eh.lock); if (!eh.isDeleted()) { if (!eh.isIdle()) { eh.setInactive(); } // nullify the corresponding pollfd event, if any int ehfd = eh.fd(); std::vector<struct ::pollfd>::iterator i = pollfds.begin() + 1; // skip self pipe at front for (; i != pollfds.end(); i++) { if (i->fd == ehfd) { i->events = 0; if (i->revents) { i->revents = 0; pollCount--; } break; } } return Event(eh.pollerHandle, Poller::INTERRUPTED); } } PollerHandleDeletionManager.markForDeletion(&eh); } // Check for shutdown if (pollerPrivate.isShutdown) { PollerHandleDeletionManager.markAllUnusedInThisThread(); return Event(0, Poller::SHUTDOWN); } // search for any remaining events from earlier poll() int nfds = pollfds.size(); while ((pollCount > 0) && (currentPollfd < nfds)) { int index = currentPollfd++; short evt = pollfds[index].revents; if (evt != 0) { pollCount--; PollerHandlePrivate& eh = *pollHandles[index]; ScopedLock<Mutex> l(eh.lock); // stop polling this handle until resetMode() pollfds[index].events = 0; // the handle could have gone inactive since snapshot taken if (eh.isActive()) { PollerHandle* handle = eh.pollerHandle; assert(handle); // If the connection has been hungup we could still be readable // (just not writable), allow us to readable until we get here again if (evt & POLLHUP) { if (eh.isHungup()) { eh.setInactive(); // Don't set up last Handle so that we don't reset this handle // on re-entering Poller::wait. This means that we will never // be set active again once we've returned disconnected, and so // can never be returned again. return Event(handle, Poller::DISCONNECTED); } eh.setHungup(); } else { eh.setInactive(); } return Event(handle, PollerPrivate::epollToDirection(evt)); } } } if (timeoutPending) { return Event(0, Poller::TIMEOUT); } // no outstanding events, poll() for more { ScopedUnlock<Mutex> ul(streamLock); bool refreshed = pollerPrivate.registeredHandles.snapshot(pollHandles, pollfds); if (refreshed) { // we just drained all interruptedHandles and got a fresh snapshot PollerHandleDeletionManager.markAllUnusedInThisThread(); } if (!signalPipe.isSet()) { int timeoutMs = -1; if (!(targetTimeout == FAR_FUTURE)) { timeoutMs = Duration(now(), targetTimeout) / TIME_MSEC; if (timeoutMs < 0) timeoutMs = 0; } pollCount = ::poll(&pollfds[0], pollfds.size(), timeoutMs); if (pollCount ==-1 && errno != EINTR) { QPID_POSIX_CHECK(pollCount); } else if (pollCount == 0) { // timeout, unless shutdown or interrupt arrives in another thread timeoutPending = true; } else { if (pollfds[0].revents) { pollCount--; // signal pipe doesn't count } } } else pollCount = 0; signalPipe.reset(); } currentPollfd = 1; } } }; bool isShutdown; HandleSet registeredHandles; AtomicCount threadCount; SignalPipe signalPipe; EventStream eventStream; static short directionToEpollEvent(Poller::Direction dir) { switch (dir) { case Poller::INPUT: return POLLIN; case Poller::OUTPUT: return POLLOUT; case Poller::INOUT: return POLLIN | POLLOUT; default: return 0; } } static Poller::EventType epollToDirection(short events) { // POLLOUT & POLLHUP are mutually exclusive really, but at least socketpairs // can give you both! events = (events & POLLHUP) ? events & ~POLLOUT : events; short e = events & (POLLIN | POLLOUT); switch (e) { case POLLIN: return Poller::READABLE; case POLLOUT: return Poller::WRITABLE; case POLLIN | POLLOUT: return Poller::READ_WRITABLE; default: return (events & (POLLHUP | POLLERR)) ? Poller::DISCONNECTED : Poller::INVALID; } } PollerPrivate() : isShutdown(false), eventStream(this) { } ~PollerPrivate() {} void resetMode(PollerHandlePrivate& handle); void interrupt() { signalPipe.set(); } void interruptAll() { // be async signal safe signalPipe.setPermanently(); } }; void Poller::registerHandle(PollerHandle& handle) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); assert(eh.isIdle()); eh.setActive(); impl->registeredHandles.add(handle.impl); // not stale until monitored } void Poller::unregisterHandle(PollerHandle& handle) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); assert(!eh.isIdle()); eh.setIdle(); impl->registeredHandles.remove(handle.impl); impl->registeredHandles.setStale(); impl->interrupt(); } void PollerPrivate::resetMode(PollerHandlePrivate& eh) { PollerHandle* ph; { // Called after an event has been processed for a handle ScopedLock<Mutex> l(eh.lock); assert(!eh.isActive()); if (eh.isIdle() || eh.isDeleted()) { return; } if (eh.events==0) { eh.setActive(); return; } if (!eh.isInterrupted()) { // Handle still in use, allow events to resume. eh.setActive(); registeredHandles.setStale(); // Ouch. This scales poorly for large handle sets. // TODO: avoid new snapshot, perhaps create an index to pollfds or a // pending reset queue to be processed before each poll(). However, the real // scalable solution is to implement the OS-specific epoll equivalent. interrupt(); return; } ph = eh.pollerHandle; } eventStream.addInterrupt(*ph); interrupt(); } void Poller::monitorHandle(PollerHandle& handle, Direction dir) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); assert(!eh.isIdle()); short oldEvents = eh.events; eh.events |= PollerPrivate::directionToEpollEvent(dir); // If no change nothing more to do - avoid unnecessary system call if (oldEvents==eh.events) { return; } // If we're not actually listening wait till we are to perform change if (!eh.isActive()) { return; } // tell polling thread to update its pollfds impl->registeredHandles.setStale(); impl->interrupt(); } void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); assert(!eh.isIdle()); short oldEvents = eh.events; eh.events &= ~PollerPrivate::directionToEpollEvent(dir); // If no change nothing more to do - avoid unnecessary system call if (oldEvents==eh.events) { return; } // If we're not actually listening wait till we are to perform change if (!eh.isActive()) { return; } impl->registeredHandles.setStale(); impl->interrupt(); } void Poller::shutdown() { // NB: this function must be async-signal safe, it must not // call any function that is not async-signal safe. // Allow sloppy code to shut us down more than once if (impl->isShutdown) return; // Don't use any locking here - isShutdown will be visible to all // after the write() anyway (it's a memory barrier) impl->isShutdown = true; impl->interruptAll(); } bool Poller::interrupt(PollerHandle& handle) { { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); if (eh.isIdle() || eh.isDeleted()) { return false; } if (eh.isInterrupted()) { return true; } if (eh.isInactive()) { eh.setInterrupted(); return true; } eh.setInterrupted(); eh.events = 0; } impl->registeredHandles.setStale(); impl->eventStream.addInterrupt(handle); impl->interrupt(); return true; } void Poller::run() { // Ensure that we exit thread responsibly under all circumstances try { // Make sure we can't be interrupted by signals at a bad time ::sigset_t ss; ::sigfillset(&ss); ::pthread_sigmask(SIG_SETMASK, &ss, 0); ++(impl->threadCount); do { Event event = wait(); // If can read/write then dispatch appropriate callbacks if (event.handle) { event.process(); } else { // Handle shutdown switch (event.type) { case SHUTDOWN: //last thread to respond to shutdown cleans up: if (--(impl->threadCount) == 0) impl->registeredHandles.cleanup(); PollerHandleDeletionManager.destroyThreadState(); return; default: // This should be impossible assert(false); } } } while (true); } catch (const std::exception& e) { QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what()); } PollerHandleDeletionManager.destroyThreadState(); --(impl->threadCount); } bool Poller::hasShutdown() { return impl->isShutdown; } Poller::Event Poller::wait(Duration timeout) { static __thread PollerHandlePrivate* lastReturnedHandle = 0; if (lastReturnedHandle) { impl->resetMode(*lastReturnedHandle); lastReturnedHandle = 0; } Event event = impl->eventStream.next(timeout); switch (event.type) { case INTERRUPTED: case READABLE: case WRITABLE: case READ_WRITABLE: lastReturnedHandle = event.handle->impl; break; default: ; } return event; } // Concrete constructors Poller::Poller() : impl(new PollerPrivate()) {} Poller::~Poller() { delete impl; } bool HandleSet::snapshot(std::vector<PollerHandlePrivate *>& hs , std::vector<struct ::pollfd>& fds) { // Element 0 of the vectors is always the signal pipe, leave undisturbed { ScopedLock<Mutex> l(lock); if (!stale) return false; // no refresh done hs.resize(1); for (std::set<PollerHandlePrivate*>::const_iterator i = handles.begin(); i != handles.end(); ++i) { hs.push_back(*i); } stale = false; // have copy of handle set (in vector form), drop the lock and build the pollfds } // sync pollfds to same sizing as the handles int sz = hs.size(); fds.resize(sz); for (int j = 1; j < sz; ++j) { // create a pollfd entry for each handle struct ::pollfd& pollfd = fds[j]; PollerHandlePrivate& eh = *hs[j]; ScopedLock<Mutex> lk(eh.lock); if (!eh.isInactive() && !eh.isDeleted()) { pollfd.fd = eh.fd(); pollfd.events = eh.events; } else { pollfd.fd = -1; // tell poll() to ignore this fd pollfd.events = 0; } } return true; } }}
--------------------------------------------------------------------- To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org For additional commands, e-mail: users-h...@qpid.apache.org