This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/ScanTimeSlice in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 26825cd234ef6169ca44805fc28346943450f6b1 Author: JackieTien97 <[email protected]> AuthorDate: Mon Jun 17 17:54:57 2024 +0800 Optimize the time slice control of SeriesScanOperator and AlignedSeriesScanOperator --- ...erator.java => AbstractSeriesScanOperator.java} | 123 +++++++-------------- .../operator/source/AlignedSeriesScanOperator.java | 110 +----------------- .../operator/source/SeriesScanOperator.java | 111 +------------------ 3 files changed, 44 insertions(+), 300 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java similarity index 51% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java index dc8b3c877a5..d8c07d75caf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java @@ -19,56 +19,36 @@ package org.apache.iotdb.db.queryengine.execution.operator.source; -import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; -import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; -import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; - -import org.apache.tsfile.block.column.Column; -import org.apache.tsfile.block.column.ColumnBuilder; -import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; -import org.apache.tsfile.read.common.block.column.TimeColumn; -import org.apache.tsfile.read.common.block.column.TimeColumnBuilder; -import org.apache.tsfile.utils.RamUsageEstimator; import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; -public class SeriesScanOperator extends AbstractDataSourceOperator { - private static final long INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(SeriesScanOperator.class); +public abstract class AbstractSeriesScanOperator extends AbstractDataSourceOperator { private boolean finished = false; - public SeriesScanOperator( - OperatorContext context, - PlanNodeId sourceId, - PartialPath seriesPath, - Ordering scanOrder, - SeriesScanOptions seriesScanOptions) { - this.sourceId = sourceId; - this.operatorContext = context; - this.seriesScanUtil = - new SeriesScanUtil(seriesPath, scanOrder, seriesScanOptions, context.getInstanceContext()); - this.maxReturnSize = - Math.min(maxReturnSize, TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()); - } - @Override public TsBlock next() throws Exception { if (retainedTsBlock != null) { return getResultFromRetainedTsBlock(); } + // we don't get any data in current batch time slice, just return null + if (resultTsBlockBuilder.isEmpty()) { + return null; + } resultTsBlock = resultTsBlockBuilder.build(); resultTsBlockBuilder.reset(); return checkTsBlockSizeAndGetResult(); } + @Override + public boolean isFinished() throws Exception { + return finished; + } + @SuppressWarnings("squid:S112") @Override public boolean hasNext() throws Exception { @@ -81,6 +61,8 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); long start = System.nanoTime(); + boolean noMoreData = false; + // here use do-while to promise doing this at least once do { /* @@ -89,11 +71,15 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { * 3. consume next file finally */ if (!readPageData() && !readChunkData() && !readFileData()) { + noMoreData = true; break; } - } while (System.nanoTime() - start < maxRuntime && !resultTsBlockBuilder.isFull()); - finished = resultTsBlockBuilder.isEmpty(); + } while (System.nanoTime() - start < maxRuntime + && !resultTsBlockBuilder.isFull() + && retainedTsBlock == null); + + finished = (resultTsBlockBuilder.isEmpty() && retainedTsBlock == null && noMoreData); return !finished; } catch (IOException e) { @@ -101,27 +87,6 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { } } - @Override - public boolean isFinished() throws Exception { - return finished; - } - - @Override - public long calculateMaxPeekMemory() { - return Math.max( - maxReturnSize, TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3L); - } - - @Override - public long calculateMaxReturnSize() { - return maxReturnSize; - } - - @Override - public long calculateRetainedSizeAfterCallingNext() { - return calculateMaxPeekMemoryWithCounter() - calculateMaxReturnSize(); - } - private boolean readFileData() throws IOException { while (seriesScanUtil.hasNextFile()) { if (readChunkData()) { @@ -141,45 +106,30 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { } private boolean readPageData() throws IOException { - while (seriesScanUtil.hasNextPage()) { + if (seriesScanUtil.hasNextPage()) { TsBlock tsBlock = seriesScanUtil.nextPage(); - if (!isEmpty(tsBlock)) { appendToBuilder(tsBlock); - return true; } + return true; } return false; } + private boolean isEmpty(TsBlock tsBlock) { + return tsBlock == null || tsBlock.isEmpty(); + } + private void appendToBuilder(TsBlock tsBlock) { - TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); - TimeColumn timeColumn = tsBlock.getTimeColumn(); - ColumnBuilder columnBuilder = resultTsBlockBuilder.getColumnBuilder(0); - Column column = tsBlock.getColumn(0); - - if (column.mayHaveNull()) { - for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) { - timeColumnBuilder.writeLong(timeColumn.getLong(i)); - if (column.isNull(i)) { - columnBuilder.appendNull(); - } else { - columnBuilder.write(column, i); - } - resultTsBlockBuilder.declarePosition(); - } - } else { - for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) { - timeColumnBuilder.writeLong(timeColumn.getLong(i)); - columnBuilder.write(column, i); - resultTsBlockBuilder.declarePosition(); - } + int size = tsBlock.getPositionCount(); + if (resultTsBlockBuilder.isEmpty() && size >= resultTsBlockBuilder.getMaxTsBlockLineNumber()) { + retainedTsBlock = tsBlock; + return; } + buildResult(tsBlock); } - private boolean isEmpty(TsBlock tsBlock) { - return tsBlock == null || tsBlock.isEmpty(); - } + protected abstract void buildResult(TsBlock tsBlock); @Override protected List<TSDataType> getResultDataTypes() { @@ -187,11 +137,12 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { } @Override - public long ramBytesUsed() { - return INSTANCE_SIZE - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil) - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId) - + (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes()); + public long calculateMaxReturnSize() { + return maxReturnSize; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return calculateMaxPeekMemoryWithCounter() - calculateMaxReturnSize(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java index 22cb94b9ec1..95ea59abac4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java @@ -38,16 +38,13 @@ import org.apache.tsfile.read.common.block.column.TimeColumn; import org.apache.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.tsfile.utils.RamUsageEstimator; -import java.io.IOException; import java.util.List; -import java.util.concurrent.TimeUnit; -public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { +public class AlignedSeriesScanOperator extends AbstractSeriesScanOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(AlignedSeriesScanOperator.class); private final int valueColumnCount; - private boolean finished = false; private int maxTsBlockLineNum = -1; public AlignedSeriesScanOperator( @@ -78,56 +75,6 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { this.maxTsBlockLineNum = maxTsBlockLineNum; } - @Override - public TsBlock next() throws Exception { - if (retainedTsBlock != null) { - return getResultFromRetainedTsBlock(); - } - resultTsBlock = resultTsBlockBuilder.build(); - resultTsBlockBuilder.reset(); - return checkTsBlockSizeAndGetResult(); - } - - @SuppressWarnings("squid:S112") - @Override - public boolean hasNext() throws Exception { - if (retainedTsBlock != null) { - return true; - } - try { - - // start stopwatch - long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); - long start = System.nanoTime(); - - // here use do-while to promise doing this at least once - do { - /* - * 1. consume page data firstly - * 2. consume chunk data secondly - * 3. consume next file finally - */ - if (!readPageData() && !readChunkData() && !readFileData()) { - break; - } - - } while (System.nanoTime() - start < maxRuntime - && !resultTsBlockBuilder.isFull() - && retainedTsBlock == null); - - finished = (resultTsBlockBuilder.isEmpty() && retainedTsBlock == null); - - return !finished; - } catch (IOException e) { - throw new RuntimeException("Error happened while scanning the file", e); - } - } - - @Override - public boolean isFinished() throws Exception { - return finished; - } - @Override public long calculateMaxPeekMemory() { return Math.max( @@ -138,51 +85,8 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { } @Override - public long calculateMaxReturnSize() { - return maxReturnSize; - } - - @Override - public long calculateRetainedSizeAfterCallingNext() { - return calculateMaxPeekMemoryWithCounter() - calculateMaxReturnSize(); - } - - private boolean readFileData() throws IOException { - while (seriesScanUtil.hasNextFile()) { - if (readChunkData()) { - return true; - } - } - return false; - } - - private boolean readChunkData() throws IOException { - while (seriesScanUtil.hasNextChunk()) { - if (readPageData()) { - return true; - } - } - return false; - } - - private boolean readPageData() throws IOException { - while (seriesScanUtil.hasNextPage()) { - TsBlock tsBlock = seriesScanUtil.nextPage(); - if (!isEmpty(tsBlock)) { - appendToBuilder(tsBlock); - return true; - } - } - return false; - } - - private void appendToBuilder(TsBlock tsBlock) { + protected void buildResult(TsBlock tsBlock) { int size = tsBlock.getPositionCount(); - if (resultTsBlockBuilder.isEmpty() - && tsBlock.getPositionCount() >= resultTsBlockBuilder.getMaxTsBlockLineNumber()) { - retainedTsBlock = tsBlock; - return; - } TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); TimeColumn timeColumn = tsBlock.getTimeColumn(); for (int i = 0; i < size; i++) { @@ -214,16 +118,6 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { } } - private boolean isEmpty(TsBlock tsBlock) { - return tsBlock == null || tsBlock.isEmpty(); - } - - @Override - protected List<TSDataType> getResultDataTypes() { - // time + all value columns - return seriesScanUtil.getTsDataTypeList(); - } - @Override public void initQueryDataSource(IQueryDataSource dataSource) { seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java index dc8b3c877a5..d4ce1caa624 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java @@ -29,22 +29,15 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.tsfile.block.column.Column; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.common.conf.TSFileDescriptor; -import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.column.TimeColumn; import org.apache.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.tsfile.utils.RamUsageEstimator; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class SeriesScanOperator extends AbstractDataSourceOperator { +public class SeriesScanOperator extends AbstractSeriesScanOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(SeriesScanOperator.class); - private boolean finished = false; - public SeriesScanOperator( OperatorContext context, PlanNodeId sourceId, @@ -59,53 +52,6 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { Math.min(maxReturnSize, TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()); } - @Override - public TsBlock next() throws Exception { - if (retainedTsBlock != null) { - return getResultFromRetainedTsBlock(); - } - resultTsBlock = resultTsBlockBuilder.build(); - resultTsBlockBuilder.reset(); - return checkTsBlockSizeAndGetResult(); - } - - @SuppressWarnings("squid:S112") - @Override - public boolean hasNext() throws Exception { - if (retainedTsBlock != null) { - return true; - } - try { - - // start stopwatch - long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); - long start = System.nanoTime(); - - // here use do-while to promise doing this at least once - do { - /* - * 1. consume page data firstly - * 2. consume chunk data secondly - * 3. consume next file finally - */ - if (!readPageData() && !readChunkData() && !readFileData()) { - break; - } - } while (System.nanoTime() - start < maxRuntime && !resultTsBlockBuilder.isFull()); - - finished = resultTsBlockBuilder.isEmpty(); - - return !finished; - } catch (IOException e) { - throw new RuntimeException("Error happened while scanning the file", e); - } - } - - @Override - public boolean isFinished() throws Exception { - return finished; - } - @Override public long calculateMaxPeekMemory() { return Math.max( @@ -113,53 +59,15 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { } @Override - public long calculateMaxReturnSize() { - return maxReturnSize; - } - - @Override - public long calculateRetainedSizeAfterCallingNext() { - return calculateMaxPeekMemoryWithCounter() - calculateMaxReturnSize(); - } - - private boolean readFileData() throws IOException { - while (seriesScanUtil.hasNextFile()) { - if (readChunkData()) { - return true; - } - } - return false; - } - - private boolean readChunkData() throws IOException { - while (seriesScanUtil.hasNextChunk()) { - if (readPageData()) { - return true; - } - } - return false; - } - - private boolean readPageData() throws IOException { - while (seriesScanUtil.hasNextPage()) { - TsBlock tsBlock = seriesScanUtil.nextPage(); - - if (!isEmpty(tsBlock)) { - appendToBuilder(tsBlock); - return true; - } - } - return false; - } - - private void appendToBuilder(TsBlock tsBlock) { + protected void buildResult(TsBlock tsBlock) { + int size = tsBlock.getPositionCount(); TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); TimeColumn timeColumn = tsBlock.getTimeColumn(); ColumnBuilder columnBuilder = resultTsBlockBuilder.getColumnBuilder(0); Column column = tsBlock.getColumn(0); if (column.mayHaveNull()) { - for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) { + for (int i = 0; i < size; i++) { timeColumnBuilder.writeLong(timeColumn.getLong(i)); if (column.isNull(i)) { columnBuilder.appendNull(); @@ -169,7 +77,7 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { resultTsBlockBuilder.declarePosition(); } } else { - for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) { + for (int i = 0; i < size; i++) { timeColumnBuilder.writeLong(timeColumn.getLong(i)); columnBuilder.write(column, i); resultTsBlockBuilder.declarePosition(); @@ -177,15 +85,6 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { } } - private boolean isEmpty(TsBlock tsBlock) { - return tsBlock == null || tsBlock.isEmpty(); - } - - @Override - protected List<TSDataType> getResultDataTypes() { - return seriesScanUtil.getTsDataTypeList(); - } - @Override public long ramBytesUsed() { return INSTANCE_SIZE
