This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new cbcb59d  DRILL-6478: enhance debug logs for batch sizing
cbcb59d is described below

commit cbcb59df5ccc655e374ac6244c572088153aedc6
Author: Padma Penumarthy <ppenuma...@yahoo.com>
AuthorDate: Thu Jun 7 14:04:47 2018 -0700

    DRILL-6478: enhance debug logs for batch sizing
    
    closes #1310
---
 .../physical/impl/flatten/FlattenRecordBatch.java  | 23 +++++------
 .../exec/physical/impl/join/HashJoinBatch.java     | 37 +++++++++---------
 .../exec/physical/impl/join/MergeJoinBatch.java    | 44 +++++++++++-----------
 .../physical/impl/union/UnionAllRecordBatch.java   | 25 ++++++++++++
 4 files changed, 77 insertions(+), 52 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index be44c94..2f92d52 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -157,9 +157,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       // i.e. all rows fit within memory budget.
       setOutputRowCount(Math.min(columnSize.getElementCount(), 
getOutputRowCount()));
 
-      if (logger.isDebugEnabled()) {
-        logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer());
-      }
+      logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
 
       updateIncomingStats();
     }
@@ -171,6 +169,8 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     // get the output batch size from config.
     int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize);
+
+    logger.debug("BATCH_STATS, configured output batch size: {}", 
configuredBatchSize);
   }
 
   @Override
@@ -263,7 +263,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     flattenMemoryManager.updateOutgoingStats(outputRecords);
 
     if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
     }
 
     // Get the final outcome based on hasRemainder since that will determine 
if all the incoming records were
@@ -516,14 +516,15 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, 
flattenMemoryManager.getAvgOutputRowWidth());
     stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, 
flattenMemoryManager.getTotalOutputRecords());
 
-    logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, 
 avg row bytes : {}, record count : {}",
-      flattenMemoryManager.getNumIncomingBatches(), 
flattenMemoryManager.getAvgInputBatchSize(),
-      flattenMemoryManager.getAvgInputRowWidth(), 
flattenMemoryManager.getTotalInputRecords());
-
-    logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, 
 avg row bytes : {}, record count : {}",
-      flattenMemoryManager.getNumOutgoingBatches(), 
flattenMemoryManager.getAvgOutputBatchSize(),
-      flattenMemoryManager.getAvgOutputRowWidth(), 
flattenMemoryManager.getTotalOutputRecords());
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : 
{},  avg row bytes : {}, record count : {}",
+        flattenMemoryManager.getNumIncomingBatches(), 
flattenMemoryManager.getAvgInputBatchSize(),
+        flattenMemoryManager.getAvgInputRowWidth(), 
flattenMemoryManager.getTotalInputRecords());
 
+      logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : 
{},  avg row bytes : {}, record count : {}",
+        flattenMemoryManager.getNumOutgoingBatches(), 
flattenMemoryManager.getAvgOutputBatchSize(),
+        flattenMemoryManager.getAvgOutputRowWidth(), 
flattenMemoryManager.getTotalOutputRecords());
+    }
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 4267077..428a47e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -242,10 +242,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     batchMemoryManager.update(LEFT_INDEX, 0);
     batchMemoryManager.update(RIGHT_INDEX, 0, true);
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming left:\n {}", 
batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
-      logger.debug("BATCH_STATS, incoming right:\n {}", 
batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
-    }
+    logger.debug("BATCH_STATS, incoming left: {}", 
batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+    logger.debug("BATCH_STATS, incoming right: {}", 
batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
 
     if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) 
{
       state = BatchState.STOP;
@@ -358,7 +356,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
         batchMemoryManager.updateOutgoingStats(outputRecords);
         if (logger.isDebugEnabled()) {
-          logger.debug("BATCH_STATS, outgoing:\n {}", new 
RecordBatchSizer(this));
+          logger.debug("BATCH_STATS, outgoing: {}", new 
RecordBatchSizer(this));
         }
 
         /* We are here because of one the following
@@ -890,6 +888,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     // get the output batch size from config.
     int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, 
right);
+    logger.debug("BATCH_STATS, configured output batch size: {}", 
configuredBatchSize);
   }
 
   /**
@@ -1004,21 +1003,19 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
     updateMetrics();
 
-    logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
-      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
-    logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
-      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
-    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes 
: {},  avg row bytes : {}, record count : {}",
-      batchMemoryManager.getNumOutgoingBatches(), 
batchMemoryManager.getAvgOutputBatchSize(),
-      batchMemoryManager.getAvgOutputRowWidth(), 
batchMemoryManager.getTotalOutputRecords());
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, 
avg bytes : {},  avg row bytes : {}, record count : {}",
+        
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), 
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+        
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), 
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, 
avg bytes : {},  avg row bytes : {}, record count : {}",
+        
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), 
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+        
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), 
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+        batchMemoryManager.getNumOutgoingBatches(), 
batchMemoryManager.getAvgOutputBatchSize(),
+        batchMemoryManager.getAvgOutputRowWidth(), 
batchMemoryManager.getTotalOutputRecords());
+    }
 
     this.cleanup();
     super.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index a5c2ae7..62967a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -122,9 +122,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
     @Override
     public void update(int inputIndex) {
       status.setTargetOutputRowCount(super.update(inputIndex, 
status.getOutPosition()));
-      if (logger.isDebugEnabled()) {
-        logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? 
"left" : "right", getRecordBatchSizer(inputIndex));
-      }
+      logger.debug("BATCH_STATS, incoming {}: {}", inputIndex == 0 ? "left" : 
"right", getRecordBatchSizer(inputIndex));
     }
   }
 
@@ -132,8 +130,10 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
     super(popConfig, context, true, left, right);
 
     // Instantiate the batch memory manager
-    final int outputBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-    batchMemoryManager = new MergeJoinMemoryManager(outputBatchSize, left, 
right);
+    final int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    batchMemoryManager = new MergeJoinMemoryManager(configuredBatchSize, left, 
right);
+
+    logger.debug("BATCH_STATS, configured output batch size: {}", 
configuredBatchSize);
 
     if (popConfig.getConditions().size() == 0) {
       throw new UnsupportedOperationException("Merge Join currently does not 
support cartesian join.  This join operator was configured with 0 conditions");
@@ -271,7 +271,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
     }
 
     if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
     }
 
     batchMemoryManager.updateOutgoingStats(getRecordCount());
@@ -281,21 +281,23 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
   public void close() {
     updateBatchMemoryManagerStats();
 
-    logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
-      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
-    logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
-      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
-    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes 
: {},  avg row bytes : {}, record count : {}",
-      batchMemoryManager.getNumOutgoingBatches(), 
batchMemoryManager.getAvgOutputBatchSize(),
-      batchMemoryManager.getAvgOutputRowWidth(), 
batchMemoryManager.getTotalOutputRecords());
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, 
avg bytes : {},  avg row bytes : {}, record count : {}",
+        
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+        
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+        
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+        
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, 
avg bytes : {},  avg row bytes : {}, record count : {}",
+        
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+        
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+        
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+        
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+        batchMemoryManager.getNumOutgoingBatches(), 
batchMemoryManager.getAvgOutputBatchSize(),
+        batchMemoryManager.getAvgOutputRowWidth(), 
batchMemoryManager.getTotalOutputRecords());
+    }
 
     super.close();
     leftIterator.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index f4c1900..36b9c9b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -39,9 +39,11 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.JoinBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
@@ -74,6 +76,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
     // get the output batch size from config.
     int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     batchMemoryManager = new RecordBatchMemoryManager(numInputs, 
configuredBatchSize);
+    logger.debug("BATCH_STATS, configured output batch size: {}", 
configuredBatchSize);
   }
 
   @Override
@@ -168,6 +171,10 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
     batchStatus.recordsProcessed += recordCount;
     batchMemoryManager.updateOutgoingStats(recordCount);
 
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
+    }
+
     if (callBack.getSchemaChangedAndReset()) {
       return IterOutcome.OK_NEW_SCHEMA;
     } else {
@@ -361,6 +368,8 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
         if (topStatus.prefetched) {
           topStatus.prefetched = false;
           batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
+          logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 
0 ? "left" : "right",
+            batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
           return Pair.of(topStatus.outcome, topStatus);
         } else {
 
@@ -378,6 +387,8 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
             topStatus.recordsProcessed = 0;
             topStatus.totalRecordsToProcess = topStatus.batch.getRecordCount();
             batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
+            logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex 
== 0 ? "left" : "right",
+              batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
             return Pair.of(outcome, topStatus);
           case OUT_OF_MEMORY:
           case STOP:
@@ -409,6 +420,20 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
   public void close() {
     super.close();
     updateBatchMemoryManagerStats();
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, 
avg bytes : {},  avg row bytes : {}, record count : {}",
+        
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), 
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+        
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), 
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, 
avg bytes : {},  avg row bytes : {}, record count : {}",
+        
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), 
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+        
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), 
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+        batchMemoryManager.getNumOutgoingBatches(), 
batchMemoryManager.getAvgOutputBatchSize(),
+        batchMemoryManager.getAvgOutputRowWidth(), 
batchMemoryManager.getTotalOutputRecords());
+    }
   }
 
 }

-- 
To stop receiving notification emails like this one, please contact
timothyfar...@apache.org.

Reply via email to