This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/fixMergeReader
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 850e12daf33af818a7e13efd7163fce34ad5c7f9
Author: Minghui Liu <[email protected]>
AuthorDate: Thu Feb 22 17:02:52 2024 +0800

    fix merge reader bug
---
 .../execution/operator/source/SeriesScanUtil.java  | 38 ++++++---------
 .../tsfile/read/common/block/TsBlockUtil.java      | 55 ++++++++++++++++++++++
 .../tsfile/read/reader/page/AlignedPageReader.java | 54 +++------------------
 3 files changed, 75 insertions(+), 72 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index 77375d41fac..cd1dc52e575 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
@@ -643,7 +644,9 @@ public class SeriesScanUtil {
 
     if (hasCachedNextOverlappedPage) {
       hasCachedNextOverlappedPage = false;
-      TsBlock res = cachedTsBlock;
+      TsBlock res =
+          applyPushDownFilterAndLimitOffset(
+              cachedTsBlock, scanOptions.getPushDownFilter(), paginationController);
       cachedTsBlock = null;
 
       // cached tsblock has handled by pagination controller & push down filter, return directly
@@ -672,6 +675,15 @@ public class SeriesScanUtil {
     }
   }
 
+  private TsBlock applyPushDownFilterAndLimitOffset(
+      TsBlock tsBlock, Filter pushDownFilter, PaginationController paginationController) {
+    if (pushDownFilter == null) {
+      return paginationController.applyTsBlock(tsBlock);
+    }
+    return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock(
+        tsBlock, new TsBlockBuilder(getTsDataTypeList()), pushDownFilter, paginationController);
+  }
+
   private void filterFirstPageReader() {
     if (firstPageReader == null) {
       return;
@@ -708,7 +720,6 @@ public class SeriesScanUtil {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private boolean hasNextOverlappedPage() throws IOException {
     long startTime = System.nanoTime();
-    Filter pushDownFilter = scanOptions.getPushDownFilter();
     try {
       if (hasCachedNextOverlappedPage) {
         return true;
@@ -810,9 +821,7 @@ public class SeriesScanUtil {
 
             // get the latest first point in mergeReader
             timeValuePair = mergeReader.nextTimeValuePair();
-            if (processFilterAndPagination(timeValuePair, pushDownFilter, builder)) {
-              break;
-            }
+            addTimeValuePairToResult(timeValuePair, builder);
           }
           hasCachedNextOverlappedPage = !builder.isEmpty();
           cachedTsBlock = builder.build();
@@ -875,25 +884,6 @@ public class SeriesScanUtil {
     unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
   }
 
-  private boolean processFilterAndPagination(
-      TimeValuePair timeValuePair, Filter pushDownFilter, TsBlockBuilder builder) {
-    if (pushDownFilter != null
-        && !pushDownFilter.satisfyRow(timeValuePair.getTimestamp(), timeValuePair.getValues())) {
-      return false;
-    }
-    if (paginationController.hasCurOffset()) {
-      paginationController.consumeOffset();
-      return false;
-    }
-    if (paginationController.hasCurLimit()) {
-      addTimeValuePairToResult(timeValuePair, builder);
-      paginationController.consumeLimit();
-      return false;
-    } else {
-      return true;
-    }
-  }
-
   private void addTimeValuePairToResult(TimeValuePair timeValuePair, TsBlockBuilder builder) {
     builder.getTimeColumnBuilder().writeLong(timeValuePair.getTimestamp());
     switch (dataType) {
diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java
index 8e10bcea023..3bf472088a7 100644
--- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java
+++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.tsfile.read.common.block;
 
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.series.PaginationController;
 
 public class TsBlockUtil {
 
@@ -65,4 +67,57 @@ public class TsBlockUtil {
     }
     return left;
   }
+
+  public static TsBlock applyFilterAndLimitOffsetToTsBlock(
+      TsBlock unFilteredBlock,
+      TsBlockBuilder builder,
+      Filter pushDownFilter,
+      PaginationController paginationController) {
+    boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(unFilteredBlock);
+
+    // construct time column
+    int readEndIndex =
+        buildTimeColumnWithPagination(
+            unFilteredBlock, builder, keepCurrentRow, paginationController);
+
+    // construct value columns
+    for (int i = 0; i < builder.getValueColumnBuilders().length; i++) {
+      for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
+        if (keepCurrentRow[rowIndex]) {
+          if (unFilteredBlock.getValueColumns()[i].isNull(rowIndex)) {
+            builder.getColumnBuilder(i).appendNull();
+          } else {
+            builder
+                .getColumnBuilder(i)
+                .writeObject(unFilteredBlock.getValueColumns()[i].getObject(rowIndex));
+          }
+        }
+      }
+    }
+    return builder.build();
+  }
+
+  private static int buildTimeColumnWithPagination(
+      TsBlock unFilteredBlock,
+      TsBlockBuilder builder,
+      boolean[] keepCurrentRow,
+      PaginationController paginationController) {
+    int readEndIndex = unFilteredBlock.getPositionCount();
+    for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
+      if (keepCurrentRow[rowIndex]) {
+        if (paginationController.hasCurOffset()) {
+          paginationController.consumeOffset();
+          keepCurrentRow[rowIndex] = false;
+        } else if (paginationController.hasCurLimit()) {
+          builder.getTimeColumnBuilder().writeLong(unFilteredBlock.getTimeByIndex(rowIndex));
+          builder.declarePosition();
+          paginationController.consumeLimit();
+        } else {
+          readEndIndex = rowIndex;
+          break;
+        }
+      }
+    }
+    return readEndIndex;
+  }
 }
diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index b32a64cfdb9..c24b76798e4 100644
--- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
@@ -205,11 +206,14 @@ public class AlignedPageReader implements IPageReader {
     // construct value columns
     buildValueColumns(readEndIndex, keepCurrentRow, isDeleted);
 
+    TsBlock unFilteredBlock = builder.build();
     if (pushDownFilterAllSatisfy) {
       // OFFSET & LIMIT has been consumed in buildTimeColumn
-      return builder.build();
+      return unFilteredBlock;
     }
-    return applyPushDownFilter();
+    builder.reset();
+    return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock(
+        unFilteredBlock, builder, pushDownFilter, paginationController);
   }
 
   private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) {
@@ -279,26 +283,6 @@ public class AlignedPageReader implements IPageReader {
     return readEndIndex;
   }
 
-  private int buildTimeColumnWithPagination(TsBlock unFilteredBlock, boolean[] keepCurrentRow) {
-    int readEndIndex = unFilteredBlock.getPositionCount();
-    for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
-      if (keepCurrentRow[rowIndex]) {
-        if (paginationController.hasCurOffset()) {
-          paginationController.consumeOffset();
-          keepCurrentRow[rowIndex] = false;
-        } else if (paginationController.hasCurLimit()) {
-          builder.getTimeColumnBuilder().writeLong(unFilteredBlock.getTimeByIndex(rowIndex));
-          builder.declarePosition();
-          paginationController.consumeLimit();
-        } else {
-          readEndIndex = rowIndex;
-          break;
-        }
-      }
-    }
-    return readEndIndex;
-  }
-
   private int buildTimeColumnWithoutPagination(long[] timeBatch, boolean[] keepCurrentRow) {
     int readEndIndex = 0;
     for (int i = 0; i < timeBatch.length; i++) {
@@ -386,32 +370,6 @@ public class AlignedPageReader implements IPageReader {
     }
   }
 
-  private TsBlock applyPushDownFilter() {
-    TsBlock unFilteredBlock = builder.build();
-    builder.reset();
-
-    boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(unFilteredBlock);
-
-    // construct time column
-    int readEndIndex = buildTimeColumnWithPagination(unFilteredBlock, keepCurrentRow);
-
-    // construct value columns
-    for (int i = 0; i < valueCount; i++) {
-      for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
-        if (keepCurrentRow[rowIndex]) {
-          if (unFilteredBlock.getValueColumns()[i].isNull(rowIndex)) {
-            builder.getColumnBuilder(i).appendNull();
-          } else {
-            builder
-                .getColumnBuilder(i)
-                .writeObject(unFilteredBlock.getValueColumns()[i].getObject(rowIndex));
-          }
-        }
-      }
-    }
-    return builder.build();
-  }
-
   public void setDeleteIntervalList(List<List<TimeRange>> list) {
     for (int i = 0; i < valueCount; i++) {
       if (valuePageReaderList.get(i) != null) {

Reply via email to