This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1400
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6b05be253a4479e259b1d88164ee2b245db275b2
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu May 27 19:55:55 2021 +0800

    refactor UDTFDataSet
---
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java | 96 ++++++++++++----------
 1 file changed, 54 insertions(+), 42 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 19d81c6..4fa2db9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -80,7 +80,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
             readersOfSelectedSeries,
             cached);
     udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
-    initTransformers(UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB);
+    initTransformers();
   }
 
   /** execute without value filters */
@@ -102,24 +102,34 @@ public abstract class UDTFDataSet extends QueryDataSet {
             deduplicatedDataTypes,
             readersOfSelectedSeries);
     udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
-    initTransformers(UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB);
+    initTransformers();
   }
 
-  @SuppressWarnings("squid:S3518") // "Math.max(windowTransformerCount, 1)" can't be zero
-  protected void initTransformers(float memoryBudgetInMB)
-      throws QueryProcessException, IOException {
-    int size = udtfPlan.getPathToIndex().size();
+  protected void initTransformers() throws QueryProcessException, IOException {
+    final float memoryBudgetForSingleWindowTransformer =
+        calculateMemoryBudgetForSingleWindowTransformer();
+    final int size = udtfPlan.getPathToIndex().size();
     transformers = new Transformer[size];
+    for (int i = 0; i < size; ++i) {
+      if (udtfPlan.isUdfColumn(i)) {
+        constructUDFTransformer(i, memoryBudgetForSingleWindowTransformer);
+      } else {
+        constructRawQueryTransformer(i);
+      }
+    }
+  }
 
+  @SuppressWarnings("squid:S3518") // "Math.max(windowTransformerCount, 1)" can't be zero
+  private float calculateMemoryBudgetForSingleWindowTransformer() {
+    int size = udtfPlan.getPathToIndex().size();
     int windowTransformerCount = 0;
     for (int i = 0; i < size; ++i) {
       if (udtfPlan.isUdfColumn(i)) {
-        AccessStrategy accessStrategy =
-            udtfPlan
-                .getExecutorByDataSetOutputColumnIndex(i)
-                .getConfigurations()
-                .getAccessStrategy();
-        switch (accessStrategy.getAccessStrategyType()) {
+        switch (udtfPlan
+            .getExecutorByDataSetOutputColumnIndex(i)
+            .getConfigurations()
+            .getAccessStrategy()
+            .getAccessStrategyType()) {
           case SLIDING_SIZE_WINDOW:
           case SLIDING_TIME_WINDOW:
             ++windowTransformerCount;
@@ -129,40 +139,34 @@ public abstract class UDTFDataSet extends QueryDataSet {
         }
       }
     }
-    memoryBudgetInMB /= Math.max(windowTransformerCount, 1);
+    return UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB / Math.max(windowTransformerCount, 1);
+  }
 
-    for (int i = 0; i < size; ++i) {
-      if (udtfPlan.isUdfColumn(i)) {
-        UDTFExecutor executor = udtfPlan.getExecutorByDataSetOutputColumnIndex(i);
-        int[] readerIndexes = calculateReaderIndexes(executor);
-        AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
-        switch (accessStrategy.getAccessStrategyType()) {
-          case ROW_BY_ROW:
-            transformers[i] =
-                new UDFQueryRowTransformer(inputLayer.constructRowReader(readerIndexes), executor);
-            break;
-          case SLIDING_SIZE_WINDOW:
-          case SLIDING_TIME_WINDOW:
-            transformers[i] =
-                new UDFQueryRowWindowTransformer(
-                    inputLayer.constructRowWindowReader(
-                        readerIndexes, accessStrategy, memoryBudgetInMB),
-                    executor);
-            break;
-          default:
-            throw new UnsupportedOperationException("Unsupported transformer access strategy");
-        }
-      } else {
-        transformers[i] =
-            new RawQueryPointTransformer(
-                inputLayer.constructPointReader(
-                    udtfPlan.getReaderIndex(
-                        udtfPlan.getRawQueryColumnNameByDatasetOutputColumnIndex(i))));
-      }
+  private void constructUDFTransformer(
+      int columnIndex, float memoryBudgetForSingleWindowTransformer)
+      throws QueryProcessException, IOException {
+    UDTFExecutor executor = udtfPlan.getExecutorByDataSetOutputColumnIndex(columnIndex);
+    int[] readerIndexes = calculateUDFReaderIndexes(executor);
+    AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
+    switch (accessStrategy.getAccessStrategyType()) {
+      case ROW_BY_ROW:
+        transformers[columnIndex] =
+            new UDFQueryRowTransformer(inputLayer.constructRowReader(readerIndexes), executor);
+        break;
+      case SLIDING_SIZE_WINDOW:
+      case SLIDING_TIME_WINDOW:
+        transformers[columnIndex] =
+            new UDFQueryRowWindowTransformer(
+                inputLayer.constructRowWindowReader(
+                    readerIndexes, accessStrategy, memoryBudgetForSingleWindowTransformer),
+                executor);
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported transformer access strategy");
     }
   }
 
-  private int[] calculateReaderIndexes(UDTFExecutor executor) {
+  private int[] calculateUDFReaderIndexes(UDTFExecutor executor) {
     List<PartialPath> paths = executor.getExpression().getPaths();
     int[] readerIndexes = new int[paths.size()];
     for (int i = 0; i < readerIndexes.length; ++i) {
@@ -171,6 +175,14 @@ public abstract class UDTFDataSet extends QueryDataSet {
     return readerIndexes;
   }
 
+  private void constructRawQueryTransformer(int columnIndex) {
+    transformers[columnIndex] =
+        new RawQueryPointTransformer(
+            inputLayer.constructPointReader(
+                udtfPlan.getReaderIndex(
+                    udtfPlan.getRawQueryColumnNameByDatasetOutputColumnIndex(columnIndex))));
+  }
+
   public void finalizeUDFs(long queryId) {
     udtfPlan.finalizeUDFExecutors(queryId);
   }
