This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch iotdb-1971 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4f9d53acd8dbba0e2364ef9377e11fb872a18f58 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Dec 1 16:52:29 2021 +0800 fix bug when spliting --- .../query/dataset/udf/UDTFAlignByTimeDataSet.java | 6 ++-- .../iotdb/db/query/dataset/udf/UDTFDataSet.java | 6 ++-- .../db/query/dataset/udf/UDTFFragmentDataSet.java | 11 ++++--- .../query/dataset/udf/UDTFFragmentDataSetTask.java | 17 +++++----- .../db/query/dataset/udf/UDTFJoinDataSet.java | 3 +- .../iotdb/db/query/expression/Expression.java | 3 ++ .../query/expression/binary/BinaryExpression.java | 14 +++++++++ .../db/query/expression/unary/ConstantOperand.java | 7 +++++ .../query/expression/unary/FunctionExpression.java | 18 +++++++++++ .../query/expression/unary/NegationExpression.java | 9 ++++++ .../query/expression/unary/TimeSeriesOperand.java | 7 +++++ .../pool/DataSetFragmentExecutionPoolManager.java | 8 +++-- .../db/query/udf/core/layer/LayerBuilder.java | 10 +++--- .../query/udf/core/layer/RawQueryInputLayer.java | 36 ++++++++++++---------- 14 files changed, 111 insertions(+), 44 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java index d3bc678..b0b6a3e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; +import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer; import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; import org.apache.iotdb.db.utils.datastructure.TimeSelector; @@ -80,9 +81,10 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy } /** for data set fragment */ - protected UDTFAlignByTimeDataSet(LayerPointReader[] transformers) + protected UDTFAlignByTimeDataSet( + RawQueryInputLayer rawQueryInputLayer, LayerPointReader[] transformers) throws QueryProcessException, IOException { - super(transformers); + super(rawQueryInputLayer, transformers); initTimeHeap(); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java index 5700336..3e523b7 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java @@ -128,12 +128,12 @@ public abstract class UDTFDataSet extends QueryDataSet { } /** for data set fragment */ - protected UDTFDataSet(LayerPointReader[] transformers) { - // The following 3 fields are useless because they are recorded in their parent data set. + protected UDTFDataSet(RawQueryInputLayer rawQueryInputLayer, LayerPointReader[] transformers) { + // The following 2 fields are useless. queryId = -1; udtfPlan = null; - rawQueryInputLayer = null; + this.rawQueryInputLayer = rawQueryInputLayer; this.transformers = transformers; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java index 50c646a..8d1cbbb 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java @@ -21,8 +21,10 @@ package org.apache.iotdb.db.query.dataset.udf; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.query.pool.DataSetFragmentExecutionPoolManager; +import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer; import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +33,7 @@ import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet { +public class UDTFFragmentDataSet extends QueryDataSet { private static final Logger LOGGER = LoggerFactory.getLogger(UDTFFragmentDataSet.class); @@ -39,6 +41,7 @@ public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet { private static final DataSetFragmentExecutionPoolManager DATA_SET_FRAGMENT_EXECUTION_POOL_MANAGER = DataSetFragmentExecutionPoolManager.getInstance(); + private final QueryDataSet fragmentDataSet; private final BlockingQueue<Object[]> productionBlockingQueue; private RowRecord[] rowRecords = null; @@ -47,9 +50,9 @@ public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet { private boolean hasNextRowRecords = true; - public UDTFFragmentDataSet(LayerPointReader[] transformers) + public UDTFFragmentDataSet(RawQueryInputLayer rawQueryInputLayer, LayerPointReader[] transformers) throws QueryProcessException, IOException { - super(transformers); + fragmentDataSet = new UDTFAlignByTimeDataSet(rawQueryInputLayer, transformers); productionBlockingQueue = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY); submitTask(); } @@ -105,7 +108,7 @@ public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet { private void submitTask() { if (productionBlockingQueue.remainingCapacity() > 0) { DATA_SET_FRAGMENT_EXECUTION_POOL_MANAGER.submit( - new UDTFFragmentDataSetTask(fetchSize, this, productionBlockingQueue)); + new UDTFFragmentDataSetTask(fetchSize, fragmentDataSet, productionBlockingQueue)); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java index fb50dea..62452dc 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.dataset.udf; import org.apache.iotdb.db.concurrent.WrappedRunnable; import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,22 +33,20 @@ public class UDTFFragmentDataSetTask extends WrappedRunnable { private static final Logger LOGGER = LoggerFactory.getLogger(UDTFFragmentDataSetTask.class); private final int fetchSize; - private final UDTFFragmentDataSet fragmentDataSet; + private final QueryDataSet queryDataSet; // there are 3 elements in Object[]. // [0]: RowRecord[] or Throwable. // [2]: Integer. actual length of produced row records in [0]. note that the element is -1 when // the [0] element is a Throwable. - // [1]: Boolean. true if the fragmentDataSet still has next RowRecord to be consumed, otherwise + // [1]: Boolean. true if the queryDataSet still has next RowRecord to be consumed, otherwise // false. note that the element is false when the [0] element is a Throwable. private final BlockingQueue<Object[]> productionBlockingQueue; public UDTFFragmentDataSetTask( - int fetchSize, - UDTFFragmentDataSet fragmentDataSet, - BlockingQueue<Object[]> productionBlockingQueue) { + int fetchSize, QueryDataSet queryDataSet, BlockingQueue<Object[]> productionBlockingQueue) { this.fetchSize = fetchSize; - this.fragmentDataSet = fragmentDataSet; + this.queryDataSet = queryDataSet; this.productionBlockingQueue = productionBlockingQueue; } @@ -56,13 +55,13 @@ public class UDTFFragmentDataSetTask extends WrappedRunnable { try { int rowRecordCount = 0; RowRecord[] rowRecords = new RowRecord[fetchSize]; - while (rowRecordCount < fetchSize && fragmentDataSet.hasNextWithoutConstraint()) { - rowRecords[rowRecordCount++] = fragmentDataSet.nextWithoutConstraint(); + while (rowRecordCount < fetchSize && queryDataSet.hasNextWithoutConstraint()) { + rowRecords[rowRecordCount++] = queryDataSet.nextWithoutConstraint(); } // if a task is submitted, there must be free space in the queue productionBlockingQueue.put( - new Object[] {rowRecords, rowRecordCount, fragmentDataSet.hasNextWithoutConstraint()}); + new Object[] {rowRecords, rowRecordCount, queryDataSet.hasNextWithoutConstraint()}); } catch (Throwable e) { onThrowable(e); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java index ca2ef3a..323e1c2 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java @@ -29,7 +29,6 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import java.io.IOException; -// TODO: performances joining in pool, packing row records while calculating public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDataSet { private final UDTFFragmentDataSet[] fragmentDataSets; @@ -68,7 +67,7 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa timeHeap = new TimeSelector(resultColumnsLength << 1, true); for (int i = 0; i < resultColumnsLength; ++i) { - UDTFDataSet fragmentDataSet = fragmentDataSets[i]; + QueryDataSet fragmentDataSet = fragmentDataSets[i]; if (fragmentDataSet.hasNextWithoutConstraint()) { rowRecordsCache[i] = fragmentDataSet.nextWithoutConstraint(); timeHeap.add(rowRecordsCache[i].getTimestamp()); diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java index 27df69b..6284c02 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java @@ -96,6 +96,9 @@ public abstract class Expression { return expressionIntermediateLayerMap.get(this); } + public abstract Integer tryToGetFragmentDataSetIndex( + Map<Expression, IntermediateLayer> expressionIntermediateLayerMap); + /** Sub-classes should override this method indicating if the expression is a constant operand */ protected abstract boolean isConstantOperandInternal(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java index 176b746..9fc05dd 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java @@ -188,6 +188,20 @@ public abstract class BinaryExpression extends Expression { LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader); @Override + public Integer tryToGetFragmentDataSetIndex( + Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) { + IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this); + if (intermediateLayer != null) { + return intermediateLayer.getFragmentDataSetIndex(); + } + + Integer index = leftExpression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap); + return index != null + ? index + : rightExpression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap); + } + + @Override public final String getExpressionStringInternal() { StringBuilder builder = new StringBuilder(); if (leftExpression instanceof BinaryExpression) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java index 0a58321..f55c82c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java @@ -102,6 +102,13 @@ public class ConstantOperand extends Expression { } @Override + public Integer tryToGetFragmentDataSetIndex( + Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) { + IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this); + return intermediateLayer == null ? null : intermediateLayer.getFragmentDataSetIndex(); + } + + @Override public String getExpressionStringInternal() { return valueString; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java index 2217fe6..b9e8d7c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java @@ -293,6 +293,24 @@ public class FunctionExpression extends Expression { } } + @Override + public Integer tryToGetFragmentDataSetIndex( + Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) { + IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this); + if (intermediateLayer != null) { + return intermediateLayer.getFragmentDataSetIndex(); + } + + for (Expression expression : expressions) { + Integer index = expression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap); + if (index != null) { + return index; + } + } + + return null; + } + public List<PartialPath> getPaths() { if (paths == null) { paths = new ArrayList<>(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java index 39f4bab..c57d8e6 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java @@ -138,6 +138,15 @@ public class NegationExpression extends Expression { } @Override + public Integer tryToGetFragmentDataSetIndex( + Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) { + IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this); + return intermediateLayer != null + ? intermediateLayer.getFragmentDataSetIndex() + : expression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap); + } + + @Override public String getExpressionStringInternal() { return "-" + expression.toString(); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java index 9c65d48..ec2a4de 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java @@ -116,6 +116,13 @@ public class TimeSeriesOperand extends Expression { this, queryId, memoryBudgetInMB, fragmentDataSetIndex, parentLayerPointReader)); } + @Override + public Integer tryToGetFragmentDataSetIndex( + Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) { + IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this); + return intermediateLayer == null ? null : intermediateLayer.getFragmentDataSetIndex(); + } + public String getExpressionStringInternal() { return path.isMeasurementAliasExists() ? path.getFullPathWithAlias() : path.getFullPath(); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java index 0705380..c62e415 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java @@ -35,7 +35,9 @@ public class DataSetFragmentExecutionPoolManager extends AbstractPoolManager { private DataSetFragmentExecutionPoolManager() { pool = IoTDBThreadPoolFactory.newFixedThreadPool( - IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread(), + Math.min( + Runtime.getRuntime().availableProcessors(), + IoTDBDescriptor.getInstance().getConfig().getMaxConcurrentSubQueryThread()), ThreadName.QUERY_FRAGMENT_SERVICE.getName()); } @@ -58,7 +60,9 @@ public class DataSetFragmentExecutionPoolManager extends AbstractPoolManager { if (pool == null) { pool = IoTDBThreadPoolFactory.newFixedThreadPool( - IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread(), + Math.min( + Runtime.getRuntime().availableProcessors(), + IoTDBDescriptor.getInstance().getConfig().getMaxConcurrentSubQueryThread()), ThreadName.QUERY_FRAGMENT_SERVICE.getName()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java index 1541829..a7b0582 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java @@ -93,12 +93,9 @@ public class LayerBuilder { public LayerBuilder buildResultColumnPointReaders() throws QueryProcessException, IOException { for (int i = 0, n = resultColumnExpressions.length; i < n; ++i) { // resultColumnExpressions[i] -> the index of the fragment it belongs to - int fragmentDataSetIndex; - IntermediateLayer intermediateLayer = - expressionIntermediateLayerMap.get(resultColumnExpressions[i]); - if (intermediateLayer != null) { - fragmentDataSetIndex = intermediateLayer.getFragmentDataSetIndex(); - } else { + Integer fragmentDataSetIndex = + resultColumnExpressions[i].tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap); + if (fragmentDataSetIndex == null) { fragmentDataSetIndex = fragmentDataSetIndexToLayerPointReaders.size(); fragmentDataSetIndexToLayerPointReaders.add(new ArrayList<>()); } @@ -150,6 +147,7 @@ public class LayerBuilder { for (int i = 0; i < n; ++i) { fragmentDataSets[i] = new UDTFFragmentDataSet( + rawTimeSeriesInputLayer, fragmentDataSetIndexToLayerPointReaders.get(i).toArray(new LayerPointReader[0])); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java index 55d8aca..5efef6a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java @@ -87,7 +87,9 @@ public class RawQueryInputLayer { } public void updateRowRecordListEvictionUpperBound() { - rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine()); + synchronized (rowRecordList) { + rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine()); + } } public LayerPointReader constructPointReader(int columnIndex) { @@ -125,27 +127,29 @@ public class RawQueryInputLayer { return true; } - for (int i = currentRowIndex + 1; i < rowRecordList.size(); ++i) { - Object[] rowRecordCandidate = rowRecordList.getRowRecord(i); - if (rowRecordCandidate[columnIndex] != null) { - hasCachedRowRecord = true; - cachedRowRecord = rowRecordCandidate; - currentRowIndex = i; - break; - } - } - - if (!hasCachedRowRecord) { - while (queryDataSet.hasNextRowInObjects()) { - Object[] rowRecordCandidate = queryDataSet.nextRowInObjects(); - rowRecordList.put(rowRecordCandidate); + synchronized (rowRecordList) { + for (int i = currentRowIndex + 1; i < rowRecordList.size(); ++i) { + Object[] rowRecordCandidate = rowRecordList.getRowRecord(i); if (rowRecordCandidate[columnIndex] != null) { hasCachedRowRecord = true; cachedRowRecord = rowRecordCandidate; - currentRowIndex = rowRecordList.size() - 1; + currentRowIndex = i; break; } } + + if (!hasCachedRowRecord) { + while (queryDataSet.hasNextRowInObjects()) { + Object[] rowRecordCandidate = queryDataSet.nextRowInObjects(); + rowRecordList.put(rowRecordCandidate); + if (rowRecordCandidate[columnIndex] != null) { + hasCachedRowRecord = true; + cachedRowRecord = rowRecordCandidate; + currentRowIndex = rowRecordList.size() - 1; + break; + } + } + } } return hasCachedRowRecord;
