Author: aconway Date: Tue Jan 20 14:11:37 2009 New Revision: 736135 URL: http://svn.apache.org/viewvc?rev=736135&view=rev Log: Latency measurements, compiled out of production code.
Added: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.cpp (with props) qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.h (with props) Modified: qpid/trunk/qpid/cpp/src/Makefile.am qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Modified: qpid/trunk/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=736135&r1=736134&r2=736135&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/Makefile.am (original) +++ qpid/trunk/qpid/cpp/src/Makefile.am Tue Jan 20 14:11:37 2009 @@ -349,6 +349,8 @@ qpid/sys/AsynchIOHandler.cpp \ qpid/sys/Dispatcher.cpp \ qpid/sys/DispatchHandle.cpp \ + qpid/sys/LatencyMetric.cpp \ + qpid/sys/LatencyMetric.h \ qpid/sys/Runnable.cpp \ qpid/sys/Shlib.cpp \ qpid/sys/Timer.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=736135&r1=736134&r2=736135&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jan 20 14:11:37 2009 @@ -38,6 +38,7 @@ #include "qpid/log/Statement.h" #include "qpid/log/Helpers.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/LatencyMetric.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include "qmf/org/apache/qpid/cluster/Package.h" @@ -182,7 +183,7 @@ MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); - if (from == myId) // Record self-deliveries for flow control. + if (from == myId) // Record self-deliveries for flow control. mcast.selfDeliver(e); deliver(e, l); } @@ -206,6 +207,7 @@ } void Cluster::deliveredEvent(const EventHeader& e, const char* data) { + QPID_LATENCY_RECORD("deliver queue", e); Buffer buf(const_cast<char*>(data), e.getSize()); AMQFrame frame; if (e.isCluster()) { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=736135&r1=736134&r2=736135&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Tue Jan 20 14:11:37 2009 @@ -35,16 +35,21 @@ const size_t EventHeader::HEADER_SIZE = sizeof(uint8_t) + // type sizeof(uint64_t) + // connection pointer only, CPG provides member ID. - sizeof(uint32_t); // payload size + sizeof(uint32_t) // payload size +#ifdef QPID_LATENCY_METRIC + + sizeof(int64_t) // timestamp +#endif + ; EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) : type(t), connectionId(c), size(s) {} + +Event::Event() {} + Event::Event(EventType t, const ConnectionId& c, size_t s) : EventHeader(t,c,s), store(RefCountedBuffer::create(s+HEADER_SIZE)) -{ - encodeHeader(); -} +{} void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { if (buf.available() <= HEADER_SIZE) @@ -54,14 +59,17 @@ throw ClusterLeaveException("Invalid multicast event type"); connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong())); size = buf.getLong(); +#ifdef QPID_LATENCY_METRIC + latency_metric_timestamp = buf.getLongLong(); +#endif } Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) { - EventHeader h; - h.decode(m, buf); // Header - Event e(h.getType(), h.getConnectionId(), h.getSize()); + Event e; + e.decode(m, buf); // Header if (buf.available() < e.size) throw ClusterLeaveException("Not enough data for multicast event"); + e.store = RefCountedBuffer::create(e.size + HEADER_SIZE); memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size); return e; } @@ -73,11 +81,20 @@ f.encode(buf); return e; } - + +iovec Event::toIovec() { + encodeHeader(); + iovec iov = { const_cast<char*>(getStore()), getStoreSize() }; + return iov; +} + void EventHeader::encode(Buffer& b) const { b.putOctet(type); b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer())); b.putLong(size); +#ifdef QPID_LATENCY_METRIC + b.putLongLong(latency_metric_timestamp); +#endif } // Encode my header in my buffer. Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=736135&r1=736134&r2=736135&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Tue Jan 20 14:11:37 2009 @@ -27,6 +27,8 @@ #include "Connection.h" #include "qpid/RefCountedBuffer.h" #include "qpid/framing/Buffer.h" +#include "qpid/sys/LatencyMetric.h" +#include <sys/uio.h> // For iovec #include <iosfwd> namespace qpid { @@ -37,7 +39,7 @@ // /** Header data for a multicast event */ -class EventHeader { +class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { public: EventHeader(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0); void decode(const MemberId& m, framing::Buffer&); @@ -65,8 +67,9 @@ */ class Event : public EventHeader { public: + Event(); /** Create an event with a buffer that can hold size bytes plus an event header. */ - Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0); + Event(EventType t, const ConnectionId& c, size_t); /** Create an event copied from delivered data. */ static Event decodeCopy(const MemberId& m, framing::Buffer&); @@ -85,6 +88,8 @@ operator framing::Buffer() const; + iovec toIovec(); + private: void encodeHeader(); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=736135&r1=736134&r2=736135&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Tue Jan 20 14:11:37 2009 @@ -23,7 +23,7 @@ #include "Cpg.h" #include "ClusterLeaveException.h" #include "qpid/log/Statement.h" - +#include "qpid/sys/LatencyMetric.h" namespace qpid { namespace cluster { @@ -59,8 +59,8 @@ return; } } + QPID_LATENCY_INIT(e); queue.push(e); - } @@ -76,7 +76,8 @@ } ++pending; } - iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() }; + QPID_LATENCY_RECORD("mcast send queue", *i); + iovec iov = i->toIovec(); if (!cpg.mcast(&iov, 1)) { // cpg didn't send because of CPG flow control. if (mcastMax) { @@ -104,8 +105,9 @@ holdingQueue.clear(); } -void Multicaster::selfDeliver(const Event&) { +void Multicaster::selfDeliver(const Event& e) { sys::Mutex::ScopedLock l(lock); + QPID_LATENCY_RECORD("cpg self deliver", e); if (mcastMax) { assert(pending > 0); assert(pending <= mcastMax); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=736135&r1=736134&r2=736135&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Tue Jan 20 14:11:37 2009 @@ -27,7 +27,6 @@ #include "qpid/sys/PollableQueue.h" #include "qpid/sys/Mutex.h" #include <boost/shared_ptr.hpp> -#include <sys/uio.h> // For iovec namespace qpid { Added: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.cpp?rev=736135&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.cpp (added) +++ qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.cpp Tue Jan 20 14:11:37 2009 @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifdef QPID_LATENCY_METRIC + +#include "LatencyMetric.h" +#include "Time.h" +#include <iostream> + +namespace qpid { +namespace sys { + +void LatencyMetricTimestamp::initialize(const LatencyMetricTimestamp& ts) { + const_cast<int64_t&>(ts.latency_metric_timestamp) = Duration(now()); +} + +LatencyMetric::LatencyMetric(const char* msg, int64_t skip_) : + message(msg), count(0), total(0), skipped(0), skip(skip_) +{} + +LatencyMetric::~LatencyMetric() { report(); } + +void LatencyMetric::record(const LatencyMetricTimestamp& start) { + Mutex::ScopedLock l(lock); // FIXME aconway 2009-01-20: atomics? + if (!start.latency_metric_timestamp) return; // Ignore 0 timestamps. + if (skip) { + if (++skipped < skip) return; + else skipped = 0; + } + ++count; + int64_t now_ = Duration(now()); + total += now_ - start.latency_metric_timestamp; + // Set start time for next leg of the journey + const_cast<int64_t&>(start.latency_metric_timestamp) = now_; +} + +void LatencyMetric::report() { + using namespace std; + if (count) { + cout << "LATENCY: " << message << ": " + << total / (count * TIME_USEC) << " microseconds" << endl; + } + else { + cout << "LATENCY: " << message << ": no data." << endl; + } + count = 0; + total = 0; +} + + +}} // namespace qpid::sys + +#endif Propchange: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.h?rev=736135&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.h (added) +++ qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.h Tue Jan 20 14:11:37 2009 @@ -0,0 +1,80 @@ +#ifndef QPID_SYS_LATENCYMETRIC_H +#define QPID_SYS_LATENCYMETRIC_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifdef QPID_LATENCY_METRIC + +#include "qpid/sys/IntegerTypes.h" +#include "qpid/sys/Mutex.h" + +namespace qpid { +namespace sys { + +/** Use this base class to add a timestamp for latency to an object */ +struct LatencyMetricTimestamp { + LatencyMetricTimestamp() : latency_metric_timestamp(0) {} + static void initialize(const LatencyMetricTimestamp&); + int64_t latency_metric_timestamp; +}; + +/** + * Record average latencies, report on destruction. + * + * For debugging only, use via macros below so it can be compiled out + * of production code. + */ +class LatencyMetric { + public: + /** msg should be a string literal. */ + LatencyMetric(const char* msg, int64_t skip_=0); + ~LatencyMetric(); + + void record(const LatencyMetricTimestamp& start); + + private: + void report(); + Mutex lock; + const char* message; + int64_t ignore, count, total, skipped, skip; +}; + +}} // namespace qpid::sys + +#define QPID_LATENCY_INIT(x) ::qpid::sys::LatencyMetricTimestamp::initialize(x) +#define QPID_LATENCY_RECORD(msg, x) do { \ + static ::qpid::sys::LatencyMetric metric__(msg); metric__.record(x); \ + } while (false) + + +#else /* defined QPID_LATENCY_METRIC */ + +namespace qpid { namespace sys { +class LatencyMetricTimestamp {}; +}} + +#define QPID_LATENCY_INIT(x) (void)x +#define QPID_LATENCY_RECORD(msg, x) (void)x + +#endif /* defined QPID_LATENCY_METRIC */ + +#endif /*!QPID_SYS_LATENCYMETRIC_H*/ Propchange: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.h ------------------------------------------------------------------------------ svn:keywords = Rev Date --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org