This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3631c85afc31be9bdf1bf4b59c0c6e7e618caa4c
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Sep 6 15:43:30 2021 +0800

    SingleInputMultiOutputIntermediateLayer & 
SingleInputSingleOutputIntermediateLayer
---
 .../db/query/udf/core/layer/LayerCacheUtils.java   |  81 +++++++
 .../SingleInputMultiOutputIntermediateLayer.java   | 248 ++++++++++++++++-----
 .../SingleInputSingleOutputIntermediateLayer.java  |  89 ++------
 3 files changed, 303 insertions(+), 115 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java
 
b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java
new file mode 100644
index 0000000..5801eef
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import 
org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+
+public class LayerCacheUtils {
+
+  private LayerCacheUtils() {}
+
+  /** @return number of actually collected, which may be less than or equals 
to pointNumber */
+  public static int cachePoints(
+      TSDataType dataType,
+      LayerPointReader source,
+      ElasticSerializableTVList target,
+      int pointNumber)
+      throws QueryProcessException, IOException {
+    int count = 0;
+    while (count < pointNumber && cachePoint(dataType, source, target)) {
+      ++count;
+    }
+    return count;
+  }
+
+  public static boolean cachePoint(
+      TSDataType dataType, LayerPointReader source, ElasticSerializableTVList 
target)
+      throws IOException, QueryProcessException {
+    if (!source.next()) {
+      return false;
+    }
+
+    switch (dataType) {
+      case INT32:
+        target.putInt(source.currentTime(), source.currentInt());
+        break;
+      case INT64:
+        target.putLong(source.currentTime(), source.currentLong());
+        break;
+      case FLOAT:
+        target.putFloat(source.currentTime(), source.currentFloat());
+        break;
+      case DOUBLE:
+        target.putDouble(source.currentTime(), source.currentDouble());
+        break;
+      case BOOLEAN:
+        target.putBoolean(source.currentTime(), source.currentBoolean());
+        break;
+      case TEXT:
+        target.putBinary(source.currentTime(), source.currentBinary());
+        break;
+      default:
+        throw new UnsupportedOperationException(dataType.name());
+    }
+
+    source.readyForNext();
+
+    return true;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
 
b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
index 17467b8..b243e54 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
@@ -20,8 +20,12 @@
 package org.apache.iotdb.db.query.udf.core.layer;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
 import 
org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import 
org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import 
org.apache.iotdb.db.query.udf.core.access.ElasticSerializableTVListBackedSingleColumnRow;
+import 
org.apache.iotdb.db.query.udf.core.access.ElasticSerializableTVListBackedSingleColumnWindow;
 import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
@@ -66,20 +70,9 @@ public class SingleInputMultiOutputIntermediateLayer extends 
IntermediateLayer {
 
       @Override
       public boolean next() throws QueryProcessException, IOException {
-        if (hasCached) {
-          return true;
-        }
-
-        if (currentPointIndex < tvList.size() - 1) {
-          ++currentPointIndex;
-          hasCached = true;
-        }
-
-        // tvList.size() - 1 <= currentPointIndex
-        if (!hasCached && parentLayerPointReader.next()) {
-          cachePoint();
-          parentLayerPointReader.readyForNext();
-
+        if (!hasCached
+            && (currentPointIndex < tvList.size() - 1
+                || LayerCacheUtils.cachePoint(dataType, 
parentLayerPointReader, tvList))) {
           ++currentPointIndex;
           hasCached = true;
         }
@@ -87,43 +80,11 @@ public class SingleInputMultiOutputIntermediateLayer 
extends IntermediateLayer {
         return hasCached;
       }
 
-      private void cachePoint() throws IOException, QueryProcessException {
-        switch (dataType) {
-          case INT32:
-            tvList.putInt(
-                parentLayerPointReader.currentTime(), 
parentLayerPointReader.currentInt());
-            break;
-          case INT64:
-            tvList.putLong(
-                parentLayerPointReader.currentTime(), 
parentLayerPointReader.currentLong());
-            break;
-          case FLOAT:
-            tvList.putFloat(
-                parentLayerPointReader.currentTime(), 
parentLayerPointReader.currentFloat());
-            break;
-          case DOUBLE:
-            tvList.putDouble(
-                parentLayerPointReader.currentTime(), 
parentLayerPointReader.currentDouble());
-            break;
-          case BOOLEAN:
-            tvList.putBoolean(
-                parentLayerPointReader.currentTime(), 
parentLayerPointReader.currentBoolean());
-            break;
-          case TEXT:
-            tvList.putBinary(
-                parentLayerPointReader.currentTime(), 
parentLayerPointReader.currentBinary());
-            break;
-          default:
-            throw new UnsupportedOperationException(dataType.name());
-        }
-      }
-
       @Override
       public void readyForNext() {
         hasCached = false;
 
         safetyPile.moveForwardTo(currentPointIndex + 1);
-        // todo: reduce the update rate
         tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
       }
 
@@ -171,18 +132,205 @@ public class SingleInputMultiOutputIntermediateLayer 
extends IntermediateLayer {
 
   @Override
   public LayerRowReader constructRowReader() {
-    return null;
+
+    return new LayerRowReader() {
+
+      private final SafetyPile safetyPile = safetyLine.addSafetyPile();
+      private final ElasticSerializableTVListBackedSingleColumnRow row =
+          new ElasticSerializableTVListBackedSingleColumnRow(tvList, -1);
+
+      private boolean hasCached = false;
+      private int currentRowIndex = -1;
+
+      @Override
+      public boolean next() throws QueryProcessException, IOException {
+        if (!hasCached
+            && ((currentRowIndex < tvList.size() - 1)
+                || LayerCacheUtils.cachePoint(dataType, 
parentLayerPointReader, tvList))) {
+          row.seek(++currentRowIndex);
+          hasCached = true;
+        }
+
+        return hasCached;
+      }
+
+      @Override
+      public void readyForNext() {
+        hasCached = false;
+
+        safetyPile.moveForwardTo(currentRowIndex + 1);
+        tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {dataType};
+      }
+
+      @Override
+      public long currentTime() throws IOException {
+        return row.getTime();
+      }
+
+      @Override
+      public Row currentRow() {
+        return row;
+      }
+    };
   }
 
   @Override
   protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
       SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
-    return null;
+
+    return new LayerRowWindowReader() {
+
+      private final int windowSize = strategy.getWindowSize();
+      private final int slidingStep = strategy.getSlidingStep();
+
+      private final SafetyPile safetyPile = safetyLine.addSafetyPile();
+      private final ElasticSerializableTVListBackedSingleColumnWindow window =
+          new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+      private boolean hasCached = false;
+      private int beginIndex = -slidingStep;
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        if (hasCached) {
+          return true;
+        }
+
+        beginIndex += slidingStep;
+        int endIndex = beginIndex + windowSize;
+
+        int pointsToBeCollected = endIndex - tvList.size();
+        if (0 < pointsToBeCollected) {
+          hasCached =
+              LayerCacheUtils.cachePoints(
+                      dataType, parentLayerPointReader, tvList, 
pointsToBeCollected)
+                  != 0;
+          window.seek(beginIndex, tvList.size());
+        } else {
+          hasCached = true;
+          window.seek(beginIndex, endIndex);
+        }
+
+        return hasCached;
+      }
+
+      @Override
+      public void readyForNext() {
+        hasCached = false;
+
+        safetyPile.moveForwardTo(beginIndex + 1);
+        tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {dataType};
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
   }
 
   @Override
   protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
-      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
-    return null;
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws IOException, QueryProcessException {
+
+    final long timeInterval = strategy.getTimeInterval();
+    final long slidingStep = strategy.getSlidingStep();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+
+    final SafetyPile safetyPile = safetyLine.addSafetyPile();
+    final ElasticSerializableTVListBackedSingleColumnWindow window =
+        new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+    long nextWindowTimeBeginGivenByStrategy = strategy.getDisplayWindowBegin();
+    if (tvList.size() == 0
+        && LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, tvList)
+        && nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
+      // display window begin should be set to the same as the min timestamp 
of the query result
+      // set
+      nextWindowTimeBeginGivenByStrategy = tvList.getTime(0);
+    }
+    long finalNextWindowTimeBeginGivenByStrategy = 
nextWindowTimeBeginGivenByStrategy;
+
+    final boolean hasAtLeastOneRow = tvList.size() != 0;
+
+    return new LayerRowWindowReader() {
+
+      private long nextWindowTimeBegin = 
finalNextWindowTimeBeginGivenByStrategy;
+      private int nextIndexBegin = 0;
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        if (displayWindowEnd <= nextWindowTimeBegin) {
+          return false;
+        }
+        if (!hasAtLeastOneRow || 0 < tvList.size()) {
+          return true;
+        }
+
+        long nextWindowTimeEnd = Math.min(nextWindowTimeBegin + timeInterval, 
displayWindowEnd);
+        int oldTVListSize = tvList.size();
+        while (tvList.getTime(tvList.size() - 1) < nextWindowTimeEnd) {
+          if (!LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, 
tvList)) {
+            if (displayWindowEnd == Long.MAX_VALUE
+                // display window end == the max timestamp of the query result 
set
+                && oldTVListSize == tvList.size()) {
+              return false;
+            } else {
+              break;
+            }
+          }
+        }
+
+        for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+          if (nextWindowTimeBegin <= tvList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          if (i == tvList.size() - 1) {
+            nextIndexBegin = tvList.size();
+          }
+        }
+
+        int nextIndexEnd = tvList.size();
+        for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+          if (nextWindowTimeEnd <= tvList.getTime(i)) {
+            nextIndexEnd = i;
+            break;
+          }
+        }
+        window.seek(nextIndexBegin, nextIndexEnd);
+
+        return true;
+      }
+
+      @Override
+      public void readyForNext() {
+        nextWindowTimeBegin += slidingStep;
+
+        safetyPile.moveForwardTo(nextIndexBegin + 1);
+        tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {dataType};
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
 
b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
index 452ad3f..86111c1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
@@ -57,28 +57,27 @@ public class SingleInputSingleOutputIntermediateLayer 
extends IntermediateLayer
     return new LayerRowReader() {
 
       private final Row row = new 
LayerPointReaderBackedSingleColumnRow(parentLayerPointReader);
-      private final TSDataType[] dataTypes =
-          new TSDataType[] {parentLayerPointReader.getDataType()};
+
       private boolean hasCached = false;
 
       @Override
       public boolean next() throws IOException, QueryProcessException {
-        if (hasCached) {
-          return true;
+        if (!hasCached) {
+          hasCached = parentLayerPointReader.next();
         }
-        hasCached = parentLayerPointReader.next();
         return hasCached;
       }
 
       @Override
       public void readyForNext() {
-        parentLayerPointReader.readyForNext();
         hasCached = false;
+
+        parentLayerPointReader.readyForNext();
       }
 
       @Override
       public TSDataType[] getDataTypes() {
-        return dataTypes;
+        return new TSDataType[] {parentLayerPointReader.getDataType()};
       }
 
       @Override
@@ -123,7 +122,10 @@ public class SingleInputSingleOutputIntermediateLayer 
extends IntermediateLayer
 
         int pointsToBeCollected = endIndex - tvList.size();
         if (0 < pointsToBeCollected) {
-          hasCached = collectPoints(pointsToBeCollected, tvList) != 0;
+          hasCached =
+              LayerCacheUtils.cachePoints(
+                      dataType, parentLayerPointReader, tvList, 
pointsToBeCollected)
+                  != 0;
           window.seek(beginIndex, tvList.size());
         } else {
           hasCached = true;
@@ -166,14 +168,12 @@ public class SingleInputSingleOutputIntermediateLayer 
extends IntermediateLayer
         new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
 
     long nextWindowTimeBeginGivenByStrategy = strategy.getDisplayWindowBegin();
-    if (tvList.size() == 0 && parentLayerPointReader.next()) {
-      collectPoints(1, tvList);
-
-      if (nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
-        // display window begin should be set to the same as the min timestamp 
of the query result
-        // set
-        nextWindowTimeBeginGivenByStrategy = tvList.getTime(0);
-      }
+    if (tvList.size() == 0
+        && LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, tvList)
+        && nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
+      // display window begin should be set to the same as the min timestamp 
of the query result
+      // set
+      nextWindowTimeBeginGivenByStrategy = tvList.getTime(0);
     }
     long finalNextWindowTimeBeginGivenByStrategy = 
nextWindowTimeBeginGivenByStrategy;
 
@@ -196,14 +196,14 @@ public class SingleInputSingleOutputIntermediateLayer 
extends IntermediateLayer
         long nextWindowTimeEnd = Math.min(nextWindowTimeBegin + timeInterval, 
displayWindowEnd);
         int oldTVListSize = tvList.size();
         while (tvList.getTime(tvList.size() - 1) < nextWindowTimeEnd) {
-          if (parentLayerPointReader.next()) {
-            collectPoints(1, tvList);
-          } else if (displayWindowEnd == Long.MAX_VALUE
-              // display window end == the max timestamp of the query result 
set
-              && oldTVListSize == tvList.size()) {
-            return false;
-          } else {
-            break;
+          if (!LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, 
tvList)) {
+            if (displayWindowEnd == Long.MAX_VALUE
+                // display window end == the max timestamp of the query result 
set
+                && oldTVListSize == tvList.size()) {
+              return false;
+            } else {
+              break;
+            }
           }
         }
 
@@ -245,45 +245,4 @@ public class SingleInputSingleOutputIntermediateLayer 
extends IntermediateLayer
       }
     };
   }
-
-  /** @return number of actually collected, which may be less than or equals 
to pointNumber */
-  private int collectPoints(int pointNumber, ElasticSerializableTVList tvList)
-      throws QueryProcessException, IOException {
-    int count = 0;
-
-    while (parentLayerPointReader.next() && count < pointNumber) {
-      ++count;
-
-      switch (dataType) {
-        case INT32:
-          tvList.putInt(parentLayerPointReader.currentTime(), 
parentLayerPointReader.currentInt());
-          break;
-        case INT64:
-          tvList.putLong(
-              parentLayerPointReader.currentTime(), 
parentLayerPointReader.currentLong());
-          break;
-        case FLOAT:
-          tvList.putFloat(
-              parentLayerPointReader.currentTime(), 
parentLayerPointReader.currentFloat());
-          break;
-        case DOUBLE:
-          tvList.putDouble(
-              parentLayerPointReader.currentTime(), 
parentLayerPointReader.currentDouble());
-          break;
-        case BOOLEAN:
-          tvList.putBoolean(
-              parentLayerPointReader.currentTime(), 
parentLayerPointReader.currentBoolean());
-          break;
-        case TEXT:
-          tvList.putBinary(
-              parentLayerPointReader.currentTime(), 
parentLayerPointReader.currentBinary());
-          break;
-        default:
-      }
-
-      parentLayerPointReader.readyForNext();
-    }
-
-    return count;
-  }
 }

Reply via email to