Author: astitcher
Date: Tue Feb 21 20:16:25 2012
New Revision: 1291981

URL: http://svn.apache.org/viewvc?rev=1291981&view=rev
Log:
QPID-3862: Static Trace points for System tap / Dtrace

Added framework to allow static probes in DTRACE_PROBE() format
to be added to the qpid code. This will be usable under System Tap for
Linux, and DTrace for Solaris and FreeBSD.

Also included some initial probes into the low level IO code to see how
it performs.

Added:
    qpid/trunk/qpid/cpp/src/qpid/sys/Probes.h
Modified:
    qpid/trunk/qpid/cpp/configure.ac
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/config.h.cmake
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp

Modified: qpid/trunk/qpid/cpp/configure.ac
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/configure.ac?rev=1291981&r1=1291980&r2=1291981&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/configure.ac (original)
+++ qpid/trunk/qpid/cpp/configure.ac Tue Feb 21 20:16:25 2012
@@ -480,6 +480,24 @@ case "$host" in
 esac
 AM_CONDITIONAL([SUNOS], [test x$arch = xsolaris])
 
+# Check whether we've got the header for dtrace static probes
+AC_ARG_WITH([probes],
+ [AS_HELP_STRING([--with-probes], [Build with dtrace/systemtap static 
probes])],
+ [case ${withval} in
+  yes)
+   AC_CHECK_HEADERS([sys/sdt.h])
+   ;;
+  no)
+   ;;
+  *)
+   AC_MSG_ERROR([Bad value for --with-probes: ${withval}])
+   ;;
+  esac],
+  [
+   AC_CHECK_HEADERS([sys/sdt.h])
+  ]
+)
+
 # Check for some syslog capabilities not present in all systems
 AC_TRY_COMPILE([#include <sys/syslog.h>], 
                [int v = LOG_AUTHPRIV;],

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1291981&r1=1291980&r2=1291981&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Tue Feb 21 20:16:25 2012
@@ -477,6 +477,16 @@ else (NOT CLOCK_GETTIME_IN_RT)
   set(QPID_HAS_CLOCK_GETTIME YES CACHE BOOL "Platform has clock_gettime")
 endif (NOT CLOCK_GETTIME_IN_RT)
 
+# Check for header file for dtrace static probes
+check_include_files(sys/sdt.h HAVE_SYS_SDT_H)
+if (HAVE_SYS_SDT_H)
+  set(probes_default ON)
+endif (HAVE_SYS_SDT_H)
+option(BUILD_PROBES "Build with DTrace/systemtap static probes" 
${probes_default})
+if (NOT BUILD_PROBES)
+  set (HAVE_SYS_SDT_H 0)
+endif (NOT BUILD_PROBES)
+
 # If not windows ensure that we have uuid library
 if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
   CHECK_LIBRARY_EXISTS (uuid uuid_compare "" HAVE_UUID)

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1291981&r1=1291980&r2=1291981&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Tue Feb 21 20:16:25 2012
@@ -487,6 +487,7 @@ libqpidcommon_la_SOURCES +=                 \
   qpid/sys/PollableCondition.h                 \
   qpid/sys/PollableQueue.h                     \
   qpid/sys/Poller.h                            \
+  qpid/sys/Probes.h                            \
   qpid/sys/ProtocolFactory.h                   \
   qpid/sys/Runnable.cpp                                \
   qpid/sys/ScopedIncrement.h                   \

Modified: qpid/trunk/qpid/cpp/src/config.h.cmake
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/config.h.cmake?rev=1291981&r1=1291980&r2=1291981&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/config.h.cmake (original)
+++ qpid/trunk/qpid/cpp/src/config.h.cmake Tue Feb 21 20:16:25 2012
@@ -60,6 +60,7 @@
 #cmakedefine HAVE_OPENAIS_CPG_H ${HAVE_OPENAIS_CPG_H}
 #cmakedefine HAVE_COROSYNC_CPG_H ${HAVE_COROSYNC_CPG_H}
 #cmakedefine HAVE_LIBCMAN_H ${HAVE_LIBCMAN_H}
+#cmakedefine HAVE_SYS_SDT_H ${HAVE_SYS_SDT_H}
 #cmakedefine HAVE_LOG_AUTHPRIV
 #cmakedefine HAVE_LOG_FTP
 

Added: qpid/trunk/qpid/cpp/src/qpid/sys/Probes.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Probes.h?rev=1291981&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Probes.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Probes.h Tue Feb 21 20:16:25 2012
@@ -0,0 +1,65 @@
+#ifndef _sys_Probes
+#define _sys_Probes
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "config.h"
+
+#ifdef HAVE_SYS_SDT_H
+#include <sys/sdt.h>
+#endif
+
+// Pragmatically it seems that Linux and Solaris versions of sdt.h which support
+// user static probes define up to DTRACE_PROBE8, but FreeBSD 8 which doesn't
+// support usdt only defines up to DTRACE_PROBE4 - FreeBSD 9 which does support
+// usdt defines up to DTRACE_PROBE5.
+
+#ifdef DTRACE_PROBE5
+// Versions for Linux Systemtap/Solaris/FreeBSD 9
+#define QPID_PROBE(probe) DTRACE_PROBE(qpid, probe)
+#define QPID_PROBE1(probe, p1) DTRACE_PROBE1(qpid, probe, p1)
+#define QPID_PROBE2(probe, p1, p2) DTRACE_PROBE2(qpid, probe, p1, p2)
+#define QPID_PROBE3(probe, p1, p2, p3) DTRACE_PROBE3(qpid, probe, p1, p2, p3)
+#define QPID_PROBE4(probe, p1, p2, p3, p4) DTRACE_PROBE4(qpid, probe, p1, p2, 
p3, p4)
+#define QPID_PROBE5(probe, p1, p2, p3, p4, p5) DTRACE_PROBE5(qpid, probe, p1, 
p2, p3, p4, p5)
+#else
+// FreeBSD 8, or <sys/sdt.h> not included at all: probes expand to nothing
+#define QPID_PROBE(probe)
+#define QPID_PROBE1(probe, p1)
+#define QPID_PROBE2(probe, p1, p2)
+#define QPID_PROBE3(probe, p1, p2, p3)
+#define QPID_PROBE4(probe, p1, p2, p3, p4)
+#define QPID_PROBE5(probe, p1, p2, p3, p4, p5)
+#endif
+
+#ifdef DTRACE_PROBE8
+// Versions for Linux Systemtap
+#define QPID_PROBE6(probe, p1, p2, p3, p4, p5, p6) DTRACE_PROBE6(qpid, probe, 
p1, p2, p3, p4, p5, p6)
+#define QPID_PROBE7(probe, p1, p2, p3, p4, p5, p6, p7) DTRACE_PROBE7(qpid, 
probe, p1, p2, p3, p4, p5, p6, p7)
+#define QPID_PROBE8(probe, p1, p2, p3, p4, p5, p6, p7, p8) DTRACE_PROBE8(qpid, 
probe, p1, p2, p3, p4, p5, p6, p7, p8)
+#else
+// Versions for Solaris/FreeBSD, or <sys/sdt.h> not included: 6-8 argument probes expand to nothing
+#define QPID_PROBE6(probe, p1, p2, p3, p4, p5, p6)
+#define QPID_PROBE7(probe, p1, p2, p3, p4, p5, p6, p7)
+#define QPID_PROBE8(probe, p1, p2, p3, p4, p5, p6, p7, p8)
+#endif
+
+#endif // _sys_Probes

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=1291981&r1=1291980&r2=1291981&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Tue Feb 21 20:16:25 2012
@@ -23,6 +23,7 @@
 #include "qpid/sys/Socket.h"
 #include "qpid/sys/SocketAddress.h"
 #include "qpid/sys/Poller.h"
+#include "qpid/sys/Probes.h"
 #include "qpid/sys/DispatchHandle.h"
 #include "qpid/sys/Time.h"
 #include "qpid/log/Statement.h"
@@ -423,9 +424,12 @@ AsynchIO::BufferBase* AsynchIO::getQueue
 void AsynchIO::readable(DispatchHandle& h) {
     if (readingStopped) {
         // We have been flow controlled.
+        QPID_PROBE1(asynchio_read_flowcontrolled, &h);
         return;
     }
     AbsTime readStartTime = AbsTime::now();
+    size_t total = 0;
+    int readCalls = 0;
     do {
         // (Try to) get a buffer
         if (!bufferQueue.empty()) {
@@ -436,23 +440,29 @@ void AsynchIO::readable(DispatchHandle& 
             errno = 0;
             int readCount = buff->byteCount-buff->dataCount;
             int rc = socket.read(buff->bytes + buff->dataCount, readCount);
+            int64_t duration = Duration(readStartTime, AbsTime::now());
+            ++readCalls;
             if (rc > 0) {
                 buff->dataCount += rc;
                 threadReadTotal += rc;
+                total += rc;
 
                 readCallback(*this, buff);
                 if (readingStopped) {
                     // We have been flow controlled.
+                    QPID_PROBE4(asynchio_read_finished_flowcontrolled, &h, 
duration, total, readCalls);
                     break;
                 }
 
                 if (rc != readCount) {
                     // If we didn't fill the read buffer then time to stop reading
+                    QPID_PROBE4(asynchio_read_finished_done, &h, duration, 
total, readCalls);
                     break;
                 }
 
                 // Stop reading if we've overrun our timeslot
-                if (Duration(readStartTime, AbsTime::now()) > 
threadMaxIoTimeNs) {
+                if ( duration > threadMaxIoTimeNs) {
+                    QPID_PROBE4(asynchio_read_finished_maxtime, &h, duration, 
total, readCalls);
                     break;
                 }
 
@@ -461,6 +471,7 @@ void AsynchIO::readable(DispatchHandle& 
                 bufferQueue.push_front(buff);
                 assert(buff);
 
+                QPID_PROBE5(asynchio_read_finished_error, &h, duration, total, 
readCalls, errno);
                 // Eof or other side has gone away
                 if (rc == 0 || errno == ECONNRESET) {
                     eofCallback(*this);
@@ -486,6 +497,7 @@ void AsynchIO::readable(DispatchHandle& 
             // If we still have no buffers we can't do anything more
             if (bufferQueue.empty()) {
                 h.unwatchRead();
+                QPID_PROBE4(asynchio_read_finished_nobuffers, &h, 
Duration(readStartTime, AbsTime::now()), total, readCalls);
                 break;
             }
 
@@ -501,6 +513,8 @@ void AsynchIO::readable(DispatchHandle& 
  */
 void AsynchIO::writeable(DispatchHandle& h) {
     AbsTime writeStartTime = AbsTime::now();
+    size_t total = 0;
+    int writeCalls = 0;
     do {
         // See if we've got something to write
         if (!writeQueue.empty()) {
@@ -510,14 +524,18 @@ void AsynchIO::writeable(DispatchHandle&
             errno = 0;
             assert(buff->dataStart+buff->dataCount <= buff->byteCount);
             int rc = socket.write(buff->bytes+buff->dataStart, 
buff->dataCount);
+            int64_t duration = Duration(writeStartTime, AbsTime::now());
+            ++writeCalls;
             if (rc >= 0) {
                 threadWriteTotal += rc;
+                total += rc;
 
                 // If we didn't write full buffer put rest back
                 if (rc != buff->dataCount) {
                     buff->dataStart += rc;
                     buff->dataCount -= rc;
                     writeQueue.push_back(buff);
+                    QPID_PROBE4(asynchio_write_finished_done, &h, duration, 
total, writeCalls);
                     break;
                 }
 
@@ -525,12 +543,15 @@ void AsynchIO::writeable(DispatchHandle&
                 queueReadBuffer(buff);
 
                 // Stop writing if we've overrun our timeslot
-                if (Duration(writeStartTime, AbsTime::now()) > 
threadMaxIoTimeNs) {
+                if (duration > threadMaxIoTimeNs) {
+                    QPID_PROBE4(asynchio_write_finished_maxtime, &h, duration, 
total, writeCalls);
                     break;
                 }
             } else {
                 // Put buffer back
                 writeQueue.push_back(buff);
+                QPID_PROBE5(asynchio_write_finished_error, &h, duration, 
total, writeCalls, errno);
+
                 if (errno == ECONNRESET || errno == EPIPE) {
                     // Just stop watching for write here - we'll get a
                     // disconnect callback soon enough
@@ -548,9 +569,13 @@ void AsynchIO::writeable(DispatchHandle&
                 }
             }
         } else {
+            int64_t duration = Duration(writeStartTime, AbsTime::now());
+            (void) duration; // force duration to be used if no probes are 
compiled
+
             // If we're waiting to close the socket then can do it now as there is nothing to write
             if (queuedClose) {
                 close(h);
+                QPID_PROBE4(asynchio_write_finished_closed, &h, duration, 
total, writeCalls);
                 break;
             }
             // Fd is writable, but nothing to write
@@ -567,6 +592,7 @@ void AsynchIO::writeable(DispatchHandle&
                 // desired rewatchWrite so we correct that here
                 if (writePending)
                     h.rewatchWrite();
+                QPID_PROBE4(asynchio_write_finished_nodata, &h, duration, 
total, writeCalls);
                 break;
             }
         }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to