This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

The following commit(s) were added to refs/heads/master by this push: new cbcb59d DRILL-6478: enhance debug logs for batch sizing cbcb59d is described below commit cbcb59df5ccc655e374ac6244c572088153aedc6 Author: Padma Penumarthy <ppenuma...@yahoo.com> AuthorDate: Thu Jun 7 14:04:47 2018 -0700 DRILL-6478: enhance debug logs for batch sizing closes #1310 --- .../physical/impl/flatten/FlattenRecordBatch.java | 23 +++++------ .../exec/physical/impl/join/HashJoinBatch.java | 37 +++++++++--------- .../exec/physical/impl/join/MergeJoinBatch.java | 44 +++++++++++----------- .../physical/impl/union/UnionAllRecordBatch.java | 25 ++++++++++++ 4 files changed, 77 insertions(+), 52 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index be44c94..2f92d52 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -157,9 +157,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { // i.e. all rows fit within memory budget. setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount())); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer()); - } + logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer()); updateIncomingStats(); } @@ -171,6 +169,8 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { // get the output batch size from config. 
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize); + + logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); } @Override @@ -263,7 +263,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { flattenMemoryManager.updateOutgoingStats(outputRecords); if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); + logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this)); } // Get the final outcome based on hasRemainder since that will determine if all the incoming records were @@ -516,14 +516,15 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth()); stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, flattenMemoryManager.getTotalOutputRecords()); - logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(), - flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords()); - - logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(), - flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords()); + if (logger.isDebugEnabled()) { + logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(), + flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords()); + logger.debug("BATCH_STATS, outgoing 
aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(), + flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords()); + } } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 4267077..428a47e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -242,10 +242,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { batchMemoryManager.update(LEFT_INDEX, 0); batchMemoryManager.update(RIGHT_INDEX, 0, true); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, incoming left:\n {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX)); - logger.debug("BATCH_STATS, incoming right:\n {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX)); - } + logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX)); + logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX)); if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) { state = BatchState.STOP; @@ -358,7 +356,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { batchMemoryManager.updateOutgoingStats(outputRecords); if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); + logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this)); } /* We are here because of one the following @@ -890,6 +888,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // get the output batch size from config. 
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right); + logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); } /** @@ -1004,21 +1003,19 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { updateMetrics(); - logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); - - logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); - - logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), - batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); + if (logger.isDebugEnabled()) { + logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), 
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); + + logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); + + logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), + batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); + } this.cleanup(); super.close(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index a5c2ae7..62967a9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -122,9 +122,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { @Override public void update(int inputIndex) { status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition())); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? "left" : "right", getRecordBatchSizer(inputIndex)); - } + logger.debug("BATCH_STATS, incoming {}: {}", inputIndex == 0 ? 
"left" : "right", getRecordBatchSizer(inputIndex)); } } @@ -132,8 +130,10 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { super(popConfig, context, true, left, right); // Instantiate the batch memory manager - final int outputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); - batchMemoryManager = new MergeJoinMemoryManager(outputBatchSize, left, right); + final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + batchMemoryManager = new MergeJoinMemoryManager(configuredBatchSize, left, right); + + logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); if (popConfig.getConditions().size() == 0) { throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions"); @@ -271,7 +271,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { } if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); + logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this)); } batchMemoryManager.updateOutgoingStats(getRecordCount()); @@ -281,21 +281,23 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { public void close() { updateBatchMemoryManagerStats(); - logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); - - logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - 
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); - - logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), - batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); + if (logger.isDebugEnabled()) { + logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); + + logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); + + logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), + batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); + } super.close(); leftIterator.close(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index f4c1900..36b9c9b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -39,9 +39,11 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.record.AbstractBinaryRecordBatch; import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.JoinBatchMemoryManager; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatchMemoryManager; +import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorAccessibleUtilities; @@ -74,6 +76,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { // get the output batch size from config. 
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); batchMemoryManager = new RecordBatchMemoryManager(numInputs, configuredBatchSize); + logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); } @Override @@ -168,6 +171,10 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { batchStatus.recordsProcessed += recordCount; batchMemoryManager.updateOutgoingStats(recordCount); + if (logger.isDebugEnabled()) { + logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this)); + } + if (callBack.getSchemaChangedAndReset()) { return IterOutcome.OK_NEW_SCHEMA; } else { @@ -361,6 +368,8 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { if (topStatus.prefetched) { topStatus.prefetched = false; batchMemoryManager.update(topStatus.batch, topStatus.inputIndex); + logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right", + batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex)); return Pair.of(topStatus.outcome, topStatus); } else { @@ -378,6 +387,8 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { topStatus.recordsProcessed = 0; topStatus.totalRecordsToProcess = topStatus.batch.getRecordCount(); batchMemoryManager.update(topStatus.batch, topStatus.inputIndex); + logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? 
"left" : "right", + batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex)); return Pair.of(outcome, topStatus); case OUT_OF_MEMORY: case STOP: @@ -409,6 +420,20 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { public void close() { super.close(); updateBatchMemoryManagerStats(); + + if (logger.isDebugEnabled()) { + logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); + + logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); + + logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), + batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); + } } } -- To stop receiving notification emails like this one, please contact timothyfar...@apache.org.