This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch nested-operations in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3631c85afc31be9bdf1bf4b59c0c6e7e618caa4c Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Sep 6 15:43:30 2021 +0800 SingleInputMultiOutputIntermediateLayer & SingleInputSingleOutputIntermediateLayer --- .../db/query/udf/core/layer/LayerCacheUtils.java | 81 +++++++ .../SingleInputMultiOutputIntermediateLayer.java | 248 ++++++++++++++++----- .../SingleInputSingleOutputIntermediateLayer.java | 89 ++------ 3 files changed, 303 insertions(+), 115 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java new file mode 100644 index 0000000..5801eef --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.udf.core.layer; + +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; +import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import java.io.IOException; + +public class LayerCacheUtils { + + private LayerCacheUtils() {} + + /** @return number of actually collected, which may be less than or equals to pointNumber */ + public static int cachePoints( + TSDataType dataType, + LayerPointReader source, + ElasticSerializableTVList target, + int pointNumber) + throws QueryProcessException, IOException { + int count = 0; + while (count < pointNumber && cachePoint(dataType, source, target)) { + ++count; + } + return count; + } + + public static boolean cachePoint( + TSDataType dataType, LayerPointReader source, ElasticSerializableTVList target) + throws IOException, QueryProcessException { + if (!source.next()) { + return false; + } + + switch (dataType) { + case INT32: + target.putInt(source.currentTime(), source.currentInt()); + break; + case INT64: + target.putLong(source.currentTime(), source.currentLong()); + break; + case FLOAT: + target.putFloat(source.currentTime(), source.currentFloat()); + break; + case DOUBLE: + target.putDouble(source.currentTime(), source.currentDouble()); + break; + case BOOLEAN: + target.putBoolean(source.currentTime(), source.currentBoolean()); + break; + case TEXT: + target.putBinary(source.currentTime(), source.currentBinary()); + break; + default: + throw new UnsupportedOperationException(dataType.name()); + } + + source.readyForNext(); + + return true; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java index 17467b8..b243e54 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java @@ -20,8 +20,12 @@ package org.apache.iotdb.db.query.udf.core.layer; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.query.udf.api.access.Row; +import org.apache.iotdb.db.query.udf.api.access.RowWindow; import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy; import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy; +import org.apache.iotdb.db.query.udf.core.access.ElasticSerializableTVListBackedSingleColumnRow; +import org.apache.iotdb.db.query.udf.core.access.ElasticSerializableTVListBackedSingleColumnWindow; import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile; import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader; @@ -66,20 +70,9 @@ public class SingleInputMultiOutputIntermediateLayer extends IntermediateLayer { @Override public boolean next() throws QueryProcessException, IOException { - if (hasCached) { - return true; - } - - if (currentPointIndex < tvList.size() - 1) { - ++currentPointIndex; - hasCached = true; - } - - // tvList.size() - 1 <= currentPointIndex - if (!hasCached && parentLayerPointReader.next()) { - cachePoint(); - parentLayerPointReader.readyForNext(); - + if (!hasCached + && (currentPointIndex < tvList.size() - 1 + || LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, tvList))) { ++currentPointIndex; hasCached = true; } @@ -87,43 +80,11 @@ public class SingleInputMultiOutputIntermediateLayer extends IntermediateLayer { return hasCached; } - private void cachePoint() throws IOException, QueryProcessException { - switch (dataType) { - case INT32: - tvList.putInt( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt()); - break; - case INT64: - tvList.putLong( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong()); - break; - case FLOAT: - tvList.putFloat( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat()); - break; - case DOUBLE: - tvList.putDouble( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble()); - break; - case BOOLEAN: - tvList.putBoolean( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean()); - break; - case TEXT: - tvList.putBinary( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary()); - break; - default: - throw new UnsupportedOperationException(dataType.name()); - } - } - @Override public void readyForNext() { hasCached = false; safetyPile.moveForwardTo(currentPointIndex + 1); - // todo: reduce the update rate tvList.setEvictionUpperBound(safetyLine.getSafetyLine()); } @@ -171,18 +132,205 @@ public class SingleInputMultiOutputIntermediateLayer extends IntermediateLayer { @Override public LayerRowReader constructRowReader() { - return null; + + return new LayerRowReader() { + + private final SafetyPile safetyPile = safetyLine.addSafetyPile(); + private final ElasticSerializableTVListBackedSingleColumnRow row = + new ElasticSerializableTVListBackedSingleColumnRow(tvList, -1); + + private boolean hasCached = false; + private int currentRowIndex = -1; + + @Override + public boolean next() throws QueryProcessException, IOException { + if (!hasCached + && ((currentRowIndex < tvList.size() - 1) + || LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, tvList))) { + row.seek(++currentRowIndex); + hasCached = true; + } + + return hasCached; + } + + @Override + public void readyForNext() { + hasCached = false; + + safetyPile.moveForwardTo(currentRowIndex + 1); + tvList.setEvictionUpperBound(safetyLine.getSafetyLine()); + } + + @Override + public TSDataType[] getDataTypes() { + return new TSDataType[] {dataType}; + } + + @Override + public long currentTime() throws IOException { + return row.getTime(); + } + + @Override + public Row currentRow() { + return row; + } + }; } @Override protected LayerRowWindowReader constructRowSlidingSizeWindowReader( SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) { - return null; + + return new LayerRowWindowReader() { + + private final int windowSize = strategy.getWindowSize(); + private final int slidingStep = strategy.getSlidingStep(); + + private final SafetyPile safetyPile = safetyLine.addSafetyPile(); + private final ElasticSerializableTVListBackedSingleColumnWindow window = + new ElasticSerializableTVListBackedSingleColumnWindow(tvList); + + private boolean hasCached = false; + private int beginIndex = -slidingStep; + + @Override + public boolean next() throws IOException, QueryProcessException { + if (hasCached) { + return true; + } + + beginIndex += slidingStep; + int endIndex = beginIndex + windowSize; + + int pointsToBeCollected = endIndex - tvList.size(); + if (0 < pointsToBeCollected) { + hasCached = + LayerCacheUtils.cachePoints( + dataType, parentLayerPointReader, tvList, pointsToBeCollected) + != 0; + window.seek(beginIndex, tvList.size()); + } else { + hasCached = true; + window.seek(beginIndex, endIndex); + } + + return hasCached; + } + + @Override + public void readyForNext() { + hasCached = false; + + safetyPile.moveForwardTo(beginIndex + 1); + tvList.setEvictionUpperBound(safetyLine.getSafetyLine()); + } + + @Override + public TSDataType[] getDataTypes() { + return new TSDataType[] {dataType}; + } + + @Override + public RowWindow currentWindow() { + return window; + } + }; } @Override protected LayerRowWindowReader constructRowSlidingTimeWindowReader( - SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) { - return null; + SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) + throws IOException, QueryProcessException { + + final long timeInterval = strategy.getTimeInterval(); + final long slidingStep = strategy.getSlidingStep(); + final long displayWindowEnd = strategy.getDisplayWindowEnd(); + + final SafetyPile safetyPile = safetyLine.addSafetyPile(); + final ElasticSerializableTVListBackedSingleColumnWindow window = + new ElasticSerializableTVListBackedSingleColumnWindow(tvList); + + long nextWindowTimeBeginGivenByStrategy = strategy.getDisplayWindowBegin(); + if (tvList.size() == 0 + && LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, tvList) + && nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) { + // display window begin should be set to the same as the min timestamp of the query result + // set + nextWindowTimeBeginGivenByStrategy = tvList.getTime(0); + } + long finalNextWindowTimeBeginGivenByStrategy = nextWindowTimeBeginGivenByStrategy; + + final boolean hasAtLeastOneRow = tvList.size() != 0; + + return new LayerRowWindowReader() { + + private long nextWindowTimeBegin = finalNextWindowTimeBeginGivenByStrategy; + private int nextIndexBegin = 0; + + @Override + public boolean next() throws IOException, QueryProcessException { + if (displayWindowEnd <= nextWindowTimeBegin) { + return false; + } + if (!hasAtLeastOneRow || 0 < tvList.size()) { + return true; + } + + long nextWindowTimeEnd = Math.min(nextWindowTimeBegin + timeInterval, displayWindowEnd); + int oldTVListSize = tvList.size(); + while (tvList.getTime(tvList.size() - 1) < nextWindowTimeEnd) { + if (!LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, tvList)) { + if (displayWindowEnd == Long.MAX_VALUE + // display window end == the max timestamp of the query result set + && oldTVListSize == tvList.size()) { + return false; + } else { + break; + } + } + } + + for (int i = nextIndexBegin; i < tvList.size(); ++i) { + if (nextWindowTimeBegin <= tvList.getTime(i)) { + nextIndexBegin = i; + break; + } + if (i == tvList.size() - 1) { + nextIndexBegin = tvList.size(); + } + } + + int nextIndexEnd = tvList.size(); + for (int i = nextIndexBegin; i < tvList.size(); ++i) { + if (nextWindowTimeEnd <= tvList.getTime(i)) { + nextIndexEnd = i; + break; + } + } + window.seek(nextIndexBegin, nextIndexEnd); + + return true; + } + + @Override + public void readyForNext() { + nextWindowTimeBegin += slidingStep; + + safetyPile.moveForwardTo(nextIndexBegin + 1); + tvList.setEvictionUpperBound(safetyLine.getSafetyLine()); + } + + @Override + public TSDataType[] getDataTypes() { + return new TSDataType[] {dataType}; + } + + @Override + public RowWindow currentWindow() { + return window; + } + }; } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java index 452ad3f..86111c1 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java @@ -57,28 +57,27 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer return new LayerRowReader() { private final Row row = new LayerPointReaderBackedSingleColumnRow(parentLayerPointReader); - private final TSDataType[] dataTypes = - new TSDataType[] {parentLayerPointReader.getDataType()}; + private boolean hasCached = false; @Override public boolean next() throws IOException, QueryProcessException { - if (hasCached) { - return true; + if (!hasCached) { + hasCached = parentLayerPointReader.next(); } - hasCached = parentLayerPointReader.next(); return hasCached; } @Override public void readyForNext() { - parentLayerPointReader.readyForNext(); hasCached = false; + + parentLayerPointReader.readyForNext(); } @Override public TSDataType[] getDataTypes() { - return dataTypes; + return new TSDataType[] {parentLayerPointReader.getDataType()}; } @Override @@ -123,7 +122,10 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer int pointsToBeCollected = endIndex - tvList.size(); if (0 < pointsToBeCollected) { - hasCached = collectPoints(pointsToBeCollected, tvList) != 0; + hasCached = + LayerCacheUtils.cachePoints( + dataType, parentLayerPointReader, tvList, pointsToBeCollected) + != 0; window.seek(beginIndex, tvList.size()); } else { hasCached = true; @@ -166,14 +168,12 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer new ElasticSerializableTVListBackedSingleColumnWindow(tvList); long nextWindowTimeBeginGivenByStrategy = strategy.getDisplayWindowBegin(); - if (tvList.size() == 0 && parentLayerPointReader.next()) { - collectPoints(1, tvList); - - if (nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) { - // display window begin should be set to the same as the min timestamp of the query result - // set - nextWindowTimeBeginGivenByStrategy = tvList.getTime(0); - } + if (tvList.size() == 0 + && LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, tvList) + && nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) { + // display window begin should be set to the same as the min timestamp of the query result + // set + nextWindowTimeBeginGivenByStrategy = tvList.getTime(0); } long finalNextWindowTimeBeginGivenByStrategy = nextWindowTimeBeginGivenByStrategy; @@ -196,14 +196,14 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer long nextWindowTimeEnd = Math.min(nextWindowTimeBegin + timeInterval, displayWindowEnd); int oldTVListSize = tvList.size(); while (tvList.getTime(tvList.size() - 1) < nextWindowTimeEnd) { - if (parentLayerPointReader.next()) { - collectPoints(1, tvList); - } else if (displayWindowEnd == Long.MAX_VALUE - // display window end == the max timestamp of the query result set - && oldTVListSize == tvList.size()) { - return false; - } else { - break; + if (!LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, tvList)) { + if (displayWindowEnd == Long.MAX_VALUE + // display window end == the max timestamp of the query result set + && oldTVListSize == tvList.size()) { + return false; + } else { + break; + } } } @@ -245,45 +245,4 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer } }; } - - /** @return number of actually collected, which may be less than or equals to pointNumber */ - private int collectPoints(int pointNumber, ElasticSerializableTVList tvList) - throws QueryProcessException, IOException { - int count = 0; - - while (parentLayerPointReader.next() && count < pointNumber) { - ++count; - - switch (dataType) { - case INT32: - tvList.putInt(parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt()); - break; - case INT64: - tvList.putLong( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong()); - break; - case FLOAT: - tvList.putFloat( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat()); - break; - case DOUBLE: - tvList.putDouble( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble()); - break; - case BOOLEAN: - tvList.putBoolean( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean()); - break; - case TEXT: - tvList.putBinary( - parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary()); - break; - default: - } - - parentLayerPointReader.readyForNext(); - } - - return count; - } }
