DRILL-1078: Added metrics to unordered receiver (+ renaming) and merging receiver.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/3e6ff2cc Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/3e6ff2cc Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/3e6ff2cc Branch: refs/heads/master Commit: 3e6ff2cc6e35c4a488f6c9d16a13256ef82a24d1 Parents: de0bd7d Author: Sudheesh Katkam <[email protected]> Authored: Thu Jun 26 09:37:58 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Jun 26 18:31:24 2014 -0700 ---------------------------------------------------------------------- .../physical/config/MergingReceiverPOP.java | 6 ++++ .../exec/physical/config/UnorderedReceiver.java | 8 ++++- .../impl/mergereceiver/MergingRecordBatch.java | 4 +++ .../UnorderedReceiverBatch.java | 37 +++++++++++-------- .../drill/exec/record/RawFragmentBatch.java | 4 +++ .../apache/drill/exec/proto/UserBitShared.java | 38 ++++++++++---------- .../exec/proto/beans/CoreOperatorType.java | 4 +-- 7 files changed, 64 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e6ff2cc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java index 31d5945..d76ec80 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.config; import java.util.List; +import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.physical.base.AbstractReceiver; import org.apache.drill.exec.physical.base.PhysicalVisitor; @@ -73,4 +74,9 @@ public class MergingReceiverPOP extends AbstractReceiver{ public int getOperatorType() { return CoreOperatorType.MERGING_RECEIVER_VALUE; } + + @JsonIgnore + public int getNumSenders() { + return senders.size(); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e6ff2cc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java index a204752..357d62d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.config; import java.util.List; +import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.drill.exec.physical.base.AbstractReceiver; import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; @@ -59,6 +60,11 @@ public class UnorderedReceiver extends AbstractReceiver{ @Override public int getOperatorType() { - return CoreOperatorType.RANDOM_RECEIVER_VALUE; + return CoreOperatorType.UNORDERED_RECEIVER_VALUE; + } + + @JsonIgnore + public int getNumSenders() { + return senders.size(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e6ff2cc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index a16d64c..ace1539 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -100,6 +100,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> private boolean done = false; public static enum Metric implements MetricDef{ + BYTES_RECEIVED, + NUM_SENDERS, NEXT_WAIT_NANOS; @Override @@ -116,6 +118,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> this.fragProviders = fragProviders; this.context = context; this.outgoingContainer = new VectorContainer(); + this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders()); } private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{ @@ -123,6 +126,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> try { RawFragmentBatch b = provider.getNext(); if(b != null){ + stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount()); stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false); } return b; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e6ff2cc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 3d9e470..7424870 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -24,6 +24,7 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OpProfileDef; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.UnorderedReceiver; @@ -50,6 +51,15 @@ public class UnorderedReceiverBatch implements RecordBatch { private OperatorStats stats; private boolean first = true; + public enum Metric implements MetricDef { + BYTES_RECEIVED, + NUM_SENDERS; + + @Override + public int metricId() { + return ordinal(); + } + } public UnorderedReceiverBatch(FragmentContext context, RawFragmentBatchProvider fragProvider, UnorderedReceiver config) throws OutOfMemoryException { this.fragProvider = fragProvider; @@ -57,7 +67,9 @@ public class UnorderedReceiverBatch implements RecordBatch { // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector, // we may need an allocator for the new offset vector. Therefore, here we pass the context's allocator to batchLoader. this.batchLoader = new RecordBatchLoader(context.getAllocator()); - this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 0), null); + + this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 1), null); + this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders()); } @Override @@ -115,15 +127,7 @@ public class UnorderedReceiverBatch implements RecordBatch { batch = fragProvider.getNext(); // skip over empty batches. we do this since these are basically control messages. - while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && !first){ - if (first) { - first = false; - RecordBatchDef rbd = batch.getHeader().getDef(); - batchLoader.load(rbd, batch.getBody()); - batch.release(); - schema = batchLoader.getSchema().clone(); - batchLoader.clear(); - } + while (batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && !first) { batch = fragProvider.getNext(); } } finally { @@ -132,7 +136,7 @@ public class UnorderedReceiverBatch implements RecordBatch { first = false; - if (batch == null){ + if (batch == null) { batchLoader.clear(); return IterOutcome.NONE; } @@ -146,15 +150,18 @@ public class UnorderedReceiverBatch implements RecordBatch { RecordBatchDef rbd = batch.getHeader().getDef(); boolean schemaChanged = batchLoader.load(rbd, batch.getBody()); -// System.out.println(rbd.getRecordCount()); + stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount()); + batch.release(); - if(schemaChanged){ + if(schemaChanged) { this.schema = batchLoader.getSchema(); + stats.batchReceived(0, rbd.getRecordCount(), true); return IterOutcome.OK_NEW_SCHEMA; - }else{ + } else { + stats.batchReceived(0, rbd.getRecordCount(), false); return IterOutcome.OK; } - }catch(SchemaChangeException | IOException ex){ + } catch(SchemaChangeException | IOException ex) { context.fail(ex); return IterOutcome.STOP; } finally { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e6ff2cc/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java index 39eae07..0cca9cd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java @@ -70,4 +70,8 @@ public class RawFragmentBatch { sender.send(DataRpcConfig.OK); } + public long getByteCount() { + return body == null ? 0 : body.readableBytes(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e6ff2cc/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java index 64cbcbf..d5b2352 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java @@ -375,9 +375,9 @@ public final class UserBitShared { */ PROJECT(10, 10), /** - * <code>RANDOM_RECEIVER = 11;</code> + * <code>UNORDERED_RECEIVER = 11;</code> */ - RANDOM_RECEIVER(11, 11), + UNORDERED_RECEIVER(11, 11), /** * <code>RANGE_SENDER = 12;</code> */ @@ -509,9 +509,9 @@ public final class UserBitShared { */ public static final int PROJECT_VALUE = 10; /** - * <code>RANDOM_RECEIVER = 11;</code> + * <code>UNORDERED_RECEIVER = 11;</code> */ - public static final int RANDOM_RECEIVER_VALUE = 11; + public static final int UNORDERED_RECEIVER_VALUE = 11; /** * <code>RANGE_SENDER = 12;</code> */ @@ -613,7 +613,7 @@ public final class UserBitShared { case 8: return MERGING_RECEIVER; case 9: return ORDERED_PARTITION_SENDER; case 10: return PROJECT; - case 11: return RANDOM_RECEIVER; + case 11: return UNORDERED_RECEIVER; case 12: return RANGE_SENDER; case 13: return SCREEN; case 14: return SELECTION_VECTOR_REMOVER; @@ -16539,25 +16539,25 @@ public final class UserBitShared { "\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003*k\n\rFragmen" + "tState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATI", "ON\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCE" + - "LLED\020\004\022\n\n\006FAILED\020\005*\221\005\n\020CoreOperatorType\022" + + "LLED\020\004\022\n\n\006FAILED\020\005*\224\005\n\020CoreOperatorType\022" + "\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001" + "\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH" + "_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITIO" + "N_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_RECEIVE" + "R\020\010\022\034\n\030ORDERED_PARTITION_SENDER\020\t\022\013\n\007PRO" + - "JECT\020\n\022\023\n\017RANDOM_RECEIVER\020\013\022\020\n\014RANGE_SEN" + - "DER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTION_VECTOR_RE" + - "MOVER\020\016\022\027\n\023STREAMING_AGGREGATE\020\017\022\016\n\nTOP_", - "N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022\t\n\005TRACE\020\022\022\t" + - "\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026PARQUET_ROW_G" + - "ROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN\020\026\022\025\n\021SYSTEM" + - "_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SCAN\020\030\022\022\n\016PARQ" + - "UET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020\032\022\017\n\013TEX" + - "T_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n\rJSON_SU" + - "B_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\023\n\017CO" + - "MPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_CONSUMER\020 B." + - "\n\033org.apache.drill.exec.protoB\rUserBitSh" + - "aredH\001" + "JECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022\020\n\014RANGE_" + + "SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTION_VECTOR" + + "_REMOVER\020\016\022\027\n\023STREAMING_AGGREGATE\020\017\022\016\n\nT", + "OP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022\t\n\005TRACE\020" + + "\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026PARQUET_RO" + + "W_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN\020\026\022\025\n\021SYS" + + "TEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SCAN\020\030\022\022\n\016P" + + "ARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020\032\022\017\n\013" + + "TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n\rJSON" + + "_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\023\n" + + "\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_CONSUMER\020" + + " B.\n\033org.apache.drill.exec.protoB\rUserBi" + + "tSharedH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e6ff2cc/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java index e8039c9..0c83e06 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java @@ -33,7 +33,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO MERGING_RECEIVER(8), ORDERED_PARTITION_SENDER(9), PROJECT(10), - RANDOM_RECEIVER(11), + UNORDERED_RECEIVER(11), RANGE_SENDER(12), SCREEN(13), SELECTION_VECTOR_REMOVER(14), @@ -83,7 +83,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO case 8: return MERGING_RECEIVER; case 9: return ORDERED_PARTITION_SENDER; case 10: return PROJECT; - case 11: return RANDOM_RECEIVER; + case 11: return UNORDERED_RECEIVER; case 12: return RANGE_SENDER; case 13: return SCREEN; case 14: return SELECTION_VECTOR_REMOVER;
