This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch nested-operations in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9e61bd6a0d3259a540e71c94f373d1cbdc65bf57 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Sep 6 11:31:08 2021 +0800 SingleInputSingleOutputIntermediateLayer: constructRowSlidingTimeWindowReader --- .../db/query/udf/core/layer/IntermediateLayer.java | 5 +- .../SingleInputSingleOutputIntermediateLayer.java | 175 ++++++++++++++++----- 2 files changed, 137 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java index 49aae1a..c7ae82c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java @@ -27,6 +27,8 @@ import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader; import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader; +import java.io.IOException; + public abstract class IntermediateLayer { protected final long queryId; @@ -61,5 +63,6 @@ public abstract class IntermediateLayer { throws QueryProcessException; protected abstract LayerRowWindowReader constructRowSlidingTimeWindowReader( - SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB); + SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) + throws QueryProcessException, IOException; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java index 3c1c705..452ad3f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java @@ -37,11 +37,13 @@ import java.io.IOException; public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer { private final LayerPointReader parentLayerPointReader; + private final TSDataType dataType; public SingleInputSingleOutputIntermediateLayer( long queryId, float memoryBudgetInMB, LayerPointReader parentLayerPointReader) { super(queryId, memoryBudgetInMB); this.parentLayerPointReader = parentLayerPointReader; + dataType = parentLayerPointReader.getDataType(); } @Override @@ -101,7 +103,6 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer private final int windowSize = strategy.getWindowSize(); private final int slidingStep = strategy.getSlidingStep(); - private final TSDataType dataType = parentLayerPointReader.getDataType(); private final ElasticSerializableTVList tvList = ElasticSerializableTVList.newElasticSerializableTVList( dataType, queryId, memoryBudgetInMB, 2); @@ -122,7 +123,7 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer int pointsToBeCollected = endIndex - tvList.size(); if (0 < pointsToBeCollected) { - hasCached = collectPoints(pointsToBeCollected) != 0; + hasCached = collectPoints(pointsToBeCollected, tvList) != 0; window.seek(beginIndex, tvList.size()); } else { hasCached = true; @@ -132,50 +133,105 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer return hasCached; } - /** @return number of actually collected, which may be less than or equals to pointNumber */ - private int collectPoints(int pointNumber) throws QueryProcessException, IOException { - int count = 0; - - while (parentLayerPointReader.next() && count < pointNumber) { - ++count; - - switch (dataType) { - case INT32: - tvList.putInt( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt()); - break; - case INT64: - tvList.putLong( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong()); - break; - case FLOAT: - tvList.putFloat( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat()); - break; - case DOUBLE: - tvList.putDouble( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble()); - break; - case BOOLEAN: - tvList.putBoolean( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean()); - break; - case TEXT: - tvList.putBinary( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary()); - break; - default: + @Override + public void readyForNext() { + hasCached = false; + } + + @Override + public TSDataType[] getDataTypes() { + return new TSDataType[] {dataType}; + } + + @Override + public RowWindow currentWindow() { + return window; + } + }; + } + + @Override + protected LayerRowWindowReader constructRowSlidingTimeWindowReader( + SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) + throws QueryProcessException, IOException { + + final long timeInterval = strategy.getTimeInterval(); + final long slidingStep = strategy.getSlidingStep(); + final long displayWindowEnd = strategy.getDisplayWindowEnd(); + + final ElasticSerializableTVList tvList = + ElasticSerializableTVList.newElasticSerializableTVList( + dataType, queryId, memoryBudgetInMB, 2); + final ElasticSerializableTVListBackedSingleColumnWindow window = + new ElasticSerializableTVListBackedSingleColumnWindow(tvList); + + long nextWindowTimeBeginGivenByStrategy = strategy.getDisplayWindowBegin(); + if (tvList.size() == 0 && parentLayerPointReader.next()) { + collectPoints(1, tvList); + + if (nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) { + // display window begin should be set to the same as the min timestamp of the query result + // set + nextWindowTimeBeginGivenByStrategy = tvList.getTime(0); + } + } + long finalNextWindowTimeBeginGivenByStrategy = nextWindowTimeBeginGivenByStrategy; + + final boolean hasAtLeastOneRow = tvList.size() != 0; + + return new LayerRowWindowReader() { + + private long nextWindowTimeBegin = finalNextWindowTimeBeginGivenByStrategy; + private int nextIndexBegin = 0; + + @Override + public boolean next() throws IOException, QueryProcessException { + if (displayWindowEnd <= nextWindowTimeBegin) { + return false; + } + if (!hasAtLeastOneRow || 0 < tvList.size()) { + return true; + } + + long nextWindowTimeEnd = Math.min(nextWindowTimeBegin + timeInterval, displayWindowEnd); + int oldTVListSize = tvList.size(); + while (tvList.getTime(tvList.size() - 1) < nextWindowTimeEnd) { + if (parentLayerPointReader.next()) { + collectPoints(1, tvList); + } else if (displayWindowEnd == Long.MAX_VALUE + // display window end == the max timestamp of the query result set + && oldTVListSize == tvList.size()) { + return false; + } else { + break; } + } - parentLayerPointReader.readyForNext(); + for (int i = nextIndexBegin; i < tvList.size(); ++i) { + if (nextWindowTimeBegin <= tvList.getTime(i)) { + nextIndexBegin = i; + break; + } + if (i == tvList.size() - 1) { + nextIndexBegin = tvList.size(); + } } - return count; + int nextIndexEnd = tvList.size(); + for (int i = nextIndexBegin; i < tvList.size(); ++i) { + if (nextWindowTimeEnd <= tvList.getTime(i)) { + nextIndexEnd = i; + break; + } + } + window.seek(nextIndexBegin, nextIndexEnd); + + return true; } @Override public void readyForNext() { - hasCached = false; + nextWindowTimeBegin += slidingStep; } @Override @@ -190,9 +246,44 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer }; } - @Override - protected LayerRowWindowReader constructRowSlidingTimeWindowReader( - SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) { - return null; + /** @return number of actually collected, which may be less than or equals to pointNumber */ + private int collectPoints(int pointNumber, ElasticSerializableTVList tvList) + throws QueryProcessException, IOException { + int count = 0; + + while (parentLayerPointReader.next() && count < pointNumber) { + ++count; + + switch (dataType) { + case INT32: + tvList.putInt(parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt()); + break; + case INT64: + tvList.putLong( + parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong()); + break; + case FLOAT: + tvList.putFloat( + parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat()); + break; + case DOUBLE: + tvList.putDouble( + parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble()); + break; + case BOOLEAN: + tvList.putBoolean( + parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean()); + break; + case TEXT: + tvList.putBinary( + parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary()); + break; + default: + } + + parentLayerPointReader.readyForNext(); + } + + return count; } }
