DRILL-6296: Add operator metrics for batch sizing for merge join (closes #1181)
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/da241134 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/da241134 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/da241134 Branch: refs/heads/master Commit: da241134fb88464139437b05b1feaafbb3014bb0 Parents: 77f5e90 Author: Padma Penumarthy <ppenuma...@yahoo.com> Authored: Thu Mar 29 19:05:07 2018 -0700 Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com> Committed: Fri Apr 6 12:07:30 2018 +0300 ---------------------------------------------------------------------- .../drill/exec/ops/OperatorMetricRegistry.java | 2 + .../impl/flatten/FlattenRecordBatch.java | 28 ++- .../exec/physical/impl/join/MergeJoinBatch.java | 90 +++++++- .../AbstractRecordBatchMemoryManager.java | 134 ----------- .../exec/record/RecordBatchMemoryManager.java | 228 +++++++++++++++++++ .../drill/exec/record/RecordIterator.java | 6 +- 6 files changed, 330 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/da241134/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java index b029154..0b9aeb6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java @@ -23,6 +23,7 @@ import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate; import org.apache.drill.exec.physical.impl.broadcastsender.BroadcastSenderRootExec; import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch; import org.apache.drill.exec.physical.impl.join.HashJoinBatch; +import 
org.apache.drill.exec.physical.impl.join.MergeJoinBatch; import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec; import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch; @@ -51,6 +52,7 @@ public class OperatorMetricRegistry { register(CoreOperatorType.EXTERNAL_SORT_VALUE, ExternalSortBatch.Metric.class); register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE, ParquetRecordReader.Metric.class); register(CoreOperatorType.FLATTEN_VALUE, FlattenRecordBatch.Metric.class); + register(CoreOperatorType.MERGE_JOIN_VALUE, MergeJoinBatch.Metric.class); } private static void register(final int operatorType, final Class<? extends MetricDef> metricDef) { http://git-wip-us.apache.org/repos/asf/drill/blob/da241134/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index a1f783f..aea415b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -45,7 +45,7 @@ import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.AbstractRecordBatchMemoryManager; +import org.apache.drill.exec.record.RecordBatchMemoryManager; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import 
org.apache.drill.exec.record.TransferPair; @@ -104,11 +104,11 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { INPUT_BATCH_COUNT, AVG_INPUT_BATCH_BYTES, AVG_INPUT_ROW_BYTES, - TOTAL_INPUT_RECORDS, + INPUT_RECORD_COUNT, OUTPUT_BATCH_COUNT, AVG_OUTPUT_BATCH_BYTES, AVG_OUTPUT_ROW_BYTES, - TOTAL_OUTPUT_RECORDS; + OUTPUT_RECORD_COUNT; @Override public int metricId() { @@ -116,7 +116,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { } } - private class FlattenMemoryManager extends AbstractRecordBatchMemoryManager { + private class FlattenMemoryManager extends RecordBatchMemoryManager { @Override public void update() { @@ -152,9 +152,10 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { // i.e. all rows fit within memory budget. setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount())); - logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}," + - "avgOutgoingRowWidth : {}, outputRowCount : {}", getRecordBatchSizer(), outputBatchSize, - avgOutgoingRowWidth, getOutputRowCount()); + logger.debug("incoming batch size : {}", getRecordBatchSizer()); + + logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output rowCount : {}", + outputBatchSize, avgOutgoingRowWidth, getOutputRowCount()); updateIncomingStats(); } @@ -496,11 +497,20 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { stats.setLongStat(Metric.INPUT_BATCH_COUNT, flattenMemoryManager.getNumIncomingBatches()); stats.setLongStat(Metric.AVG_INPUT_BATCH_BYTES, flattenMemoryManager.getAvgInputBatchSize()); stats.setLongStat(Metric.AVG_INPUT_ROW_BYTES, flattenMemoryManager.getAvgInputRowWidth()); - stats.setLongStat(Metric.TOTAL_INPUT_RECORDS, flattenMemoryManager.getTotalInputRecords()); + stats.setLongStat(Metric.INPUT_RECORD_COUNT, flattenMemoryManager.getTotalInputRecords()); stats.setLongStat(Metric.OUTPUT_BATCH_COUNT, 
flattenMemoryManager.getNumOutgoingBatches()); stats.setLongStat(Metric.AVG_OUTPUT_BATCH_BYTES, flattenMemoryManager.getAvgOutputBatchSize()); stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth()); - stats.setLongStat(Metric.TOTAL_OUTPUT_RECORDS, flattenMemoryManager.getTotalOutputRecords()); + stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, flattenMemoryManager.getTotalOutputRecords()); + + logger.debug("input: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", + flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(), + flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords()); + + logger.debug("output: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", + flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(), + flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords()); + } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/da241134/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 2155f0a..ab50b22 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -44,6 +44,7 @@ import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.MetricDef; import 
org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.impl.common.Comparator; import org.apache.drill.exec.record.RecordBatchSizer; @@ -56,7 +57,7 @@ import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.AbstractRecordBatch; -import org.apache.drill.exec.record.AbstractRecordBatchMemoryManager; +import org.apache.drill.exec.record.RecordBatchMemoryManager; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; @@ -109,12 +110,37 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { private static final String LEFT_INPUT = "LEFT INPUT"; private static final String RIGHT_INPUT = "RIGHT INPUT"; - private class MergeJoinMemoryManager extends AbstractRecordBatchMemoryManager { + private static final int numInputs = 2; + private static final int LEFT_INDEX = 0; + private static final int RIGHT_INDEX = 1; + + public enum Metric implements MetricDef { + LEFT_INPUT_BATCH_COUNT, + LEFT_AVG_INPUT_BATCH_BYTES, + LEFT_AVG_INPUT_ROW_BYTES, + LEFT_INPUT_RECORD_COUNT, + RIGHT_INPUT_BATCH_COUNT, + RIGHT_AVG_INPUT_BATCH_BYTES, + RIGHT_AVG_INPUT_ROW_BYTES, + RIGHT_INPUT_RECORD_COUNT, + OUTPUT_BATCH_COUNT, + AVG_OUTPUT_BATCH_BYTES, + AVG_OUTPUT_ROW_BYTES, + OUTPUT_RECORD_COUNT; + + @Override + public int metricId() { + return ordinal(); + } + } + + private class MergeJoinMemoryManager extends RecordBatchMemoryManager { private int leftRowWidth; private int rightRowWidth; - private RecordBatchSizer leftSizer; - private RecordBatchSizer rightSizer; + public MergeJoinMemoryManager() { + super(numInputs); + } /** * mergejoin operates on one record at a time from the left and right batches @@ -127,17 +153,20 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { @Override public void update(int inputIndex) { 
switch(inputIndex) { - case 0: - leftSizer = new RecordBatchSizer(left); - leftRowWidth = leftSizer.netRowWidth(); + case LEFT_INDEX: + setRecordBatchSizer(inputIndex, new RecordBatchSizer(left)); + leftRowWidth = getRecordBatchSizer(inputIndex).netRowWidth(); + logger.debug("left incoming batch size : {}", getRecordBatchSizer(inputIndex)); break; - case 1: - rightSizer = new RecordBatchSizer(right); - rightRowWidth = rightSizer.netRowWidth(); + case RIGHT_INDEX: + setRecordBatchSizer(inputIndex, new RecordBatchSizer(right)); + rightRowWidth = getRecordBatchSizer(inputIndex).netRowWidth(); + logger.debug("right incoming batch size : {}", getRecordBatchSizer(inputIndex)); default: break; } + updateIncomingStats(inputIndex); final int newOutgoingRowWidth = leftRowWidth + rightRowWidth; // If outgoing row width is 0, just return. This is possible for empty batches or @@ -153,16 +182,22 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { // calculate memory used so far based on previous outgoing row width and how many rows we already processed. final long memoryUsed = status.getOutPosition() * getOutgoingRowWidth(); // This is the remaining memory. - final long remainingMemory = Math.max(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR - memoryUsed, 0); + final long remainingMemory = Math.max(outputBatchSize - memoryUsed, 0); // These are number of rows we can fit in remaining memory based on new outgoing row width. 
final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth); - status.setTargetOutputRowCount(status.getOutPosition() + numOutputRowsRemaining); + status.setTargetOutputRowCount(adjustOutputRowCount(status.getOutPosition() + numOutputRowsRemaining)); setOutgoingRowWidth(newOutgoingRowWidth); + + logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output rowCount : {}", + outputBatchSize, getOutgoingRowWidth(), getOutputRowCount()); } @Override public RecordBatchSizer.ColumnSize getColumnSize(String name) { + RecordBatchSizer leftSizer = getRecordBatchSizer(LEFT_INDEX); + RecordBatchSizer rightSizer = getRecordBatchSizer(RIGHT_INDEX); + if (leftSizer != null && leftSizer.getColumn(name) != null) { return leftSizer.getColumn(name); } @@ -324,10 +359,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { Preconditions.checkArgument(!vw.isHyper()); vw.getValueVector().getMutator().setValueCount(getRecordCount()); } + mergeJoinMemoryManager.updateOutgoingStats(getRecordCount()); } @Override public void close() { + updateStats(); super.close(); leftIterator.close(); rightIterator.close(); @@ -573,4 +610,33 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } return materializedExpr; } + + private void updateStats() { + stats.setLongStat(MergeJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT, mergeJoinMemoryManager.getNumIncomingBatches(LEFT_INDEX)); + stats.setLongStat(MergeJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES, mergeJoinMemoryManager.getAvgInputBatchSize(LEFT_INDEX)); + stats.setLongStat(MergeJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES, mergeJoinMemoryManager.getAvgInputRowWidth(LEFT_INDEX)); + stats.setLongStat(Metric.LEFT_INPUT_RECORD_COUNT, mergeJoinMemoryManager.getTotalInputRecords(LEFT_INDEX)); + + stats.setLongStat(MergeJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT, mergeJoinMemoryManager.getNumIncomingBatches(RIGHT_INDEX)); + 
stats.setLongStat(MergeJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, mergeJoinMemoryManager.getAvgInputBatchSize(RIGHT_INDEX)); + stats.setLongStat(MergeJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES, mergeJoinMemoryManager.getAvgInputRowWidth(RIGHT_INDEX)); + stats.setLongStat(Metric.RIGHT_INPUT_RECORD_COUNT, mergeJoinMemoryManager.getTotalInputRecords(RIGHT_INDEX)); + + stats.setLongStat(MergeJoinBatch.Metric.OUTPUT_BATCH_COUNT, mergeJoinMemoryManager.getNumOutgoingBatches()); + stats.setLongStat(MergeJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES, mergeJoinMemoryManager.getAvgOutputBatchSize()); + stats.setLongStat(MergeJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES, mergeJoinMemoryManager.getAvgOutputRowWidth()); + stats.setLongStat(MergeJoinBatch.Metric.OUTPUT_RECORD_COUNT, mergeJoinMemoryManager.getTotalOutputRecords()); + + logger.debug("left input: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", + mergeJoinMemoryManager.getNumIncomingBatches(LEFT_INDEX), mergeJoinMemoryManager.getAvgInputBatchSize(LEFT_INDEX), + mergeJoinMemoryManager.getAvgInputRowWidth(LEFT_INDEX), mergeJoinMemoryManager.getTotalInputRecords(LEFT_INDEX)); + + logger.debug("right input: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", + mergeJoinMemoryManager.getNumIncomingBatches(RIGHT_INDEX), mergeJoinMemoryManager.getAvgInputBatchSize(RIGHT_INDEX), + mergeJoinMemoryManager.getAvgInputRowWidth(RIGHT_INDEX), mergeJoinMemoryManager.getTotalInputRecords(RIGHT_INDEX)); + + logger.debug("output: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", + mergeJoinMemoryManager.getNumOutgoingBatches(), mergeJoinMemoryManager.getAvgOutputBatchSize(), + mergeJoinMemoryManager.getAvgOutputRowWidth(), mergeJoinMemoryManager.getTotalOutputRecords()); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/da241134/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java deleted file mode 100644 index 67c9cee..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.drill.exec.record; - -import org.apache.drill.exec.vector.ValueVector; - -public abstract class AbstractRecordBatchMemoryManager { - protected static final int OFFSET_VECTOR_WIDTH = 4; - protected static final int WORST_CASE_FRAGMENTATION_FACTOR = 2; - protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT; - protected static final int MIN_NUM_ROWS = 1; - private int outputRowCount = MAX_NUM_ROWS; - private int outgoingRowWidth; - private RecordBatchSizer sizer; - - /** - * operator metric stats - */ - private long numIncomingBatches; - private long sumInputBatchSizes; - private long totalInputRecords; - private long numOutgoingBatches; - private long sumOutputBatchSizes; - private long totalOutputRecords; - - public long getNumIncomingBatches() { - return numIncomingBatches; - } - - public long getTotalInputRecords() { - return totalInputRecords; - } - - public long getNumOutgoingBatches() { - return numOutgoingBatches; - } - - public long getTotalOutputRecords() { - return totalOutputRecords; - } - - public long getAvgInputBatchSize() { - return RecordBatchSizer.safeDivide(sumInputBatchSizes, numIncomingBatches); - } - - public long getAvgInputRowWidth() { - return RecordBatchSizer.safeDivide(sumInputBatchSizes, totalInputRecords); - } - - public long getAvgOutputBatchSize() { - return RecordBatchSizer.safeDivide(sumOutputBatchSizes, numOutgoingBatches); - } - - public long getAvgOutputRowWidth() { - return RecordBatchSizer.safeDivide(sumOutputBatchSizes, totalOutputRecords); - } - - public void update(int inputIndex) {}; - - public void update() {}; - - public int getOutputRowCount() { - return outputRowCount; - } - - /** - * Given batchSize and rowWidth, this will set output rowCount taking into account - * the min and max that is allowed. 
- */ - public void setOutputRowCount(int targetBatchSize, int rowWidth) { - this.outputRowCount = adjustOutputRowCount(RecordBatchSizer.safeDivide(targetBatchSize, rowWidth)); - } - - public void setOutputRowCount(int outputRowCount) { - this.outputRowCount = outputRowCount; - } - - /** - * This will adjust rowCount taking into account the min and max that is allowed. - * We will round down to nearest power of two - 1 for better memory utilization. - * -1 is done for adjusting accounting for offset vectors. - */ - public static int adjustOutputRowCount(int rowCount) { - return (Math.min(MAX_NUM_ROWS, Math.max(Integer.highestOneBit(rowCount) - 1, MIN_NUM_ROWS))); - } - - public void setOutgoingRowWidth(int outgoingRowWidth) { - this.outgoingRowWidth = outgoingRowWidth; - } - - public int getOutgoingRowWidth() { - return outgoingRowWidth; - } - - public void setRecordBatchSizer(RecordBatchSizer sizer) { - this.sizer = sizer; - } - - public RecordBatchSizer getRecordBatchSizer() { - return sizer; - } - - public RecordBatchSizer.ColumnSize getColumnSize(String name) { - return sizer.getColumn(name); - } - - public void updateIncomingStats() { - numIncomingBatches++; - sumInputBatchSizes += sizer.netSize(); - totalInputRecords += sizer.rowCount(); - } - - public void updateOutgoingStats(int outputRecords) { - numOutgoingBatches++; - totalOutputRecords += outputRecords; - sumOutputBatchSizes += outgoingRowWidth * outputRecords; - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/da241134/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java new file mode 100644 index 0000000..a8bb259 --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.record; + +import com.google.common.base.Preconditions; +import org.apache.drill.exec.vector.ValueVector; + +public class RecordBatchMemoryManager { + protected static final int OFFSET_VECTOR_WIDTH = 4; + protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT; + protected static final int MIN_NUM_ROWS = 1; + protected static final int DEFAULT_INPUT_INDEX = 0; + private int outputRowCount = MAX_NUM_ROWS; + private int outgoingRowWidth; + private RecordBatchSizer[] sizer; + private BatchStats[] inputBatchStats; + private BatchStats outputBatchStats; + + // By default, we expect one input batch stream and one output batch stream. + // Some operators can get multiple input batch streams i.e. for example + // joins get 2 batches (left and right). Merge Receiver can get more than 2. 
+ private int numInputs = 1; + + private class BatchStats { + /** + * operator metric stats + */ + private long numBatches; + private long sumBatchSizes; + private long totalRecords; + + public long getNumBatches() { + return numBatches; + } + + public long getTotalRecords() { + return totalRecords; + } + + public long getAvgBatchSize() { + return RecordBatchSizer.safeDivide(sumBatchSizes, numBatches); + } + + public long getAvgRowWidth() { + return RecordBatchSizer.safeDivide(sumBatchSizes, totalRecords); + } + + public void incNumBatches() { + ++numBatches; + } + + public void incSumBatchSizes(long batchSize) { + sumBatchSizes += batchSize; + } + + public void incTotalRecords(long numRecords) { + totalRecords += numRecords; + } + + } + + public long getNumOutgoingBatches() { + return outputBatchStats.getNumBatches(); + } + + public long getTotalOutputRecords() { + return outputBatchStats.getTotalRecords(); + } + + public long getAvgOutputBatchSize() { + return outputBatchStats.getAvgBatchSize(); + } + + public long getAvgOutputRowWidth() { + return outputBatchStats.getAvgRowWidth(); + } + + public long getNumIncomingBatches() { + return inputBatchStats[DEFAULT_INPUT_INDEX].getNumBatches(); + } + + public long getAvgInputBatchSize() { + return inputBatchStats[DEFAULT_INPUT_INDEX].getAvgBatchSize(); + } + + public long getAvgInputRowWidth() { + return inputBatchStats[DEFAULT_INPUT_INDEX].getAvgRowWidth(); + } + + public long getTotalInputRecords() { + return inputBatchStats[DEFAULT_INPUT_INDEX].getTotalRecords(); + } + + public long getNumIncomingBatches(int index) { + Preconditions.checkArgument(index >= 0 && index < numInputs); + return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getNumBatches(); + } + + public long getAvgInputBatchSize(int index) { + Preconditions.checkArgument(index >= 0 && index < numInputs); + return inputBatchStats[index] == null ? 
0 : inputBatchStats[index].getAvgBatchSize(); + } + + public long getAvgInputRowWidth(int index) { + Preconditions.checkArgument(index >= 0 && index < numInputs); + return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getAvgRowWidth(); + } + + public long getTotalInputRecords(int index) { + Preconditions.checkArgument(index >= 0 && index < numInputs); + return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getTotalRecords(); + } + + public RecordBatchMemoryManager(int numInputs) { + this.numInputs = numInputs; + sizer = new RecordBatchSizer[numInputs]; + inputBatchStats = new BatchStats[numInputs]; + outputBatchStats = new BatchStats(); + } + + public RecordBatchMemoryManager() { + sizer = new RecordBatchSizer[numInputs]; + inputBatchStats = new BatchStats[numInputs]; + outputBatchStats = new BatchStats(); + } + + public void update(int inputIndex) {}; + + public void update() {}; + + public int getOutputRowCount() { + return outputRowCount; + } + + /** + * Given batchSize and rowWidth, this will set output rowCount taking into account + * the min and max that is allowed. + */ + public void setOutputRowCount(int targetBatchSize, int rowWidth) { + this.outputRowCount = adjustOutputRowCount(RecordBatchSizer.safeDivide(targetBatchSize, rowWidth)); + } + + public void setOutputRowCount(int outputRowCount) { + this.outputRowCount = outputRowCount; + } + + /** + * This will adjust rowCount taking into account the min and max that is allowed. + * We will round down to nearest power of two - 1 for better memory utilization. + * -1 is done for adjusting accounting for offset vectors. 
+ */ + public static int adjustOutputRowCount(int rowCount) { + return (Math.min(MAX_NUM_ROWS, Math.max(Integer.highestOneBit(rowCount) - 1, MIN_NUM_ROWS))); + } + + public void setOutgoingRowWidth(int outgoingRowWidth) { + this.outgoingRowWidth = outgoingRowWidth; + } + + public int getOutgoingRowWidth() { + return outgoingRowWidth; + } + + public void setRecordBatchSizer(int index, RecordBatchSizer sizer) { + Preconditions.checkArgument(index >= 0 && index < numInputs); + this.sizer[index] = sizer; + inputBatchStats[index] = new BatchStats(); + } + + public void setRecordBatchSizer(RecordBatchSizer sizer) { + this.sizer[DEFAULT_INPUT_INDEX] = sizer; + inputBatchStats[DEFAULT_INPUT_INDEX] = new BatchStats(); + } + + public RecordBatchSizer getRecordBatchSizer(int index) { + Preconditions.checkArgument(index >= 0 && index < numInputs); + return sizer[index]; + } + + public RecordBatchSizer getRecordBatchSizer() { + return sizer[DEFAULT_INPUT_INDEX]; + } + + public RecordBatchSizer.ColumnSize getColumnSize(int index, String name) { + Preconditions.checkArgument(index >= 0 && index < numInputs); + return sizer[index].getColumn(name); + } + + public RecordBatchSizer.ColumnSize getColumnSize(String name) { + return sizer[DEFAULT_INPUT_INDEX].getColumn(name); + } + + public void updateIncomingStats(int index) { + Preconditions.checkArgument(index >= 0 && index < numInputs); + Preconditions.checkArgument(inputBatchStats[index] != null); + inputBatchStats[index].incNumBatches(); + inputBatchStats[index].incSumBatchSizes(sizer[index].netSize()); + inputBatchStats[index].incTotalRecords(sizer[index].rowCount()); + } + + public void updateIncomingStats() { + inputBatchStats[DEFAULT_INPUT_INDEX].incNumBatches(); + inputBatchStats[DEFAULT_INPUT_INDEX].incSumBatchSizes(sizer[DEFAULT_INPUT_INDEX].netSize()); + inputBatchStats[DEFAULT_INPUT_INDEX].incTotalRecords(sizer[DEFAULT_INPUT_INDEX].rowCount()); + } + + public void updateOutgoingStats(int outputRecords) { + 
outputBatchStats.incNumBatches(); + outputBatchStats.incTotalRecords(outputRecords); + outputBatchStats.incSumBatchSizes(outgoingRowWidth * outputRecords); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/da241134/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java index 32c69ce..072a5cb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java @@ -57,12 +57,12 @@ public class RecordIterator implements VectorAccessible { private final VectorContainer container; // Holds VectorContainer of current record batch private final TreeRangeMap<Long, RecordBatchData> batches = TreeRangeMap.create(); - private final AbstractRecordBatchMemoryManager newBatchCallBack; + private final RecordBatchMemoryManager newBatchCallBack; public RecordIterator(RecordBatch incoming, AbstractRecordBatch<?> outgoing, OperatorContext oContext, - int inputIndex, AbstractRecordBatchMemoryManager callBack) { + int inputIndex, RecordBatchMemoryManager callBack) { this(incoming, outgoing, oContext, inputIndex, true, callBack); } @@ -71,7 +71,7 @@ public class RecordIterator implements VectorAccessible { OperatorContext oContext, int inputIndex, boolean enableMarkAndReset, - AbstractRecordBatchMemoryManager callBack) { + RecordBatchMemoryManager callBack) { this.incoming = incoming; this.outgoing = outgoing; this.inputIndex = inputIndex;