Author: astitcher Date: Thu May 28 06:13:57 2009 New Revision: 779446 URL: http://svn.apache.org/viewvc?rev=779446&view=rev Log: Whitespace fixes
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=779446&r1=779445&r2=779446&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Thu May 28 06:13:57 2009 @@ -138,7 +138,7 @@ { ScopedLock<Mutex> l(impl->lock); if (impl->isDeleted()) { - return; + return; } impl->pollerHandle = 0; if (impl->isInterrupted()) { @@ -187,40 +187,40 @@ static int alwaysReadableFd; class InterruptHandle: public PollerHandle { - std::queue<PollerHandle*> handles; - - void processEvent(Poller::EventType) { - PollerHandle* handle = handles.front(); - handles.pop(); - assert(handle); - - // Synthesise event - Poller::Event event(handle, Poller::INTERRUPTED); - - // Process synthesised event - event.process(); - } + std::queue<PollerHandle*> handles; + + void processEvent(Poller::EventType) { + PollerHandle* handle = handles.front(); + handles.pop(); + assert(handle); + + // Synthesise event + Poller::Event event(handle, Poller::INTERRUPTED); + + // Process synthesised event + event.process(); + } public: - InterruptHandle() : - PollerHandle(DummyIOHandle) - {} - - void addHandle(PollerHandle& h) { - handles.push(&h); - } - - PollerHandle* getHandle() { - PollerHandle* handle = handles.front(); - handles.pop(); - return handle; - } - - bool queuedHandles() { - return handles.size() > 0; - } + InterruptHandle() : + PollerHandle(DummyIOHandle) + {} + + void addHandle(PollerHandle& h) { + handles.push(&h); + } + + PollerHandle* getHandle() { + PollerHandle* handle = handles.front(); + handles.pop(); + return handle; + } + + bool queuedHandles() { + return handles.size() > 0; + } }; - + const int epollFd; bool isShutdown; InterruptHandle interruptHandle; @@ -259,13 +259,13 @@ ::epoll_event epe; epe.events = 0; epe.data.u64 = 0; - QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_ADD, alwaysReadableFd, &epe)); + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_ADD, alwaysReadableFd, &epe)); } ~PollerPrivate() { // It's probably okay to ignore any errors here as there can't be data loss ::close(epollFd); - + // Need to put the interruptHandle in idle state to delete it static_cast<PollerHandle&>(interruptHandle).impl->setIdle(); } @@ -273,14 +273,14 @@ void resetMode(PollerHandlePrivate& handle); void interrupt() { - ::epoll_event epe; - // Use EPOLLONESHOT so we only wake a single thread - epe.events = ::EPOLLIN | ::EPOLLONESHOT; - epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle); - QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe)); + ::epoll_event epe; + // Use EPOLLONESHOT so we only wake a single thread + epe.events = ::EPOLLIN | ::EPOLLONESHOT; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle); + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe)); } - + void interruptAll() { ::epoll_event epe; // Not EPOLLONESHOT, so we eventually get all threads @@ -317,7 +317,7 @@ // Ignore EBADF since deleting a nonexistent fd has the overall required result! // And allows the case where a sloppy program closes the fd and then does the delFd() if (rc == -1 && errno != EBADF) { - QPID_POSIX_CHECK(rc); + QPID_POSIX_CHECK(rc); } eh.setIdle(); @@ -332,10 +332,10 @@ if (eh.isIdle() || eh.isDeleted()) { return; } - + if (eh.events==0) { eh.setActive(); - return; + return; } if (!eh.isInterrupted()) { @@ -343,11 +343,11 @@ epe.events = eh.events | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); - + eh.setActive(); - return; + return; } ph = eh.pollerHandle; } @@ -366,7 +366,7 @@ ::__uint32_t oldEvents = eh.events; eh.events |= PollerPrivate::directionToEpollEvent(dir); - + // If no change nothing more to do - avoid unnecessary system call if (oldEvents==eh.events) { return; @@ -376,7 +376,7 @@ if (!eh.isActive()) { return; } - + ::epoll_event epe; epe.events = eh.events | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy @@ -397,12 +397,12 @@ if (oldEvents==eh.events) { return; } - + // If we're not actually listening wait till we are to perform change if (!eh.isActive()) { return; } - + ::epoll_event epe; epe.events = eh.events | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy @@ -427,50 +427,50 @@ } bool Poller::interrupt(PollerHandle& handle) { - { - PollerHandlePrivate& eh = *handle.impl; - ScopedLock<Mutex> l(eh.lock); - if (eh.isIdle() || eh.isDeleted()) { - return false; - } - + { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + if (eh.isIdle() || eh.isDeleted()) { + return false; + } + if (eh.isInterrupted()) { return true; } - + // Stop monitoring handle for read or write - ::epoll_event epe; - epe.events = 0; - epe.data.u64 = 0; // Keep valgrind happy - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + ::epoll_event epe; + epe.events = 0; + epe.data.u64 = 0; // Keep valgrind happy + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); if (eh.isInactive()) { eh.setInterrupted(); return true; } eh.setInterrupted(); - } + } - PollerPrivate::InterruptHandle& ih = impl->interruptHandle; + PollerPrivate::InterruptHandle& ih = impl->interruptHandle; PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl; ScopedLock<Mutex> l(eh.lock); - ih.addHandle(handle); - - impl->interrupt(); + ih.addHandle(handle); + + impl->interrupt(); eh.setActive(); return true; } void Poller::run() { - // Make sure we can't be interrupted by signals at a bad time - ::sigset_t ss; - ::sigfillset(&ss); + // Make sure we can't be interrupted by signals at a bad time + ::sigset_t ss; + ::sigfillset(&ss); ::pthread_sigmask(SIG_SETMASK, &ss, 0); do { Event event = wait(); - // If can read/write then dispatch appropriate callbacks + // If can read/write then dispatch appropriate callbacks if (event.handle) { event.process(); } else { @@ -564,7 +564,7 @@ PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(dataPtr); ScopedLock<Mutex> l(eh.lock); - + // the handle could have gone inactive since we left the epoll_wait if (eh.isActive()) { PollerHandle* handle = eh.pollerHandle; Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=779446&r1=779445&r2=779446&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp Thu May 28 06:13:57 2009 @@ -68,7 +68,7 @@ b->dataCount = b->byteCount; qp->postRecv(b); } - + for (int i = 0; i<xmitBufferCount; ++i) { // Allocate xmit buffer Buffer* b = qp->createBuffer(bufferSize); @@ -86,7 +86,7 @@ // Turn off callbacks (before doing the deletes) dataHandle.stopWatch(); - + // The buffers ptr_deque automatically deletes all the buffers we've allocated // TODO: It might turn out to be more efficient in high connection loads to reuse the // buffers rather than having to reregister them all the time (this would be straightforward if all @@ -189,7 +189,7 @@ if (doReturn) { return; } - + doWriteCallback(); // Keep track of what we need to do so that we can release the lock @@ -317,7 +317,7 @@ // disabled by the poller until we leave this code qp->notifyRecv(); qp->notifySend(); - + int recvEvents = 0; int sendEvents = 0; @@ -353,7 +353,7 @@ xmitCredit += (e.getImm() & ~FlagsMask); dataPresent = ((e.getImm() & IgnoreData) == 0); } - + // if there was no data sent then the message was only to update our credit if ( dataPresent ) { readCallback(*this, b); @@ -366,7 +366,7 @@ // Received another message ++recvCredit; - + // Send recvCredit if it is large enough (it will have got this large because we've not sent anything recently) if (recvCredit > recvBufferCount/2) { // TODO: This should use RDMA write with imm as there might not ever be a buffer to receive this message @@ -377,7 +377,7 @@ // Have to send something as adapters hate it when you try to transfer 0 bytes *reinterpret_cast< uint32_t* >(ob->bytes) = htonl(recvCredit); ob->dataCount = sizeof(uint32_t); - + int creditSent = recvCredit & ~FlagsMask; qp->postSend(creditSent | IgnoreData, ob); recvCredit -= creditSent; @@ -426,7 +426,7 @@ b->dataStart = 0; return b; } - + void AsynchIO::returnBuffer(Buffer* b) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); bufferQueue.push_front(b); @@ -445,7 +445,7 @@ { ci->nonblocking(); } - + void ConnectionManager::start(Poller::shared_ptr poller) { startConnection(ci); handle.startWatch(poller); @@ -454,7 +454,7 @@ void ConnectionManager::event(DispatchHandle&) { connectionEvent(ci); } - + Listener::Listener( const sockaddr& src, const ConnectionParams& cp, Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h?rev=779446&r1=779445&r2=779446&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h Thu May 28 06:13:57 2009 @@ -111,7 +111,7 @@ inline bool AsynchIO::writable() const { return (!closed && outstandingWrites < xmitBufferCount && xmitCredit > 0); } - + inline int AsynchIO::incompletedWrites() const { return outstandingWrites; } @@ -125,7 +125,7 @@ struct ConnectionParams { int maxRecvBufferSize; int initialXmitCredit ; - + ConnectionParams(int s, int c) : maxRecvBufferSize(s), initialXmitCredit(c) @@ -142,7 +142,7 @@ typedef boost::function2<void, Rdma::Connection::intrusive_ptr&, ErrorType> ErrorCallback; typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> DisconnectedCallback; - + class ConnectionManager { Connection::intrusive_ptr ci; qpid::sys::DispatchHandle handle; @@ -150,13 +150,13 @@ protected: ErrorCallback errorCallback; DisconnectedCallback disconnectedCallback; - + public: ConnectionManager( ErrorCallback errc, DisconnectedCallback dc ); - + virtual ~ConnectionManager() {} void start(qpid::sys::Poller::shared_ptr poller); @@ -167,7 +167,7 @@ virtual void startConnection(Connection::intrusive_ptr ci) = 0; virtual void connectionEvent(Connection::intrusive_ptr ci) = 0; }; - + typedef boost::function2<bool, Rdma::Connection::intrusive_ptr&, const ConnectionParams&> ConnectionRequestCallback; typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> EstablishedCallback; --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org