HDFS-6880. Adding tracing to DataNode data transfer protocol (iwasakims via cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/56119fec
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/56119fec
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/56119fec

Branch: refs/heads/HDFS-6584
Commit: 56119fec96abbcc44c5dd82fdb694d2c3b53feb3
Parents: 8e5d671
Author: Colin Patrick Mccabe <cmcc...@cloudera.com>
Authored: Tue Sep 16 13:58:40 2014 -0700
Committer: Colin Patrick Mccabe <cmcc...@cloudera.com>
Committed: Tue Sep 16 13:59:32 2014 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  40 +++++-
 .../datatransfer/DataTransferProtoUtil.java     |  44 ++++++-
 .../hdfs/protocol/datatransfer/Receiver.java    | 123 ++++++++++++++-----
 .../hdfs/protocol/datatransfer/Sender.java      |  26 +++-
 .../src/main/proto/datatransfer.proto           |   8 ++
 .../org/apache/hadoop/tracing/TestTracing.java  |  52 +++++---
 .../TestTracingShortCircuitLocalRead.java       |  97 +++++++++++++++
 8 files changed, 331 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
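
The shape of the change: the client side embeds the current HTrace span's
(traceId, spanId) pair in the data transfer request protos, and the DataNode
side re-creates a child span around each op handler. A minimal sketch of the
two halves, using only the htrace calls that appear in this patch (the
RequestBuilder type is an illustrative stand-in for a generated proto builder):

  import org.htrace.Span;
  import org.htrace.Trace;
  import org.htrace.TraceInfo;
  import org.htrace.TraceScope;

  class TracePropagationSketch {
    // Hypothetical stand-in for any generated builder with trace ID fields.
    interface RequestBuilder {
      RequestBuilder setTraceId(long traceId);
      RequestBuilder setParentId(long parentId);
    }

    // Client side: attach the current span's IDs to an outgoing request.
    static void attachTraceInfo(RequestBuilder builder) {
      if (Trace.isTracing()) {
        Span s = Trace.currentSpan();
        builder.setTraceId(s.getTraceId()).setParentId(s.getSpanId());
      }
    }

    // Server side: continue the client's span around the op handler.
    static void handleTraced(long traceId, long parentId, Runnable op) {
      TraceScope scope = Trace.startSpan("op", new TraceInfo(traceId, parentId));
      try {
        op.run();
      } finally {
        scope.close();
      }
    }
  }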


http://git-wip-us.apache.org/repos/asf/hadoop/blob/56119fec/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 819f636..1225311 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -461,6 +461,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-7059. HAadmin transtionToActive with forceActive option can show
     confusing message.
 
+    HDFS-6880. Adding tracing to DataNode data transfer protocol. (iwasakims
+    via cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56119fec/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index f3d6692..d368f4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -88,6 +88,10 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -355,12 +359,22 @@ public class DFSOutputStream extends FSOutputSummer
     /** Append on an existing block? */
     private final boolean isAppend;
 
+    private final Span traceSpan;
+
     /**
      * Default construction for file create
      */
     private DataStreamer() {
+      this(null);
+    }
+
+    /**
+     * construction with tracing info
+     */
+    private DataStreamer(Span span) {
       isAppend = false;
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+      traceSpan = span;
     }
     
     /**
@@ -371,9 +385,10 @@ public class DFSOutputStream extends FSOutputSummer
      * @throws IOException if error occurs
      */
     private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
-        int bytesPerChecksum) throws IOException {
+        int bytesPerChecksum, Span span) throws IOException {
       isAppend = true;
       stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
+      traceSpan = span;
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
       accessToken = lastBlock.getBlockToken();
@@ -463,6 +478,10 @@ public class DFSOutputStream extends FSOutputSummer
     @Override
     public void run() {
       long lastPacket = Time.now();
+      TraceScope traceScope = null;
+      if (traceSpan != null) {
+        traceScope = Trace.continueSpan(traceSpan);
+      }
       while (!streamerClosed && dfsClient.clientRunning) {
 
         // if the Responder encountered an error, shutdown Responder
@@ -636,6 +655,9 @@ public class DFSOutputStream extends FSOutputSummer
           }
         }
       }
+      if (traceScope != null) {
+        traceScope.close();
+      }
       closeInternal();
     }
 
@@ -1611,7 +1633,11 @@ public class DFSOutputStream extends FSOutputSummer
     computePacketChunkSize(dfsClient.getConf().writePacketSize,
         checksum.getBytesPerChecksum());
 
-    streamer = new DataStreamer();
+    Span traceSpan = null;
+    if (Trace.isTracing()) {
+      traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
+    }
+    streamer = new DataStreamer(traceSpan);
     if (favoredNodes != null && favoredNodes.length != 0) {
       streamer.setFavoredNodes(favoredNodes);
     }
@@ -1652,15 +1678,21 @@ public class DFSOutputStream extends FSOutputSummer
     this(dfsClient, src, progress, stat, checksum);
     initialFileSize = stat.getLen(); // length of file when opened
 
+    Span traceSpan = null;
+    if (Trace.isTracing()) {
+      traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
+    }
+
     // The last partial block of the file has to be filled.
     if (lastBlock != null) {
       // indicate that we are appending to an existing block
       bytesCurBlock = lastBlock.getBlockSize();
-      streamer = new DataStreamer(lastBlock, stat, checksum.getBytesPerChecksum());
+      streamer = new DataStreamer(lastBlock, stat,
+          checksum.getBytesPerChecksum(), traceSpan);
     } else {
       computePacketChunkSize(dfsClient.getConf().writePacketSize,
           checksum.getBytesPerChecksum());
-      streamer = new DataStreamer();
+      streamer = new DataStreamer(traceSpan);
     }
     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
   }
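
The DataStreamer changes above use htrace's detach/continue idiom to carry a
span across a thread boundary: the thread that constructs the stream starts a
span and detaches it, and the streamer thread re-attaches it with
Trace.continueSpan() and closes it when its loop exits. The hand-off reduced
to its core (class and span names are illustrative, not part of the patch):

  import org.htrace.Span;
  import org.htrace.Trace;
  import org.htrace.TraceScope;

  public class DetachedSpanSketch {
    public static void main(String[] args) throws InterruptedException {
      // Creating thread: start a span and detach it so no scope stays
      // bound to this thread (null when tracing is off, as in the patch).
      final Span span =
          Trace.isTracing() ? Trace.startSpan("DFSOutputStream").detach() : null;

      Thread streamer = new Thread(new Runnable() {
        @Override
        public void run() {
          // Worker thread: re-attach the detached span, if any.
          TraceScope scope = (span != null) ? Trace.continueSpan(span) : null;
          try {
            // ... the streaming loop runs inside the span ...
          } finally {
            if (scope != null) {
              scope.close();  // ends the span when the worker exits
            }
          }
        }
      });
      streamer.start();
      streamer.join();
    }
  }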

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56119fec/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
index 6be3810..b91e17a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
@@ -25,12 +25,16 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
-
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceInfo;
+import org.htrace.TraceScope;
 
 /**
  * Static utilities for dealing with the protocol buffers used by the
@@ -78,9 +82,41 @@ public abstract class DataTransferProtoUtil {
 
   static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
       Token<BlockTokenIdentifier> blockToken) {
-    return BaseHeaderProto.newBuilder()
+    BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder()
       .setBlock(PBHelper.convert(blk))
-      .setToken(PBHelper.convert(blockToken))
-      .build();
+      .setToken(PBHelper.convert(blockToken));
+    if (Trace.isTracing()) {
+      Span s = Trace.currentSpan();
+      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
+          .setTraceId(s.getTraceId())
+          .setParentId(s.getSpanId()));
+    }
+    return builder.build();
+  }
+
+  public static TraceInfo fromProto(DataTransferTraceInfoProto proto) {
+    if (proto == null) return null;
+    if (!proto.hasTraceId()) return null;
+    return new TraceInfo(proto.getTraceId(), proto.getParentId());
+  }
+
+  public static TraceScope continueTraceSpan(ClientOperationHeaderProto header,
+      String description) {
+    return continueTraceSpan(header.getBaseHeader(), description);
+  }
+
+  public static TraceScope continueTraceSpan(BaseHeaderProto header,
+      String description) {
+    return continueTraceSpan(header.getTraceInfo(), description);
+  }
+
+  public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto,
+      String description) {
+    TraceScope scope = null;
+    TraceInfo info = fromProto(proto);
+    if (info != null) {
+      scope = Trace.startSpan(description, info);
+    }
+    return scope;
   }
 }
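
Note that these helpers are null-tolerant by design: fromProto() returns null
when the header carries no trace IDs, continueTraceSpan() then returns a null
scope, and the receiver skips all tracing work for untraced requests. A short
usage sketch of the caller pattern (the description string and Runnable op are
illustrative):

  import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
  import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
  import org.htrace.TraceScope;

  class ContinueSpanSketch {
    static void dispatch(DataTransferTraceInfoProto traceInfo, Runnable op) {
      // scope is null when the client sent no trace IDs; op still runs.
      TraceScope scope =
          DataTransferProtoUtil.continueTraceSpan(traceInfo, "opExample");
      try {
        op.run();
      } finally {
        if (scope != null) scope.close();
      }
    }
  }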

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56119fec/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index a09437c..daae9b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.protocol.datatransfer;
 
 import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.continueTraceSpan;
 import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.DataInputStream;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmR
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
+import org.htrace.TraceScope;
 
 /** Receiver */
 @InterfaceAudience.Private
@@ -108,7 +110,10 @@ public abstract class Receiver implements DataTransferProtocol {
   /** Receive OP_READ_BLOCK */
   private void opReadBlock() throws IOException {
     OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
-    readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
         PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
         proto.getHeader().getClientName(),
         proto.getOffset(),
@@ -117,27 +122,36 @@ public abstract class Receiver implements DataTransferProtocol {
         (proto.hasCachingStrategy() ?
             getCachingStrategy(proto.getCachingStrategy()) :
           CachingStrategy.newDefaultStrategy()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
   
   /** Receive OP_WRITE_BLOCK */
   private void opWriteBlock(DataInputStream in) throws IOException {
     final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
     final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
-    writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
-        PBHelper.convertStorageType(proto.getStorageType()),
-        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
-        proto.getHeader().getClientName(),
-        targets,
-        PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
-        PBHelper.convert(proto.getSource()),
-        fromProto(proto.getStage()),
-        proto.getPipelineSize(),
-        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
-        proto.getLatestGenerationStamp(),
-        fromProto(proto.getRequestedChecksum()),
-        (proto.hasCachingStrategy() ?
-            getCachingStrategy(proto.getCachingStrategy()) :
-          CachingStrategy.newDefaultStrategy()));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+          PBHelper.convertStorageType(proto.getStorageType()),
+          PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
+          proto.getHeader().getClientName(),
+          targets,
+          PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
+          PBHelper.convert(proto.getSource()),
+          fromProto(proto.getStage()),
+          proto.getPipelineSize(),
+          proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
+          proto.getLatestGenerationStamp(),
+          fromProto(proto.getRequestedChecksum()),
+          (proto.hasCachingStrategy() ?
+              getCachingStrategy(proto.getCachingStrategy()) :
+            CachingStrategy.newDefaultStrategy()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#TRANSFER_BLOCK} */
@@ -145,11 +159,17 @@ public abstract class Receiver implements DataTransferProtocol {
     final OpTransferBlockProto proto =
       OpTransferBlockProto.parseFrom(vintPrefixed(in));
     final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
-    transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
-        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
-        proto.getHeader().getClientName(),
-        targets,
-        PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+          PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
+          proto.getHeader().getClientName(),
+          targets,
+          PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
@@ -158,9 +178,15 @@ public abstract class Receiver implements DataTransferProtocol {
       OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in));
     SlotId slotId = (proto.hasSlotId()) ? 
         PBHelper.convert(proto.getSlotId()) : null;
-    requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
-        PBHelper.convert(proto.getHeader().getToken()),
-        slotId, proto.getMaxVersion());
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
+          PBHelper.convert(proto.getHeader().getToken()),
+          slotId, proto.getMaxVersion());
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#RELEASE_SHORT_CIRCUIT_FDS} */
@@ -168,38 +194,67 @@ public abstract class Receiver implements DataTransferProtocol {
       throws IOException {
     final ReleaseShortCircuitAccessRequestProto proto =
       ReleaseShortCircuitAccessRequestProto.parseFrom(vintPrefixed(in));
-    releaseShortCircuitFds(PBHelper.convert(proto.getSlotId()));
+    TraceScope traceScope = continueTraceSpan(proto.getTraceInfo(),
+        proto.getClass().getSimpleName());
+    try {
+      releaseShortCircuitFds(PBHelper.convert(proto.getSlotId()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_SHM} */
   private void opRequestShortCircuitShm(DataInputStream in) throws IOException {
     final ShortCircuitShmRequestProto proto =
         ShortCircuitShmRequestProto.parseFrom(vintPrefixed(in));
-    requestShortCircuitShm(proto.getClientName());
+    TraceScope traceScope = continueTraceSpan(proto.getTraceInfo(),
+        proto.getClass().getSimpleName());
+    try {
+      requestShortCircuitShm(proto.getClientName());
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive OP_REPLACE_BLOCK */
   private void opReplaceBlock(DataInputStream in) throws IOException {
     OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
-    replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
-        PBHelper.convertStorageType(proto.getStorageType()),
-        PBHelper.convert(proto.getHeader().getToken()),
-        proto.getDelHint(),
-        PBHelper.convert(proto.getSource()));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
+          PBHelper.convertStorageType(proto.getStorageType()),
+          PBHelper.convert(proto.getHeader().getToken()),
+          proto.getDelHint(),
+          PBHelper.convert(proto.getSource()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive OP_COPY_BLOCK */
   private void opCopyBlock(DataInputStream in) throws IOException {
     OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
-    copyBlock(PBHelper.convert(proto.getHeader().getBlock()),
-        PBHelper.convert(proto.getHeader().getToken()));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      copyBlock(PBHelper.convert(proto.getHeader().getBlock()),
+          PBHelper.convert(proto.getHeader().getToken()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive OP_BLOCK_CHECKSUM */
   private void opBlockChecksum(DataInputStream in) throws IOException {
     OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
-    
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
     blockChecksum(PBHelper.convert(proto.getHeader().getBlock()),
         PBHelper.convert(proto.getHeader().getToken()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 }
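
Each opXxx() handler above repeats the same scope-per-op shape: open an
optional scope, run the handler, close the scope in a finally block. If a
later cleanup wanted to factor that out, one hypothetical shape (not part of
this patch) would be:

  import java.io.IOException;
  import org.htrace.TraceScope;

  final class TracedOps {
    // Hypothetical functional interface (pre-Java-8 style, as in this codebase).
    interface OpBody {
      void run() throws IOException;
    }

    // Mirrors the try/finally pattern used by each Receiver.opXxx() method.
    static void runTraced(TraceScope scope, OpBody body) throws IOException {
      try {
        body.run();
      } finally {
        if (scope != null) scope.close();
      }
    }
  }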

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56119fec/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 68da523..fb6cf2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
@@ -47,6 +48,9 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import org.htrace.Trace;
+import org.htrace.Span;
+
 import com.google.protobuf.Message;
 
 /** Sender */
@@ -185,19 +189,29 @@ public class Sender implements DataTransferProtocol {
   
   @Override
   public void releaseShortCircuitFds(SlotId slotId) throws IOException {
-    ReleaseShortCircuitAccessRequestProto proto = 
+    ReleaseShortCircuitAccessRequestProto.Builder builder =
         ReleaseShortCircuitAccessRequestProto.newBuilder().
-        setSlotId(PBHelper.convert(slotId)).
-        build();
+        setSlotId(PBHelper.convert(slotId));
+    if (Trace.isTracing()) {
+      Span s = Trace.currentSpan();
+      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
+          .setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
+    }
+    ReleaseShortCircuitAccessRequestProto proto = builder.build();
     send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto);
   }
 
   @Override
   public void requestShortCircuitShm(String clientName) throws IOException {
-    ShortCircuitShmRequestProto proto =
+    ShortCircuitShmRequestProto.Builder builder =
         ShortCircuitShmRequestProto.newBuilder().
-        setClientName(clientName).
-        build();
+        setClientName(clientName);
+    if (Trace.isTracing()) {
+      Span s = Trace.currentSpan();
+      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
+          .setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
+    }
+    ShortCircuitShmRequestProto proto = builder.build();
     send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto);
   }
   

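The two ops above have no BaseHeaderProto, so the Sender sets the optional
traceInfo field on the request builder directly, guarded by Trace.isTracing()
so that untraced requests put no extra bytes on the wire. The guard pattern,
condensed from the diff above:

  import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
  import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
  import org.htrace.Span;
  import org.htrace.Trace;

  class SenderTraceSketch {
    // Builds a shm request, attaching trace IDs only when a span is active.
    static ShortCircuitShmRequestProto buildRequest(String clientName) {
      ShortCircuitShmRequestProto.Builder builder =
          ShortCircuitShmRequestProto.newBuilder().setClientName(clientName);
      if (Trace.isTracing()) {
        Span s = Trace.currentSpan();
        builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
            .setTraceId(s.getTraceId())
            .setParentId(s.getSpanId()));
      }
      return builder.build();
    }
  }
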
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56119fec/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 6283b56..098d10a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -47,6 +47,12 @@ message DataTransferEncryptorMessageProto {
 message BaseHeaderProto {
   required ExtendedBlockProto block = 1;
   optional hadoop.common.TokenProto token = 2;
+  optional DataTransferTraceInfoProto traceInfo = 3;
+}
+
+message DataTransferTraceInfoProto {
+  required uint64 traceId = 1;
+  required uint64 parentId = 2;
 }
 
 message ClientOperationHeaderProto {
@@ -166,6 +172,7 @@ message OpRequestShortCircuitAccessProto {
 
 message ReleaseShortCircuitAccessRequestProto {
   required ShortCircuitShmSlotProto slotId = 1;
+  optional DataTransferTraceInfoProto traceInfo = 2;
 }
 
 message ReleaseShortCircuitAccessResponseProto {
@@ -177,6 +184,7 @@ message ShortCircuitShmRequestProto {
   // The name of the client requesting the shared memory segment.  This is
   // purely for logging / debugging purposes.
   required string clientName = 1;
+  optional DataTransferTraceInfoProto traceInfo = 2;
 }
 
 message ShortCircuitShmResponseProto { 
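
All three traceInfo fields are optional, which keeps the wire format
compatible in both directions: an old peer simply never sets them. On the
read side, protobuf returns an empty default instance for a missing optional
submessage rather than null, which is why fromProto() in
DataTransferProtoUtil checks hasTraceId() instead of a null test. A minimal
sketch of that check (the helper name is illustrative):

  import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
  import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;

  class OptionalTraceInfoSketch {
    // True only if the peer actually sent trace IDs.
    static boolean hasTraceIds(BaseHeaderProto header) {
      // getTraceInfo() never returns null; a missing optional submessage
      // comes back as the default instance, whose fields are all unset.
      DataTransferTraceInfoProto info = header.getTraceInfo();
      return info.hasTraceId() && info.hasParentId();
    }
  }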

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56119fec/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
index bb923a2..b3e6ee8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.htrace.HTraceConfiguration;
 import org.htrace.Sampler;
 import org.htrace.Span;
@@ -39,11 +40,13 @@ import org.junit.Test;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.base.Supplier;
 
 public class TestTracing {
 
@@ -81,7 +84,12 @@ public class TestTracing {
       "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.create",
       "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
       "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.fsync",
-      "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.complete"
+      "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete",
+      "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.complete",
+      "DFSOutputStream",
+      "OpWriteBlockProto",
+      "org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock",
+      "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.addBlock"
     };
     assertSpanNamesFound(expectedSpanNames);
 
@@ -96,7 +104,7 @@ public class TestTracing {
 
     // There should only be one trace id as it should all be homed in the
     // top trace.
-    for (Span span : SetSpanReceiver.SetHolder.spans) {
+    for (Span span : SetSpanReceiver.SetHolder.spans.values()) {
       Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
     }
   }
@@ -152,7 +160,8 @@ public class TestTracing {
     String[] expectedSpanNames = {
       "testReadTraceHooks",
       "org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations",
-      "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.getBlockLocations"
+      "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.getBlockLocations",
+      "OpReadBlockProto"
     };
     assertSpanNamesFound(expectedSpanNames);
 
@@ -168,7 +177,7 @@ public class TestTracing {
 
     // There should only be one trace id as it should all be homed in the
     // top trace.
-    for (Span span : SetSpanReceiver.SetHolder.spans) {
+    for (Span span : SetSpanReceiver.SetHolder.spans.values()) {
       Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
     }
   }
@@ -228,10 +237,24 @@ public class TestTracing {
     cluster.shutdown();
   }
 
-  private void assertSpanNamesFound(String[] expectedSpanNames) {
-    Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
-    for (String spanName : expectedSpanNames) {
-      Assert.assertTrue("Should find a span with name " + spanName, map.get(spanName) != null);
+  static void assertSpanNamesFound(final String[] expectedSpanNames) {
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
+          for (String spanName : expectedSpanNames) {
+            if (!map.containsKey(spanName)) {
+              return false;
+            }
+          }
+          return true;
+        }
+      }, 100, 1000);
+    } catch (TimeoutException e) {
+      Assert.fail("timed out waiting for expected spans: " + e.getMessage());
+    } catch (InterruptedException e) {
+      Assert.fail("interrupted while waiting for spans: " + e.getMessage());
     }
   }
 
@@ -249,15 +272,16 @@ public class TestTracing {
     }
 
     public void receiveSpan(Span span) {
-      SetHolder.spans.add(span);
+      SetHolder.spans.put(span.getSpanId(), span);
     }
 
     public void close() {
     }
 
     public static class SetHolder {
-      public static Set<Span> spans = new HashSet<Span>();
-
+      public static ConcurrentHashMap<Long, Span> spans = 
+          new ConcurrentHashMap<Long, Span>();
+
       public static int size() {
         return spans.size();
       }
@@ -265,7 +289,7 @@ public class TestTracing {
       public static Map<String, List<Span>> getMap() {
         Map<String, List<Span>> map = new HashMap<String, List<Span>>();
 
-        for (Span s : spans) {
+        for (Span s : spans.values()) {
           List<Span> l = map.get(s.getDescription());
           if (l == null) {
             l = new LinkedList<Span>();

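Two test-side changes make the assertions robust: spans now arrive
asynchronously from DataNode threads, so assertSpanNamesFound() polls with
GenericTestUtils.waitFor() instead of checking once, and the receiver stores
spans in a ConcurrentHashMap keyed by span id so concurrent writes neither
race nor collapse distinct spans that share a description. The polling idiom
on its own (the predicate and timeouts are illustrative):

  import java.util.concurrent.TimeoutException;
  import com.google.common.base.Supplier;
  import org.apache.hadoop.test.GenericTestUtils;

  class WaitForSketch {
    static void awaitCondition() throws TimeoutException, InterruptedException {
      // Re-checks the condition every 100 ms, giving up after 1000 ms.
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        @Override
        public Boolean get() {
          return conditionHolds();
        }
      }, 100, 1000);
    }

    // Hypothetical predicate standing in for the span-name lookup above.
    static boolean conditionHolds() {
      return true;
    }
  }
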
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56119fec/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java
new file mode 100644
index 0000000..7fe8a1e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tracing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.htrace.Sampler;
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import java.io.IOException;
+
+public class TestTracingShortCircuitLocalRead {
+  private static Configuration conf;
+  private static MiniDFSCluster cluster;
+  private static DistributedFileSystem dfs;
+  private static SpanReceiverHost spanReceiverHost;
+  private static TemporarySocketDirectory sockDir;
+  static final Path TEST_PATH = new Path("testShortCircuitTraceHooks");
+  static final int TEST_LENGTH = 1234;
+
+  @BeforeClass
+  public static void init() {
+    sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    sockDir.close();
+  }
+
+  @Test
+  public void testShortCircuitTraceHooks() throws IOException {
+    conf = new Configuration();
+    conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
+        TestTracing.SetSpanReceiver.class.getName());
+    conf.setLong("dfs.blocksize", 100 * 1024);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        "testShortCircuitTraceHooks._PORT");
+    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1)
+        .build();
+    dfs = cluster.getFileSystem();
+
+    try {
+      spanReceiverHost = SpanReceiverHost.getInstance(conf);
+      DFSTestUtil.createFile(dfs, TEST_PATH, TEST_LENGTH, (short)1, 5678L);
+
+      TraceScope ts = Trace.startSpan("testShortCircuitTraceHooks", Sampler.ALWAYS);
+      FSDataInputStream stream = dfs.open(TEST_PATH);
+      byte buf[] = new byte[TEST_LENGTH];
+      IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
+      stream.close();
+      ts.close();
+
+      String[] expectedSpanNames = {
+        "OpRequestShortCircuitAccessProto",
+        "ShortCircuitShmRequestProto"
+      };
+      TestTracing.assertSpanNamesFound(expectedSpanNames);
+    } finally {
+      dfs.close();
+      cluster.shutdown();
+    }
+  }
+}
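
As the new test demonstrates, tracing needs no code changes on the client
beyond configuration plus an enclosing span: register receivers through
SpanReceiverHost and wrap the I/O in a sampled span. The wiring on its own, a
sketch assuming a configured cluster as above (method and span names are
illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.tracing.SpanReceiverHost;
  import org.htrace.Sampler;
  import org.htrace.Trace;
  import org.htrace.TraceScope;

  class TracingSetupSketch {
    // receiverClassName: e.g. TestTracing.SetSpanReceiver.class.getName().
    static void traceSomeIo(Configuration conf, String receiverClassName,
        Runnable io) {
      conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY, receiverClassName);
      SpanReceiverHost.getInstance(conf);  // registers configured receivers

      // Sampler.ALWAYS forces this span (and its children) to be traced.
      TraceScope ts = Trace.startSpan("clientIo", Sampler.ALWAYS);
      try {
        io.run();
      } finally {
        ts.close();
      }
    }
  }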
