Add BaseRootExec, which enables collection of stats for senders.
Add SenderStats, which collects stats specific to senders.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/9b22d2c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/9b22d2c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/9b22d2c3

Branch: refs/heads/master
Commit: 9b22d2c37d638451149270a19fe1b2e63d8ea670
Parents: 3e98ffc
Author: Mehant Baid <[email protected]>
Authored: Fri May 23 10:29:48 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Sun Jun 8 19:13:06 2014 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/ops/FragmentStats.java    |  4 ++
 .../org/apache/drill/exec/ops/OpProfileDef.java |  5 ++
 .../apache/drill/exec/ops/OperatorContext.java  | 13 ++--
 .../apache/drill/exec/ops/OperatorStats.java    | 32 ++++++---
 .../org/apache/drill/exec/ops/SenderStats.java  | 73 ++++++++++++++++++++
 .../drill/exec/physical/impl/BaseRootExec.java  | 21 +++---
 .../PartitionSenderRootExec.java                | 12 +++-
 .../partitionsender/PartitionSenderStats.java   |  5 +-
 .../partitionsender/PartitionStatsBatch.java    | 24 +++++++
 .../impl/partitionsender/Partitioner.java       |  3 +
 .../partitionsender/PartitionerTemplate.java    | 15 +++-
 11 files changed, 175 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
index d667794..19ac0aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
@@ -54,4 +54,8 @@ public class FragmentStats {
     return stats;
   }
 
+  public void addOperatorStats(OperatorStats stats) {
+    operators.add(stats);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java
index fb68e4a..b5c8d86 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java
@@ -23,6 +23,11 @@ public class OpProfileDef {
   public int operatorType;
   public int incomingCount;
 
+  public OpProfileDef(int operatorId, int operatorType, int incomingCount) {
+    this.operatorId = operatorId;
+    this.operatorType = operatorType;
+    this.incomingCount = incomingCount;
+  }
   public int getOperatorId(){
     return operatorId;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 116b616..d62ea2f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -36,14 +36,17 @@ public class OperatorContext implements Closeable {
     this.allocator = 
context.getNewChildAllocator(popConfig.getInitialAllocation(), 
popConfig.getMaxAllocation());
     this.popConfig = popConfig;
 
-    OpProfileDef def = new OpProfileDef();
-    def.operatorId = popConfig.getOperatorId();
-    def.incomingCount = getChildCount(popConfig);
-    def.operatorType = popConfig.getOperatorType();
+    OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), 
popConfig.getOperatorType(), getChildCount(popConfig));
     this.stats = context.getStats().getOperatorStats(def);
   }
 
-  private static int getChildCount(PhysicalOperator popConfig){
+  public OperatorContext(PhysicalOperator popConfig, FragmentContext context, 
OperatorStats stats) throws OutOfMemoryException {
+    this.allocator = 
context.getNewChildAllocator(popConfig.getInitialAllocation(), 
popConfig.getMaxAllocation());
+    this.popConfig = popConfig;
+    this.stats     = stats;
+  }
+
+  public static int getChildCount(PhysicalOperator popConfig){
     Iterator<PhysicalOperator> iter = popConfig.iterator();
     int i = 0;
     while(iter.hasNext()){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index cde1876..4ac8f74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -28,8 +28,8 @@ import com.carrotsearch.hppc.IntLongOpenHashMap;
 public class OperatorStats {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(OperatorStats.class);
 
-  private final int operatorId;
-  private final int operatorType;
+  protected final int operatorId;
+  protected final int operatorType;
 
   private IntLongOpenHashMap longMetrics = new IntLongOpenHashMap();
   private IntDoubleOpenHashMap doubleMetrics = new IntDoubleOpenHashMap();
@@ -42,8 +42,8 @@ public class OperatorStats {
   private boolean inProcessing = false;
   private boolean inSetup = false;
 
-  private long processingNanos;
-  private long setupNanos;
+  protected long processingNanos;
+  protected long setupNanos;
 
   private long processingMark;
   private long setupMark;
@@ -105,23 +105,37 @@ public class OperatorStats {
         .setSetupNanos(setupNanos) //
         .setProcessNanos(processingNanos);
 
+    addAllMetrics(b);
+
+    return b.build();
+  }
+
+  public void addAllMetrics(OperatorProfile.Builder builder) {
+    addStreamProfile(builder);
+    addLongMetrics(builder);
+    addDoubleMetrics(builder);
+  }
+
+  public void addStreamProfile(OperatorProfile.Builder builder) {
     for(int i = 0; i < recordsReceivedByInput.length; i++){
-      
b.addInputProfile(StreamProfile.newBuilder().setBatches(batchesReceivedByInput[i]).setRecords(recordsReceivedByInput[i]).setSchemas(this.schemaCountByInput[i]));
+      
builder.addInputProfile(StreamProfile.newBuilder().setBatches(batchesReceivedByInput[i]).setRecords(recordsReceivedByInput[i]).setSchemas(this.schemaCountByInput[i]));
     }
+  }
 
+  public void addLongMetrics(OperatorProfile.Builder builder) {
     for(int i =0; i < longMetrics.allocated.length; i++){
       if(longMetrics.allocated[i]){
-        
b.addMetric(MetricValue.newBuilder().setMetricId(longMetrics.keys[i]).setLongValue(longMetrics.values[i]));
+        
builder.addMetric(MetricValue.newBuilder().setMetricId(longMetrics.keys[i]).setLongValue(longMetrics.values[i]));
       }
     }
+  }
 
+  public void addDoubleMetrics(OperatorProfile.Builder builder) {
     for(int i =0; i < doubleMetrics.allocated.length; i++){
       if(doubleMetrics.allocated[i]){
-        
b.addMetric(MetricValue.newBuilder().setMetricId(doubleMetrics.keys[i]).setDoubleValue(doubleMetrics.values[i]));
+        
builder.addMetric(MetricValue.newBuilder().setMetricId(doubleMetrics.keys[i]).setDoubleValue(doubleMetrics.values[i]));
       }
     }
-
-    return b.build();
   }
 
   public void addLongStat(MetricDef metric, long value){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java
new file mode 100644
index 0000000..c766632
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch;
+import 
org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderStats;
+import org.apache.drill.exec.proto.UserBitShared;
+
+import java.util.List;
+
+public class SenderStats extends OperatorStats {
+
+  long minReceiverRecordCount = 0;
+  long maxReceiverRecordCount = 0;
+  int nSenders = 0;
+
+  public SenderStats(PhysicalOperator operator) {
+    super(new OpProfileDef(operator.getOperatorId(), 
operator.getOperatorType(), OperatorContext.getChildCount(operator)));
+  }
+
+  public void updatePartitionStats(List<? extends PartitionStatsBatch> 
outgoing) {
+
+    for (PartitionStatsBatch o : outgoing) {
+      long totalRecords = o.getTotalRecords();
+
+      minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords);
+      maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords);
+    }
+    nSenders = outgoing.size();
+  }
+
+  @Override
+  public UserBitShared.OperatorProfile getProfile() {
+    final UserBitShared.OperatorProfile.Builder b = 
UserBitShared.OperatorProfile //
+        .newBuilder() //
+        .setOperatorType(operatorType) //
+        .setOperatorId(operatorId) //
+        .setSetupNanos(setupNanos) //
+        .setProcessNanos(processingNanos);
+
+    addAllMetrics(b);
+
+    return b.build();
+
+  }
+
+  public void addAllMetrics(UserBitShared.OperatorProfile.Builder b) {
+    super.addAllMetrics(b);
+
+    
b.addMetric(UserBitShared.MetricValue.newBuilder().setLongValue(minReceiverRecordCount).
+        setMetricId(PartitionSenderStats.MIN_RECORDS.metricId()));
+    
b.addMetric(UserBitShared.MetricValue.newBuilder().setLongValue(maxReceiverRecordCount).
+        setMetricId(PartitionSenderStats.MAX_RECORDS.metricId()));
+    b.addMetric(UserBitShared.MetricValue.newBuilder().setLongValue(nSenders)
+        .setMetricId(PartitionSenderStats.N_SENDERS.metricId()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 0db8c07..256c106 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -17,25 +17,16 @@
  */
 package org.apache.drill.exec.physical.impl;
 
-import org.apache.drill.exec.memory.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.record.RecordBatch;
 
-public abstract class BaseRootExec<T extends PhysicalOperator> implements 
RootExec {
+public abstract class BaseRootExec implements RootExec {
 
-  protected final OperatorStats stats;
-  protected final OperatorContext oContext;
-
-  public BaseRootExec(FragmentContext context, T operator) throws 
OutOfMemoryException {
-    oContext = new OperatorContext(operator, context);
-    stats = oContext.getStats();
-  }
+  protected OperatorStats stats = null;
 
   @Override
   public final boolean next() {
+    // Stats should have been initialized
+    assert stats != null;
     try {
       stats.startProcessing();
       return innerNext();
@@ -44,5 +35,9 @@ public abstract class BaseRootExec<T extends 
PhysicalOperator> implements RootEx
     }
   }
 
+  public void setStats(OperatorStats stats) {
+    this.stats = stats;
+  }
+
   public abstract boolean innerNext();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 5476a50..9be45d2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -32,11 +32,13 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.SenderStats;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
 import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
+import 
org.apache.drill.exec.physical.impl.partitionsender.PartitionerTemplate.OutgoingRecordBatch;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.*;
@@ -48,7 +50,6 @@ import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JType;
 import org.apache.drill.exec.vector.CopyUtil;
 
-
 public class PartitionSenderRootExec extends BaseRootExec {
 
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
@@ -56,24 +57,28 @@ public class PartitionSenderRootExec extends BaseRootExec {
   private HashPartitionSender operator;
   private Partitioner partitioner;
   private FragmentContext context;
+  private OperatorContext oContext;
   private boolean ok = true;
   private final SendingAccountor sendCount = new SendingAccountor();
   private final int outGoingBatchCount;
   private final HashPartitionSender popConfig;
   private final StatusHandler statusHandler;
-
+  private final SenderStats stats;
 
   public PartitionSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
                                  HashPartitionSender operator) throws 
OutOfMemoryException {
 
-    super(context, operator);
     this.incoming = incoming;
     this.operator = operator;
     this.context = context;
     this.outGoingBatchCount = operator.getDestinations().size();
     this.popConfig = operator;
     this.statusHandler = new StatusHandler(sendCount, context);
+    this.stats = new SenderStats(operator);
+    context.getStats().addOperatorStats(this.stats);
+    setStats(stats);
+    this.oContext = new OperatorContext(operator, context, stats);
   }
 
   @Override
@@ -140,6 +145,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
           context.fail(e);
           return false;
         }
+        stats.updatePartitionStats(partitioner.getOutgoingBatches());
         for (VectorWrapper v : incoming) {
           v.clear();
         }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
index 4790596..de5967f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
@@ -22,7 +22,10 @@ import org.apache.drill.exec.ops.MetricDef;
 public enum PartitionSenderStats implements MetricDef {
 
   BATCHES_SENT,
-  RECORDS_SENT;
+  RECORDS_SENT,
+  MIN_RECORDS,
+  MAX_RECORDS,
+  N_SENDERS;
 
   @Override
   public int metricId() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java
new file mode 100644
index 0000000..85ccffb
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.partitionsender;
+
+
+public interface PartitionStatsBatch {
+
+  public long getTotalRecords();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 6958403..53528ba 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -25,8 +25,10 @@ import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch;
 
 import java.io.IOException;
+import java.util.List;
 
 public interface Partitioner {
 
@@ -42,6 +44,7 @@ public interface Partitioner {
   public abstract void flushOutgoingBatches(boolean isLastBatch, boolean 
schemaChanged) throws IOException;
   public abstract void initialize();
   public abstract void clear();
+  public abstract List<? extends PartitionStatsBatch> getOutgoingBatches();
 
   public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new 
TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 510327a..1e6e71b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -66,6 +66,11 @@ public abstract class PartitionerTemplate implements 
Partitioner {
   public PartitionerTemplate() throws SchemaChangeException {
   }
 
+  @Override
+  public List<? extends PartitionStatsBatch> getOutgoingBatches() {
+    return outgoingBatches;
+  }
+
   public final void setup(FragmentContext context,
                           RecordBatch incoming,
                           HashPartitionSender popConfig,
@@ -189,7 +194,7 @@ public abstract class PartitionerTemplate implements 
Partitioner {
   public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("incoming") RecordBatch incoming, @Named("outgoing") 
OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
   public abstract int doEval(@Named("inIndex") int inIndex);
 
-  public class OutgoingRecordBatch implements VectorAccessible {
+  public class OutgoingRecordBatch implements PartitionStatsBatch, 
VectorAccessible {
 
     private final DataTunnel tunnel;
     private final HashPartitionSender operator;
@@ -202,6 +207,7 @@ public abstract class PartitionerTemplate implements 
Partitioner {
     private boolean isLast = false;
     private BatchSchema outSchema;
     private int recordCount;
+    private int totalRecords;
     private OperatorStats stats;
     private static final int DEFAULT_RECORD_BATCH_SIZE = 20000;
     private static final int DEFAULT_VARIABLE_WIDTH_SIZE = 200;
@@ -224,6 +230,7 @@ public abstract class PartitionerTemplate implements 
Partitioner {
     protected boolean copy(int inIndex) throws IOException {
       if (doEval(inIndex, recordCount)) {
         recordCount++;
+        totalRecords++;
         if (recordCount == DEFAULT_RECORD_BATCH_SIZE) {
           flush();
         }
@@ -330,6 +337,12 @@ public abstract class PartitionerTemplate implements 
Partitioner {
       return recordCount;
     }
 
+
+    @Override
+    public long getTotalRecords() {
+      return totalRecords;
+    }
+
     @Override
     public TypedFieldId getValueVectorId(SchemaPath path) {
       return vectorContainer.getValueVectorId(path);

Reply via email to