This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/ScanTimeSlice
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 26825cd234ef6169ca44805fc28346943450f6b1
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Jun 17 17:54:57 2024 +0800

    Optimize the time slice control of SeriesScanOperator and AlignedSeriesScanOperator
---
 ...erator.java => AbstractSeriesScanOperator.java} | 123 +++++++--------------
 .../operator/source/AlignedSeriesScanOperator.java | 110 +-----------------
 .../operator/source/SeriesScanOperator.java        | 111 +------------------
 3 files changed, 44 insertions(+), 300 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java
similarity index 51%
copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java
index dc8b3c877a5..d8c07d75caf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java
@@ -19,56 +19,36 @@
 
 package org.apache.iotdb.db.queryengine.execution.operator.source;
 
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
-import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
-import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
-
-import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.block.column.TimeColumn;
-import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
-import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-public class SeriesScanOperator extends AbstractDataSourceOperator {
-  private static final long INSTANCE_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(SeriesScanOperator.class);
+public abstract class AbstractSeriesScanOperator extends 
AbstractDataSourceOperator {
 
   private boolean finished = false;
 
-  public SeriesScanOperator(
-      OperatorContext context,
-      PlanNodeId sourceId,
-      PartialPath seriesPath,
-      Ordering scanOrder,
-      SeriesScanOptions seriesScanOptions) {
-    this.sourceId = sourceId;
-    this.operatorContext = context;
-    this.seriesScanUtil =
-        new SeriesScanUtil(seriesPath, scanOrder, seriesScanOptions, 
context.getInstanceContext());
-    this.maxReturnSize =
-        Math.min(maxReturnSize, 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
-  }
-
   @Override
   public TsBlock next() throws Exception {
     if (retainedTsBlock != null) {
       return getResultFromRetainedTsBlock();
     }
+    // we don't get any data in current batch time slice, just return null
+    if (resultTsBlockBuilder.isEmpty()) {
+      return null;
+    }
     resultTsBlock = resultTsBlockBuilder.build();
     resultTsBlockBuilder.reset();
     return checkTsBlockSizeAndGetResult();
   }
 
+  @Override
+  public boolean isFinished() throws Exception {
+    return finished;
+  }
+
   @SuppressWarnings("squid:S112")
   @Override
   public boolean hasNext() throws Exception {
@@ -81,6 +61,8 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
       long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
       long start = System.nanoTime();
 
+      boolean noMoreData = false;
+
       // here use do-while to promise doing this at least once
       do {
         /*
@@ -89,11 +71,15 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
          * 3. consume next file finally
          */
         if (!readPageData() && !readChunkData() && !readFileData()) {
+          noMoreData = true;
           break;
         }
-      } while (System.nanoTime() - start < maxRuntime && 
!resultTsBlockBuilder.isFull());
 
-      finished = resultTsBlockBuilder.isEmpty();
+      } while (System.nanoTime() - start < maxRuntime
+          && !resultTsBlockBuilder.isFull()
+          && retainedTsBlock == null);
+
+      finished = (resultTsBlockBuilder.isEmpty() && retainedTsBlock == null && 
noMoreData);
 
       return !finished;
     } catch (IOException e) {
@@ -101,27 +87,6 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
     }
   }
 
-  @Override
-  public boolean isFinished() throws Exception {
-    return finished;
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return Math.max(
-        maxReturnSize, 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3L);
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return calculateMaxPeekMemoryWithCounter() - calculateMaxReturnSize();
-  }
-
   private boolean readFileData() throws IOException {
     while (seriesScanUtil.hasNextFile()) {
       if (readChunkData()) {
@@ -141,45 +106,30 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
   }
 
   private boolean readPageData() throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
+    if (seriesScanUtil.hasNextPage()) {
       TsBlock tsBlock = seriesScanUtil.nextPage();
-
       if (!isEmpty(tsBlock)) {
         appendToBuilder(tsBlock);
-        return true;
       }
+      return true;
     }
     return false;
   }
 
+  private boolean isEmpty(TsBlock tsBlock) {
+    return tsBlock == null || tsBlock.isEmpty();
+  }
+
   private void appendToBuilder(TsBlock tsBlock) {
-    TimeColumnBuilder timeColumnBuilder = 
resultTsBlockBuilder.getTimeColumnBuilder();
-    TimeColumn timeColumn = tsBlock.getTimeColumn();
-    ColumnBuilder columnBuilder = resultTsBlockBuilder.getColumnBuilder(0);
-    Column column = tsBlock.getColumn(0);
-
-    if (column.mayHaveNull()) {
-      for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
-        timeColumnBuilder.writeLong(timeColumn.getLong(i));
-        if (column.isNull(i)) {
-          columnBuilder.appendNull();
-        } else {
-          columnBuilder.write(column, i);
-        }
-        resultTsBlockBuilder.declarePosition();
-      }
-    } else {
-      for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
-        timeColumnBuilder.writeLong(timeColumn.getLong(i));
-        columnBuilder.write(column, i);
-        resultTsBlockBuilder.declarePosition();
-      }
+    int size = tsBlock.getPositionCount();
+    if (resultTsBlockBuilder.isEmpty() && size >= 
resultTsBlockBuilder.getMaxTsBlockLineNumber()) {
+      retainedTsBlock = tsBlock;
+      return;
     }
+    buildResult(tsBlock);
   }
 
-  private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || tsBlock.isEmpty();
-  }
+  protected abstract void buildResult(TsBlock tsBlock);
 
   @Override
   protected List<TSDataType> getResultDataTypes() {
@@ -187,11 +137,12 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
   }
 
   @Override
-  public long ramBytesUsed() {
-    return INSTANCE_SIZE
-        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil)
-        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
-        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId)
-        + (resultTsBlockBuilder == null ? 0 : 
resultTsBlockBuilder.getRetainedSizeInBytes());
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return calculateMaxPeekMemoryWithCounter() - calculateMaxReturnSize();
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
index 22cb94b9ec1..95ea59abac4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
@@ -38,16 +38,13 @@ import org.apache.tsfile.read.common.block.column.TimeColumn;
 import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.tsfile.utils.RamUsageEstimator;
 
-import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
-public class AlignedSeriesScanOperator extends AbstractDataSourceOperator {
+public class AlignedSeriesScanOperator extends AbstractSeriesScanOperator {
   private static final long INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(AlignedSeriesScanOperator.class);
 
   private final int valueColumnCount;
-  private boolean finished = false;
   private int maxTsBlockLineNum = -1;
 
   public AlignedSeriesScanOperator(
@@ -78,56 +75,6 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator {
     this.maxTsBlockLineNum = maxTsBlockLineNum;
   }
 
-  @Override
-  public TsBlock next() throws Exception {
-    if (retainedTsBlock != null) {
-      return getResultFromRetainedTsBlock();
-    }
-    resultTsBlock = resultTsBlockBuilder.build();
-    resultTsBlockBuilder.reset();
-    return checkTsBlockSizeAndGetResult();
-  }
-
-  @SuppressWarnings("squid:S112")
-  @Override
-  public boolean hasNext() throws Exception {
-    if (retainedTsBlock != null) {
-      return true;
-    }
-    try {
-
-      // start stopwatch
-      long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
-      long start = System.nanoTime();
-
-      // here use do-while to promise doing this at least once
-      do {
-        /*
-         * 1. consume page data firstly
-         * 2. consume chunk data secondly
-         * 3. consume next file finally
-         */
-        if (!readPageData() && !readChunkData() && !readFileData()) {
-          break;
-        }
-
-      } while (System.nanoTime() - start < maxRuntime
-          && !resultTsBlockBuilder.isFull()
-          && retainedTsBlock == null);
-
-      finished = (resultTsBlockBuilder.isEmpty() && retainedTsBlock == null);
-
-      return !finished;
-    } catch (IOException e) {
-      throw new RuntimeException("Error happened while scanning the file", e);
-    }
-  }
-
-  @Override
-  public boolean isFinished() throws Exception {
-    return finished;
-  }
-
   @Override
   public long calculateMaxPeekMemory() {
     return Math.max(
@@ -138,51 +85,8 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator {
   }
 
   @Override
-  public long calculateMaxReturnSize() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return calculateMaxPeekMemoryWithCounter() - calculateMaxReturnSize();
-  }
-
-  private boolean readFileData() throws IOException {
-    while (seriesScanUtil.hasNextFile()) {
-      if (readChunkData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readChunkData() throws IOException {
-    while (seriesScanUtil.hasNextChunk()) {
-      if (readPageData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readPageData() throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
-      TsBlock tsBlock = seriesScanUtil.nextPage();
-      if (!isEmpty(tsBlock)) {
-        appendToBuilder(tsBlock);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void appendToBuilder(TsBlock tsBlock) {
+  protected void buildResult(TsBlock tsBlock) {
     int size = tsBlock.getPositionCount();
-    if (resultTsBlockBuilder.isEmpty()
-        && tsBlock.getPositionCount() >= 
resultTsBlockBuilder.getMaxTsBlockLineNumber()) {
-      retainedTsBlock = tsBlock;
-      return;
-    }
     TimeColumnBuilder timeColumnBuilder = 
resultTsBlockBuilder.getTimeColumnBuilder();
     TimeColumn timeColumn = tsBlock.getTimeColumn();
     for (int i = 0; i < size; i++) {
@@ -214,16 +118,6 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator {
     }
   }
 
-  private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || tsBlock.isEmpty();
-  }
-
-  @Override
-  protected List<TSDataType> getResultDataTypes() {
-    // time + all value columns
-    return seriesScanUtil.getTsDataTypeList();
-  }
-
   @Override
   public void initQueryDataSource(IQueryDataSource dataSource) {
     seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
index dc8b3c877a5..d4ce1caa624 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
@@ -29,22 +29,15 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.common.conf.TSFileDescriptor;
-import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.common.block.column.TimeColumn;
 import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.tsfile.utils.RamUsageEstimator;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class SeriesScanOperator extends AbstractDataSourceOperator {
+public class SeriesScanOperator extends AbstractSeriesScanOperator {
   private static final long INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(SeriesScanOperator.class);
 
-  private boolean finished = false;
-
   public SeriesScanOperator(
       OperatorContext context,
       PlanNodeId sourceId,
@@ -59,53 +52,6 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
         Math.min(maxReturnSize, 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
   }
 
-  @Override
-  public TsBlock next() throws Exception {
-    if (retainedTsBlock != null) {
-      return getResultFromRetainedTsBlock();
-    }
-    resultTsBlock = resultTsBlockBuilder.build();
-    resultTsBlockBuilder.reset();
-    return checkTsBlockSizeAndGetResult();
-  }
-
-  @SuppressWarnings("squid:S112")
-  @Override
-  public boolean hasNext() throws Exception {
-    if (retainedTsBlock != null) {
-      return true;
-    }
-    try {
-
-      // start stopwatch
-      long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
-      long start = System.nanoTime();
-
-      // here use do-while to promise doing this at least once
-      do {
-        /*
-         * 1. consume page data firstly
-         * 2. consume chunk data secondly
-         * 3. consume next file finally
-         */
-        if (!readPageData() && !readChunkData() && !readFileData()) {
-          break;
-        }
-      } while (System.nanoTime() - start < maxRuntime && 
!resultTsBlockBuilder.isFull());
-
-      finished = resultTsBlockBuilder.isEmpty();
-
-      return !finished;
-    } catch (IOException e) {
-      throw new RuntimeException("Error happened while scanning the file", e);
-    }
-  }
-
-  @Override
-  public boolean isFinished() throws Exception {
-    return finished;
-  }
-
   @Override
   public long calculateMaxPeekMemory() {
     return Math.max(
@@ -113,53 +59,15 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
   }
 
   @Override
-  public long calculateMaxReturnSize() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return calculateMaxPeekMemoryWithCounter() - calculateMaxReturnSize();
-  }
-
-  private boolean readFileData() throws IOException {
-    while (seriesScanUtil.hasNextFile()) {
-      if (readChunkData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readChunkData() throws IOException {
-    while (seriesScanUtil.hasNextChunk()) {
-      if (readPageData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readPageData() throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
-      TsBlock tsBlock = seriesScanUtil.nextPage();
-
-      if (!isEmpty(tsBlock)) {
-        appendToBuilder(tsBlock);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void appendToBuilder(TsBlock tsBlock) {
+  protected void buildResult(TsBlock tsBlock) {
+    int size = tsBlock.getPositionCount();
     TimeColumnBuilder timeColumnBuilder = 
resultTsBlockBuilder.getTimeColumnBuilder();
     TimeColumn timeColumn = tsBlock.getTimeColumn();
     ColumnBuilder columnBuilder = resultTsBlockBuilder.getColumnBuilder(0);
     Column column = tsBlock.getColumn(0);
 
     if (column.mayHaveNull()) {
-      for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
+      for (int i = 0; i < size; i++) {
         timeColumnBuilder.writeLong(timeColumn.getLong(i));
         if (column.isNull(i)) {
           columnBuilder.appendNull();
@@ -169,7 +77,7 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
         resultTsBlockBuilder.declarePosition();
       }
     } else {
-      for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
+      for (int i = 0; i < size; i++) {
         timeColumnBuilder.writeLong(timeColumn.getLong(i));
         columnBuilder.write(column, i);
         resultTsBlockBuilder.declarePosition();
@@ -177,15 +85,6 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
     }
   }
 
-  private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || tsBlock.isEmpty();
-  }
-
-  @Override
-  protected List<TSDataType> getResultDataTypes() {
-    return seriesScanUtil.getTsDataTypeList();
-  }
-
   @Override
   public long ramBytesUsed() {
     return INSTANCE_SIZE

Reply via email to