This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new b90f6760f support accurate filter rows recording during query 
execution (#744)
b90f6760f is described below

commit b90f6760f619126f9c1a570705daa5c2f42192b5
Author: alpass163 <[email protected]>
AuthorDate: Wed Mar 18 19:23:34 2026 +0800

    support accurate filter rows recording during query execution (#744)
---
 .../tsfile/read/common/block/TsBlockUtil.java      | 25 +++++++-
 .../apache/tsfile/read/filter/basic/Filter.java    | 21 ++++++
 .../org/apache/tsfile/read/reader/IPageReader.java | 13 ++++
 .../reader/chunk/AbstractAlignedChunkReader.java   | 19 ++++--
 .../read/reader/chunk/AbstractChunkReader.java     |  6 +-
 .../read/reader/chunk/AlignedChunkReader.java      | 24 +++++--
 .../tsfile/read/reader/chunk/ChunkReader.java      | 26 ++++++--
 .../tsfile/read/reader/chunk/TableChunkReader.java | 20 +++++-
 .../reader/page/AbstractAlignedPageReader.java     | 75 ++++++++++++++++++++++
 .../apache/tsfile/read/reader/page/PageReader.java | 52 +++++++++++----
 10 files changed, 248 insertions(+), 33 deletions(-)

diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java
index c107c1ef1..d65ca1722 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java
@@ -25,6 +25,7 @@ import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.read.reader.series.PaginationController;
 
 import java.util.Arrays;
+import java.util.function.LongConsumer;
 
 public class TsBlockUtil {
 
@@ -75,10 +76,32 @@ public class TsBlockUtil {
       TsBlockBuilder builder,
       Filter pushDownFilter,
       PaginationController paginationController) {
+    return applyFilterAndLimitOffsetToTsBlock(
+        unFilteredBlock, builder, pushDownFilter, paginationController, null);
+  }
+
+  public static TsBlock applyFilterAndLimitOffsetToTsBlock(
+      TsBlock unFilteredBlock,
+      TsBlockBuilder builder,
+      Filter pushDownFilter,
+      PaginationController paginationController,
+      LongConsumer filterRowsRecorder) {
+
     boolean[] selection = new boolean[unFilteredBlock.getPositionCount()];
     Arrays.fill(selection, true);
-    boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(selection, 
unFilteredBlock);
+    boolean[] keepCurrentRow =
+        filterRowsRecorder == null
+            ? pushDownFilter.satisfyTsBlock(selection, unFilteredBlock)
+            : pushDownFilter.satisfyTsBlock(selection, unFilteredBlock, 
filterRowsRecorder);
 
+    return buildFilteredTsBlock(unFilteredBlock, builder, keepCurrentRow, 
paginationController);
+  }
+
+  private static TsBlock buildFilteredTsBlock(
+      TsBlock unFilteredBlock,
+      TsBlockBuilder builder,
+      boolean[] keepCurrentRow,
+      PaginationController paginationController) {
     // construct time column
     int readEndIndex =
         buildTimeColumnWithPagination(
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java 
b/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java
index 5cac76db6..bebe3a4c3 100755
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java
@@ -43,6 +43,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.function.LongConsumer;
 
 /**
  * A Filter is an executable expression tree describing the criteria for which 
records to keep when
@@ -117,6 +118,26 @@ public abstract class Filter {
    */
   public abstract boolean[] satisfyTsBlock(boolean[] selection, TsBlock 
tsBlock);
 
+  public final boolean[] satisfyTsBlock(
+      boolean[] selection, TsBlock tsBlock, LongConsumer filterRowsRecorder) {
+
+    int inputCount = countSelectedRows(selection);
+    boolean[] result = satisfyTsBlock(selection, tsBlock);
+    int outputCount = countSelectedRows(result);
+    if (inputCount > outputCount) {
+      filterRowsRecorder.accept((inputCount - outputCount));
+    }
+
+    return result;
+  }
+
+  private static int countSelectedRows(boolean[] selection) {
+    if (selection == null) return 0;
+    int count = 0;
+    for (boolean b : selection) count += b ? 1 : 0;
+    return count;
+  }
+
   /**
    * To examine whether the block can be skipped.
    *
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java
index 1227fd012..68ce7ef32 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java
@@ -28,6 +28,7 @@ import 
org.apache.tsfile.read.reader.series.PaginationController;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.function.LongConsumer;
 
 public interface IPageReader extends IMetadata {
 
@@ -39,6 +40,18 @@ public interface IPageReader extends IMetadata {
 
   TsBlock getAllSatisfiedData() throws IOException;
 
+  /**
+   * Reads all rows from the page that satisfy the current filter, while also 
recording the number
+   * of rows filtered out.
+   *
+   * <p>This method behaves identically to {@link #getAllSatisfiedData()}, 
with the addition that it
+   * reports the count of rows that were discarded by the filter to the 
provided {@code
+   * filterRowsRecorder}.
+   *
+   * @param filterRowsRecorder receives the number of rows filtered out
+   */
+  TsBlock getAllSatisfiedData(LongConsumer filterRowsRecorder) throws 
IOException;
+
   void addRecordFilter(Filter filter);
 
   // The 'modified' property is also true when a data type need to be modified 
in query and
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
index 22d743530..85073a456 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
@@ -39,6 +39,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.LongConsumer;
 
 public abstract class AbstractAlignedChunkReader extends AbstractChunkReader {
   // chunk header of the time column
@@ -59,9 +60,13 @@ public abstract class AbstractAlignedChunkReader extends 
AbstractChunkReader {
 
   @SuppressWarnings("unchecked")
   AbstractAlignedChunkReader(
-      Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime, Filter 
queryFilter)
+      Chunk timeChunk,
+      List<Chunk> valueChunkList,
+      long readStopTime,
+      Filter queryFilter,
+      LongConsumer filterRowsRecorder)
       throws IOException {
-    super(readStopTime, queryFilter);
+    super(readStopTime, queryFilter, filterRowsRecorder);
     this.timeChunkHeader = timeChunk.getHeader();
     this.timeChunkDataBuffer = timeChunk.getData();
     this.timeDeleteIntervalList = timeChunk.getDeleteIntervalList();
@@ -163,8 +168,14 @@ public abstract class AbstractAlignedChunkReader extends 
AbstractChunkReader {
   }
 
   protected boolean pageCanSkip(PageHeader pageHeader) {
-    return queryFilter != null
-        && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), 
pageHeader.getEndTime());
+    if (queryFilter != null
+        && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), 
pageHeader.getEndTime())) {
+      if (filterRowsRecorder != null) {
+        this.filterRowsRecorder.accept(pageHeader.getStatistics().getCount());
+      }
+      return true;
+    }
+    return false;
   }
 
   private void skipCurrentPage(PageHeader timePageHeader, List<PageHeader> 
valuePageHeader) {
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java
index 627ace07e..caaa58751 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java
@@ -31,6 +31,7 @@ import org.apache.tsfile.read.reader.IPageReader;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.function.LongConsumer;
 
 public abstract class AbstractChunkReader implements IChunkReader {
 
@@ -44,11 +45,14 @@ public abstract class AbstractChunkReader implements 
IChunkReader {
   // any filter, no matter value filter or time filter
   protected final Filter queryFilter;
 
+  protected LongConsumer filterRowsRecorder;
+
   protected final List<IPageReader> pageReaderList = new LinkedList<>();
 
-  protected AbstractChunkReader(long readStopTime, Filter filter) {
+  protected AbstractChunkReader(long readStopTime, Filter filter, LongConsumer 
filterRowsRecorder) {
     this.readStopTime = readStopTime;
     this.queryFilter = filter;
+    this.filterRowsRecorder = filterRowsRecorder;
   }
 
   /** judge if has next page whose page header satisfies the filter. */
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
index e7879734c..a7a4723bd 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
@@ -32,22 +32,36 @@ import org.apache.tsfile.read.reader.page.LazyLoadPageData;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.function.LongConsumer;
 
 public class AlignedChunkReader extends AbstractAlignedChunkReader {
 
   public AlignedChunkReader(
-      Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime, Filter 
queryFilter)
+      Chunk timeChunk,
+      List<Chunk> valueChunkList,
+      long readStopTime,
+      Filter queryFilter,
+      LongConsumer filterRowsRecorder)
       throws IOException {
-    super(timeChunk, valueChunkList, readStopTime, queryFilter);
+    super(timeChunk, valueChunkList, readStopTime, queryFilter, 
filterRowsRecorder);
   }
 
   public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList) 
throws IOException {
-    this(timeChunk, valueChunkList, Long.MIN_VALUE, null);
+    this(timeChunk, valueChunkList, Long.MIN_VALUE, null, null);
   }
 
   public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, 
Filter queryFilter)
       throws IOException {
-    this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter);
+    this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, null);
+  }
+
+  public AlignedChunkReader(
+      Chunk timeChunk,
+      List<Chunk> valueChunkList,
+      Filter queryFilter,
+      LongConsumer filterRowsRecorder)
+      throws IOException {
+    this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, 
filterRowsRecorder);
   }
 
   /**
@@ -56,7 +70,7 @@ public class AlignedChunkReader extends 
AbstractAlignedChunkReader {
    */
   public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, long 
readStopTime)
       throws IOException {
-    this(timeChunk, valueChunkList, readStopTime, null);
+    this(timeChunk, valueChunkList, readStopTime, null, null);
   }
 
   @Override
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
index 806a3b818..c2b2301d7 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
@@ -37,6 +37,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.function.LongConsumer;
 
 public class ChunkReader extends AbstractChunkReader {
 
@@ -47,8 +48,9 @@ public class ChunkReader extends AbstractChunkReader {
   private final EncryptParameter encryptParam;
 
   @SuppressWarnings("unchecked")
-  public ChunkReader(Chunk chunk, long readStopTime, Filter queryFilter) {
-    super(readStopTime, queryFilter);
+  public ChunkReader(
+      Chunk chunk, long readStopTime, Filter queryFilter, LongConsumer 
filterRowsRecorder) {
+    super(readStopTime, queryFilter, filterRowsRecorder);
     this.chunkHeader = chunk.getHeader();
     this.chunkDataBuffer = chunk.getData();
     this.deleteIntervalList = chunk.getDeleteIntervalList();
@@ -57,11 +59,15 @@ public class ChunkReader extends AbstractChunkReader {
   }
 
   public ChunkReader(Chunk chunk) throws IOException {
-    this(chunk, Long.MIN_VALUE, null);
+    this(chunk, Long.MIN_VALUE, null, null);
   }
 
   public ChunkReader(Chunk chunk, Filter queryFilter) {
-    this(chunk, Long.MIN_VALUE, queryFilter);
+    this(chunk, Long.MIN_VALUE, queryFilter, null);
+  }
+
+  public ChunkReader(Chunk chunk, Filter queryFilter, LongConsumer 
filterRowsRecorder) {
+    this(chunk, Long.MIN_VALUE, queryFilter, filterRowsRecorder);
   }
 
   /**
@@ -69,7 +75,7 @@ public class ChunkReader extends AbstractChunkReader {
    * filtering out pages whose endTime is less than current timestamp.
    */
   public ChunkReader(Chunk chunk, long readStopTime) {
-    this(chunk, readStopTime, null);
+    this(chunk, readStopTime, null, null);
   }
 
   private void initAllPageReaders(Statistics<? extends Serializable> 
chunkStatistic) {
@@ -99,8 +105,14 @@ public class ChunkReader extends AbstractChunkReader {
   }
 
   private boolean pageCanSkip(PageHeader pageHeader) {
-    return queryFilter != null
-        && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), 
pageHeader.getEndTime());
+    if (queryFilter != null
+        && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), 
pageHeader.getEndTime())) {
+      if (filterRowsRecorder != null) {
+        this.filterRowsRecorder.accept(pageHeader.getStatistics().getCount());
+      }
+      return true;
+    }
+    return false;
   }
 
   protected boolean pageDeleted(PageHeader pageHeader) {
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java
index cc156e324..32bdbf1d2 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java
@@ -32,20 +32,34 @@ import org.apache.tsfile.read.reader.page.TablePageReader;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.function.LongConsumer;
 
 // difference with AlignedChunkReader is that TableChunkReader works for 
TableScan and keep all null
 // rows
 public class TableChunkReader extends AbstractAlignedChunkReader {
 
   public TableChunkReader(
-      Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime, Filter 
queryFilter)
+      Chunk timeChunk,
+      List<Chunk> valueChunkList,
+      long readStopTime,
+      Filter queryFilter,
+      LongConsumer filterRowsRecorder)
       throws IOException {
-    super(timeChunk, valueChunkList, readStopTime, queryFilter);
+    super(timeChunk, valueChunkList, readStopTime, queryFilter, 
filterRowsRecorder);
   }
 
   public TableChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, Filter 
queryFilter)
       throws IOException {
-    this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter);
+    this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, null);
+  }
+
+  public TableChunkReader(
+      Chunk timeChunk,
+      List<Chunk> valueChunkList,
+      Filter queryFilter,
+      LongConsumer filterRowsRecorder)
+      throws IOException {
+    this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, 
filterRowsRecorder);
   }
 
   @Override
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java
index b3090a575..2a95472b6 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java
@@ -40,7 +40,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.LongConsumer;
 
+import static java.util.Objects.requireNonNull;
 import static 
org.apache.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER;
 
 public abstract class AbstractAlignedPageReader implements IPageReader {
@@ -215,6 +217,55 @@ public abstract class AbstractAlignedPageReader implements 
IPageReader {
         unFilteredBlock, builder, pushDownFilter, paginationController);
   }
 
+  /**
+   * Gets all satisfied data while recording the number of filtered rows. If a 
tuple does not
+   * satisfy the filter and is also deleted, that tuple is not counted as a 
filtered row.
+   */
+  @Override
+  public TsBlock getAllSatisfiedData(LongConsumer filterRowsRecorder) throws 
IOException {
+    requireNonNull(filterRowsRecorder, "filterRowsRecorder is null");
+    long[] timeBatch = timePageReader.getNextTimeBatch();
+
+    if (allPageDataSatisfy()) {
+      buildResultWithoutAnyFilterAndDelete(timeBatch);
+      return builder.build();
+    }
+
+    long allFilteredRows = 0;
+    // if !filter.satisfy, discard this row
+    boolean[] keepCurrentRow = new boolean[timeBatch.length];
+    boolean globalTimeFilterAllSatisfy = globalTimeFilterAllSatisfy();
+    if (globalTimeFilterAllSatisfy) {
+      Arrays.fill(keepCurrentRow, true);
+    } else {
+      // record the filtered rows number
+      long filteredRows =
+          
updateKeepCurrentRowThroughGlobalTimeFilterWithRecord(keepCurrentRow, 
timeBatch);
+      allFilteredRows += filteredRows;
+    }
+
+    if (timePageReader.isModified()) {
+      //  if one row is deleted, it can't be considered as the filtered row
+      long deletedAndFilteredRows =
+          updateKeepCurrentRowThroughDeletionWithRecord(keepCurrentRow, 
timeBatch);
+      allFilteredRows -= deletedAndFilteredRows;
+    }
+    if (allFilteredRows != 0) {
+      filterRowsRecorder.accept(allFilteredRows);
+    }
+    boolean pushDownFilterAllSatisfy = pushDownFilterAllSatisfy();
+    constructResult(keepCurrentRow, timeBatch, pushDownFilterAllSatisfy);
+
+    TsBlock unFilteredBlock = builder.build();
+    if (pushDownFilterAllSatisfy) {
+      // OFFSET & LIMIT has been consumed in buildTimeColumn
+      return unFilteredBlock;
+    }
+    builder.reset();
+    return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock(
+        unFilteredBlock, builder, pushDownFilter, paginationController, 
filterRowsRecorder);
+  }
+
   private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) throws 
IOException {
     if (paginationController.hasCurOffset(timeBatch.length)) {
       paginationController.consumeOffset(timeBatch.length);
@@ -264,6 +315,17 @@ public abstract class AbstractAlignedPageReader implements 
IPageReader {
     }
   }
 
+  private long updateKeepCurrentRowThroughGlobalTimeFilterWithRecord(
+      boolean[] keepCurrentRow, long[] timeBatch) {
+
+    long filteredRows = 0;
+    for (int i = 0, n = timeBatch.length; i < n; i++) {
+      keepCurrentRow[i] = globalTimeFilter.satisfy(timeBatch[i], null);
+      filteredRows += keepCurrentRow[i] ? 0 : 1;
+    }
+    return filteredRows;
+  }
+
   private void updateKeepCurrentRowThroughDeletion(boolean[] keepCurrentRow, 
long[] timeBatch) {
     for (int i = 0, n = timeBatch.length; i < n; i++) {
       if (keepCurrentRow[i]) {
@@ -272,6 +334,19 @@ public abstract class AbstractAlignedPageReader implements 
IPageReader {
     }
   }
 
+  private long updateKeepCurrentRowThroughDeletionWithRecord(
+      boolean[] keepCurrentRow, long[] timeBatch) {
+    long deletedAndFilteredRows = 0;
+    for (int i = 0, n = timeBatch.length; i < n; i++) {
+      if (keepCurrentRow[i]) {
+        keepCurrentRow[i] = !timePageReader.isDeleted(timeBatch[i]);
+      } else {
+        deletedAndFilteredRows += timePageReader.isDeleted(timeBatch[i]) ? 1 : 
0;
+      }
+    }
+    return deletedAndFilteredRows;
+  }
+
   protected int buildTimeColumn(
       long[] timeBatch, boolean[] keepCurrentRow, boolean 
pushDownFilterAllSatisfy) {
     if (pushDownFilterAllSatisfy) {
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java
index 2576706b2..2db020cf8 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java
@@ -44,6 +44,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.LongConsumer;
 
 import static 
org.apache.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER;
 import static org.apache.tsfile.utils.Preconditions.checkArgument;
@@ -212,6 +213,11 @@ public class PageReader implements IPageReader {
 
   @Override
   public TsBlock getAllSatisfiedData() throws IOException {
+    return getAllSatisfiedData(null);
+  }
+
+  @Override
+  public TsBlock getAllSatisfiedData(LongConsumer filterRowsRecorder) throws 
IOException {
     uncompressDataIfNecessary();
     TsBlockBuilder builder;
     int initialExpectedEntries = (int) pageHeader.getStatistics().getCount();
@@ -223,14 +229,18 @@ public class PageReader implements IPageReader {
 
     TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
     ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
+    long allFilteredRows = 0;
     boolean allSatisfy = recordFilter == null || recordFilter.allSatisfy(this);
     switch (dataType) {
       case BOOLEAN:
         while (timeDecoder.hasNext(timeBuffer)) {
           long timestamp = timeDecoder.readLong(timeBuffer);
           boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
-          if (isDeleted(timestamp)
-              || (!allSatisfy && !recordFilter.satisfyBoolean(timestamp, 
aBoolean))) {
+          if (isDeleted(timestamp)) {
+            continue;
+          }
+          if (!allSatisfy && !recordFilter.satisfyBoolean(timestamp, 
aBoolean)) {
+            allFilteredRows++;
             continue;
           }
           if (paginationController.hasCurOffset()) {
@@ -252,8 +262,11 @@ public class PageReader implements IPageReader {
         while (timeDecoder.hasNext(timeBuffer)) {
           long timestamp = timeDecoder.readLong(timeBuffer);
           int anInt = valueDecoder.readInt(valueBuffer);
-          if (isDeleted(timestamp)
-              || (!allSatisfy && !recordFilter.satisfyInteger(timestamp, 
anInt))) {
+          if (isDeleted(timestamp)) {
+            continue;
+          }
+          if (!allSatisfy && !recordFilter.satisfyInteger(timestamp, anInt)) {
+            allFilteredRows++;
             continue;
           }
           if (paginationController.hasCurOffset()) {
@@ -275,8 +288,11 @@ public class PageReader implements IPageReader {
         while (timeDecoder.hasNext(timeBuffer)) {
           long timestamp = timeDecoder.readLong(timeBuffer);
           long aLong = valueDecoder.readLong(valueBuffer);
-          if (isDeleted(timestamp)
-              || (!allSatisfy && !recordFilter.satisfyLong(timestamp, aLong))) 
{
+          if (isDeleted(timestamp)) {
+            continue;
+          }
+          if (!allSatisfy && !recordFilter.satisfyLong(timestamp, aLong)) {
+            allFilteredRows++;
             continue;
           }
           if (paginationController.hasCurOffset()) {
@@ -297,8 +313,11 @@ public class PageReader implements IPageReader {
         while (timeDecoder.hasNext(timeBuffer)) {
           long timestamp = timeDecoder.readLong(timeBuffer);
           float aFloat = valueDecoder.readFloat(valueBuffer);
-          if (isDeleted(timestamp)
-              || (!allSatisfy && !recordFilter.satisfyFloat(timestamp, 
aFloat))) {
+          if (isDeleted(timestamp)) {
+            continue;
+          }
+          if (!allSatisfy && !recordFilter.satisfyFloat(timestamp, aFloat)) {
+            allFilteredRows++;
             continue;
           }
           if (paginationController.hasCurOffset()) {
@@ -319,8 +338,11 @@ public class PageReader implements IPageReader {
         while (timeDecoder.hasNext(timeBuffer)) {
           long timestamp = timeDecoder.readLong(timeBuffer);
           double aDouble = valueDecoder.readDouble(valueBuffer);
-          if (isDeleted(timestamp)
-              || (!allSatisfy && !recordFilter.satisfyDouble(timestamp, 
aDouble))) {
+          if (isDeleted(timestamp)) {
+            continue;
+          }
+          if (!allSatisfy && !recordFilter.satisfyDouble(timestamp, aDouble)) {
+            allFilteredRows++;
             continue;
           }
           if (paginationController.hasCurOffset()) {
@@ -344,8 +366,11 @@ public class PageReader implements IPageReader {
         while (timeDecoder.hasNext(timeBuffer)) {
           long timestamp = timeDecoder.readLong(timeBuffer);
           Binary aBinary = valueDecoder.readBinary(valueBuffer);
-          if (isDeleted(timestamp)
-              || (!allSatisfy && !recordFilter.satisfyBinary(timestamp, 
aBinary))) {
+          if (isDeleted(timestamp)) {
+            continue;
+          }
+          if (!allSatisfy && !recordFilter.satisfyBinary(timestamp, aBinary)) {
+            allFilteredRows++;
             continue;
           }
           if (paginationController.hasCurOffset()) {
@@ -365,6 +390,9 @@ public class PageReader implements IPageReader {
       default:
         throw new UnSupportedDataTypeException(String.valueOf(dataType));
     }
+    if (filterRowsRecorder != null && allFilteredRows > 0) {
+      filterRowsRecorder.accept(allFilteredRows);
+    }
     return builder.build();
   }
 

Reply via email to