DRILL-1078: Added metrics to unordered receiver (+ renaming) and merging 
receiver.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/3e6ff2cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/3e6ff2cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/3e6ff2cc

Branch: refs/heads/master
Commit: 3e6ff2cc6e35c4a488f6c9d16a13256ef82a24d1
Parents: de0bd7d
Author: Sudheesh Katkam <[email protected]>
Authored: Thu Jun 26 09:37:58 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Thu Jun 26 18:31:24 2014 -0700

----------------------------------------------------------------------
 .../physical/config/MergingReceiverPOP.java     |  6 ++++
 .../exec/physical/config/UnorderedReceiver.java |  8 ++++-
 .../impl/mergereceiver/MergingRecordBatch.java  |  4 +++
 .../UnorderedReceiverBatch.java                 | 37 +++++++++++--------
 .../drill/exec/record/RawFragmentBatch.java     |  4 +++
 .../apache/drill/exec/proto/UserBitShared.java  | 38 ++++++++++----------
 .../exec/proto/beans/CoreOperatorType.java      |  4 +--
 7 files changed, 64 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e6ff2cc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
index 31d5945..d76ec80 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.config;
 
 import java.util.List;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.physical.base.AbstractReceiver;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
@@ -73,4 +74,9 @@ public class MergingReceiverPOP extends AbstractReceiver{
   public int getOperatorType() {
     return CoreOperatorType.MERGING_RECEIVER_VALUE;
   }
+
+  @JsonIgnore
+  public int getNumSenders() {
+    return senders.size();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e6ff2cc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
index a204752..357d62d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.config;
 
 import java.util.List;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.drill.exec.physical.base.AbstractReceiver;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -59,6 +60,11 @@ public class UnorderedReceiver extends AbstractReceiver{
 
   @Override
   public int getOperatorType() {
-    return CoreOperatorType.RANDOM_RECEIVER_VALUE;
+    return CoreOperatorType.UNORDERED_RECEIVER_VALUE;
+  }
+
+  @JsonIgnore
+  public int getNumSenders() {
+    return senders.size();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e6ff2cc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index a16d64c..ace1539 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -100,6 +100,8 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
   private boolean done = false;
 
   public static enum Metric implements MetricDef{
+    BYTES_RECEIVED,
+    NUM_SENDERS,
     NEXT_WAIT_NANOS;
 
     @Override
@@ -116,6 +118,7 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
     this.fragProviders = fragProviders;
     this.context = context;
     this.outgoingContainer = new VectorContainer();
+    this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
   }
 
   private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws 
IOException{
@@ -123,6 +126,7 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
     try {
       RawFragmentBatch b = provider.getNext();
       if(b != null){
+        stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
         stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
       }
       return b;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e6ff2cc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 3d9e470..7424870 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
@@ -50,6 +51,15 @@ public class UnorderedReceiverBatch implements RecordBatch {
   private OperatorStats stats;
   private boolean first = true;
 
+  public enum Metric implements MetricDef {
+    BYTES_RECEIVED,
+    NUM_SENDERS;
+
+    @Override
+    public int metricId() {
+      return ordinal();
+    }
+  }
 
   public UnorderedReceiverBatch(FragmentContext context, 
RawFragmentBatchProvider fragProvider, UnorderedReceiver config) throws 
OutOfMemoryException {
     this.fragProvider = fragProvider;
@@ -57,7 +67,9 @@ public class UnorderedReceiverBatch implements RecordBatch {
     // In normal case, batchLoader does not require an allocator. However, in 
case of splitAndTransfer of a value vector,
     // we may need an allocator for the new offset vector. Therefore, here we 
pass the context's allocator to batchLoader.
     this.batchLoader = new RecordBatchLoader(context.getAllocator());
-    this.stats = context.getStats().getOperatorStats(new 
OpProfileDef(config.getOperatorId(), config.getOperatorType(), 0), null);
+
+    this.stats = context.getStats().getOperatorStats(new 
OpProfileDef(config.getOperatorId(), config.getOperatorType(), 1), null);
+    this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
   }
 
   @Override
@@ -115,15 +127,7 @@ public class UnorderedReceiverBatch implements RecordBatch 
{
         batch = fragProvider.getNext();
 
         // skip over empty batches. we do this since these are basically 
control messages.
-        while(batch != null && !batch.getHeader().getIsOutOfMemory() && 
batch.getHeader().getDef().getRecordCount() == 0 && !first){
-          if (first) {
-            first = false;
-            RecordBatchDef rbd = batch.getHeader().getDef();
-            batchLoader.load(rbd, batch.getBody());
-            batch.release();
-            schema = batchLoader.getSchema().clone();
-            batchLoader.clear();
-          }
+        while (batch != null && !batch.getHeader().getIsOutOfMemory() && 
batch.getHeader().getDef().getRecordCount() == 0 && !first) {
           batch = fragProvider.getNext();
         }
       } finally {
@@ -132,7 +136,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
 
       first = false;
 
-      if (batch == null){
+      if (batch == null) {
         batchLoader.clear();
         return IterOutcome.NONE;
       }
@@ -146,15 +150,18 @@ public class UnorderedReceiverBatch implements 
RecordBatch {
 
       RecordBatchDef rbd = batch.getHeader().getDef();
       boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
-//      System.out.println(rbd.getRecordCount());
+      stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount());
+
       batch.release();
-      if(schemaChanged){
+      if(schemaChanged) {
         this.schema = batchLoader.getSchema();
+        stats.batchReceived(0, rbd.getRecordCount(), true);
         return IterOutcome.OK_NEW_SCHEMA;
-      }else{
+      } else {
+        stats.batchReceived(0, rbd.getRecordCount(), false);
         return IterOutcome.OK;
       }
-    }catch(SchemaChangeException | IOException ex){
+    } catch(SchemaChangeException | IOException ex) {
       context.fail(ex);
       return IterOutcome.STOP;
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e6ff2cc/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
index 39eae07..0cca9cd 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -70,4 +70,8 @@ public class RawFragmentBatch {
     sender.send(DataRpcConfig.OK);
   }
 
+  public long getByteCount() {
+    return body == null ? 0 : body.readableBytes();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e6ff2cc/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git 
a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java 
b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 64cbcbf..d5b2352 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -375,9 +375,9 @@ public final class UserBitShared {
      */
     PROJECT(10, 10),
     /**
-     * <code>RANDOM_RECEIVER = 11;</code>
+     * <code>UNORDERED_RECEIVER = 11;</code>
      */
-    RANDOM_RECEIVER(11, 11),
+    UNORDERED_RECEIVER(11, 11),
     /**
      * <code>RANGE_SENDER = 12;</code>
      */
@@ -509,9 +509,9 @@ public final class UserBitShared {
      */
     public static final int PROJECT_VALUE = 10;
     /**
-     * <code>RANDOM_RECEIVER = 11;</code>
+     * <code>UNORDERED_RECEIVER = 11;</code>
      */
-    public static final int RANDOM_RECEIVER_VALUE = 11;
+    public static final int UNORDERED_RECEIVER_VALUE = 11;
     /**
      * <code>RANGE_SENDER = 12;</code>
      */
@@ -613,7 +613,7 @@ public final class UserBitShared {
         case 8: return MERGING_RECEIVER;
         case 9: return ORDERED_PARTITION_SENDER;
         case 10: return PROJECT;
-        case 11: return RANDOM_RECEIVER;
+        case 11: return UNORDERED_RECEIVER;
         case 12: return RANGE_SENDER;
         case 13: return SCREEN;
         case 14: return SELECTION_VECTOR_REMOVER;
@@ -16539,25 +16539,25 @@ public final class UserBitShared {
       
"\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003*k\n\rFragmen"
 +
       "tState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATI",
       
"ON\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCE"
 +
-      
"LLED\020\004\022\n\n\006FAILED\020\005*\221\005\n\020CoreOperatorType\022" +
+      
"LLED\020\004\022\n\n\006FAILED\020\005*\224\005\n\020CoreOperatorType\022" +
       "\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001" +
       
"\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH" +
       "_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITIO" 
+
       "N_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_RECEIVE" 
+
       "R\020\010\022\034\n\030ORDERED_PARTITION_SENDER\020\t\022\013\n\007PRO" 
+
-      "JECT\020\n\022\023\n\017RANDOM_RECEIVER\020\013\022\020\n\014RANGE_SEN" 
+
-      "DER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTION_VECTOR_RE" +
-      "MOVER\020\016\022\027\n\023STREAMING_AGGREGATE\020\017\022\016\n\nTOP_",
-      
"N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022\t\n\005TRACE\020\022\022\t"
 +
-      
"\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026PARQUET_ROW_G" +
-      "ROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN\020\026\022\025\n\021SYSTEM" 
+
-      "_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SCAN\020\030\022\022\n\016PARQ" 
+
-      
"UET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020\032\022\017\n\013TEX" +
-      "T_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n\rJSON_SU" +
-      
"B_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\023\n\017CO" +
-      "MPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_CONSUMER\020 B." +
-      "\n\033org.apache.drill.exec.protoB\rUserBitSh" +
-      "aredH\001"
+      "JECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022\020\n\014RANGE_" 
+
+      "SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTION_VECTOR" +
+      "_REMOVER\020\016\022\027\n\023STREAMING_AGGREGATE\020\017\022\016\n\nT",
+      
"OP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022\t\n\005TRACE\020" +
+      
"\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026PARQUET_RO"
 +
+      "W_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN\020\026\022\025\n\021SYS" 
+
+      "TEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SCAN\020\030\022\022\n\016P" 
+
+      
"ARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020\032\022\017\n\013" +
+      "TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n\rJSON" +
+      "_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\023\n" +
+      "\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_CONSUMER\020" +
+      " B.\n\033org.apache.drill.exec.protoB\rUserBi" +
+      "tSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e6ff2cc/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git 
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
 
b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index e8039c9..0c83e06 100644
--- 
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ 
b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -33,7 +33,7 @@ public enum CoreOperatorType implements 
com.dyuproject.protostuff.EnumLite<CoreO
     MERGING_RECEIVER(8),
     ORDERED_PARTITION_SENDER(9),
     PROJECT(10),
-    RANDOM_RECEIVER(11),
+    UNORDERED_RECEIVER(11),
     RANGE_SENDER(12),
     SCREEN(13),
     SELECTION_VECTOR_REMOVER(14),
@@ -83,7 +83,7 @@ public enum CoreOperatorType implements 
com.dyuproject.protostuff.EnumLite<CoreO
             case 8: return MERGING_RECEIVER;
             case 9: return ORDERED_PARTITION_SENDER;
             case 10: return PROJECT;
-            case 11: return RANDOM_RECEIVER;
+            case 11: return UNORDERED_RECEIVER;
             case 12: return RANGE_SENDER;
             case 13: return SCREEN;
             case 14: return SELECTION_VECTOR_REMOVER;

Reply via email to