This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty-mpp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6e76889ec8c7e1d89f7525e6d2f24202ec6e2d44 Author: JackieTien97 <[email protected]> AuthorDate: Wed Mar 30 21:58:07 2022 +0800 add driver which implements ExecFragmentInstance interface --- .../storagegroup/VirtualStorageGroupProcessor.java | 31 +++ .../org/apache/iotdb/db/mpp/execution/Driver.java | 310 +++++++++++++++++++++ .../iotdb/db/mpp/execution/DriverContext.java | 59 ++++ .../db/mpp/execution/FragmentInstanceContext.java | 4 + .../org/apache/iotdb/db/mpp/operator/Operator.java | 5 + .../db/mpp/operator/process/AggregateOperator.java | 5 + .../mpp/operator/process/DeviceMergeOperator.java | 5 + .../db/mpp/operator/process/FillOperator.java | 5 + .../mpp/operator/process/FilterNullOperator.java | 5 + .../mpp/operator/process/GroupByLevelOperator.java | 5 + .../db/mpp/operator/process/LimitOperator.java | 5 + .../db/mpp/operator/process/OffsetOperator.java | 5 + .../db/mpp/operator/process/SortOperator.java | 5 + .../db/mpp/operator/process/TimeJoinOperator.java | 21 ++ .../db/mpp/operator/sink/FragmentSinkOperator.java | 5 + .../mpp/operator/source/AlignedSeriesScanUtil.java | 5 +- .../source/SeriesAggregateScanOperator.java | 9 + .../db/mpp/operator/source/SeriesScanOperator.java | 22 +- .../db/mpp/operator/source/SeriesScanUtil.java | 25 +- .../db/mpp/operator/source/SourceOperator.java | 3 + .../db/mpp/sql/planner/LocalExecutionPlanner.java | 4 - .../iotdb/db/mpp/operator/LimitOperatorTest.java | 7 +- .../db/mpp/operator/SeriesScanOperatorTest.java | 5 +- .../db/mpp/operator/TimeJoinOperatorTest.java | 7 +- 24 files changed, 531 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java index 87d7b4e..a55ae80 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java +++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java @@ -1727,6 +1727,37 @@ public class VirtualStorageGroupProcessor { } } + /** Used by mpp: returns the seq and unseq TsFileResources for pathList, with dataTTL set. */ + public QueryDataSource query( + List<PartialPath> pathList, String singleDeviceId, QueryContext context, Filter timeFilter) + throws QueryProcessException { + try { + List<TsFileResource> seqResources = + getFileResourceListForQuery( + tsFileManager.getTsFileList(true), + upgradeSeqFileList, + pathList, + singleDeviceId, + context, + timeFilter, + true); + List<TsFileResource> unseqResources = + getFileResourceListForQuery( + tsFileManager.getTsFileList(false), + upgradeUnseqFileList, + pathList, + singleDeviceId, + context, + timeFilter, + false); + QueryDataSource dataSource = new QueryDataSource(seqResources, unseqResources); + dataSource.setDataTTL(dataTTL); + return dataSource; + } catch (MetadataException e) { + throw new QueryProcessException(e); + } + } + /** lock the read lock of the insert lock */ public void readLock() { // apply read lock for SG insert lock to prevent inconsistent with concurrently writing memtable diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java new file mode 100644 index 0000000..91623a4 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.idtable.IDTable;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.operator.source.SourceOperator;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.airlift.units.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
+
+@NotThreadSafe
+public class Driver implements ExecFragmentInstance {
+
+  private static final Logger logger = LoggerFactory.getLogger(Driver.class);
+
+  private final Operator root;
+  private final ISinkHandle sinkHandle;
+  private final DriverContext driverContext;
+
+  private boolean init;
+  private boolean closed;
+
+  /** closed tsfile used in this fragment instance */
+  private final Set<TsFileResource> closedFilePaths = new HashSet<>();
+  /** unClosed tsfile used in this fragment instance */
+  private final Set<TsFileResource> unClosedFilePaths = new HashSet<>();
+
+  private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
+
+  public Driver(Operator root, ISinkHandle sinkHandle, DriverContext driverContext) {
+    this.root = root;
+    this.sinkHandle = sinkHandle;
+    this.driverContext = driverContext;
+  }
+
+  @Override
+  public boolean isFinished() {
+    if (closed) {
+      return true;
+    }
+    try {
+      return root != null && root.isFinished();
+    } catch (Throwable t) {
+      logger.error("Failed to query whether the driver {} is finished", driverContext.getId(), t);
+      close();
+      return true;
+    }
+  }
+
+  @Override
+  public ListenableFuture<Void> processFor(Duration duration) {
+    // initialization may be time-consuming, so we keep it in the processFor method
+    // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
+    // critical bug
+    if (!init) {
+      try {
+        initialize();
+      } catch (Throwable t) {
+        logger.error(
+            "Failed to do the initialization for fragment instance {} ", driverContext.getId(), t);
+        close();
+        return NOT_BLOCKED;
+      }
+    }
+
+    // the reference stays null until the driver blocks for the first time: null is "not blocked"
+    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
+    if (blockedFuture != null && !blockedFuture.isDone()) {
+      return blockedFuture;
+    }
+
+    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
+
+    long start = System.nanoTime();
+    try {
+      do {
+        ListenableFuture<Void> future = processInternal();
+        if (!future.isDone()) {
+          return updateDriverBlockedFuture(future);
+        }
+      } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
+    } catch (Throwable t) {
+      logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+      close();
+    }
+    return NOT_BLOCKED;
+  }
+
+  @Override
+  public FragmentInstanceId getInfo() {
+    return driverContext.getId();
+  }
+
+  @Override
+  public void close() {
+    Operator root;
+    ISinkHandle sinkHandle;
+    synchronized (this) {
+      closed = true;
+      root = this.root;
+      sinkHandle = this.sinkHandle;
+    }
+
+    try {
+      if (root != null) {
+        root.close();
+      }
+      if (sinkHandle != null) {
+        sinkHandle.close();
+      }
+    } catch (Throwable t) {
+      logger.error("Failed to closed driver {}", driverContext.getId(), t);
+    } finally {
+      removeUsedFilesForQuery();
+    }
+  }
+
+  /**
+   * Builds the file lists once, then gives every source operator its own QueryDataSource that
+   * wraps them. TODO we should change all the blocked lock operation into tryLock
+   */
+  private void initialize() throws QueryProcessException {
+    List<SourceOperator> sourceOperators = driverContext.getSourceOperators();
+    if (sourceOperators != null && !sourceOperators.isEmpty()) {
+      QueryDataSource dataSource = initQueryDataSourceCache();
+      sourceOperators.forEach(
+          sourceOperator -> {
+            // construct QueryDataSource for source operator
+            QueryDataSource queryDataSource =
+                new QueryDataSource(dataSource.getSeqResources(), dataSource.getUnseqResources());
+
+            queryDataSource.setDataTTL(dataSource.getDataTTL());
+
+            sourceOperator.initQueryDataSource(queryDataSource);
+          });
+    }
+
+    this.init = true;
+  }
+
+  /**
+   * Holds the data region's read lock while querying the files for all paths of this driver and
+   * registering them as used; the same region instance is unlocked in the finally block.
+   */
+  public QueryDataSource initQueryDataSourceCache() throws QueryProcessException {
+    VirtualStorageGroupProcessor dataRegion = driverContext.getDataRegion();
+    dataRegion.readLock();
+    try {
+      List<PartialPath> pathList =
+          driverContext.getPaths().stream()
+              .map(IDTable::translateQueryPath)
+              .collect(Collectors.toList());
+      // when all the selected series are under the same device, the QueryDataSource will be
+      // filtered according to timeIndex
+      Set<String> selectedDeviceIdSet =
+          pathList.stream().map(PartialPath::getDevice).collect(Collectors.toSet());
+
+      QueryDataSource dataSource =
+          dataRegion.query(
+              pathList,
+              selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
+              driverContext.getFragmentInstanceContext(),
+              driverContext.getTimeFilter());
+
+      // used files should be added before mergeLock is unlocked, or they may be deleted by
+      // running merge
+      addUsedFilesForQuery(dataSource);
+
+      return dataSource;
+    } finally {
+      dataRegion.readUnlock();
+    }
+  }
+
+  /** Registers every file of the data source in closedFilePaths or unClosedFilePaths. */
+  private void addUsedFilesForQuery(QueryDataSource dataSource) {
+
+    // sequence data
+    addUsedFilesForQuery(dataSource.getSeqResources());
+
+    // unsequence data
+    addUsedFilesForQuery(dataSource.getUnseqResources());
+  }
+
+  private void addUsedFilesForQuery(List<TsFileResource> resources) {
+    Iterator<TsFileResource> iterator = resources.iterator();
+    while (iterator.hasNext()) {
+      TsFileResource tsFileResource = iterator.next();
+      boolean isClosed = tsFileResource.isClosed();
+      addFilePathToMap(tsFileResource, isClosed);
+
+      // this file may be deleted just before we lock it
+      if (tsFileResource.isDeleted()) {
+        Set<TsFileResource> pathSet = isClosed ? closedFilePaths : unClosedFilePaths;
+        // This resource may be removed by other threads of this query.
+        if (pathSet.remove(tsFileResource)) {
+          FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed);
+        }
+        iterator.remove();
+      }
+    }
+  }
+
+  /**
+   * Releases the reader reference of every registered file with the closed/unclosed flag of the
+   * set it is stored in, then empties both sets so that a repeated close() releases nothing twice.
+   */
+  private void removeUsedFilesForQuery() {
+    for (TsFileResource tsFile : closedFilePaths) {
+      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
+    }
+    closedFilePaths.clear();
+    for (TsFileResource tsFile : unClosedFilePaths) {
+      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, false);
+    }
+    unClosedFilePaths.clear();
+  }
+
+  /**
+   * Adds tsFile to closedFilePaths or unClosedFilePaths, depending on isClosed. Only when the
+   * file was not in that set yet is its reader reference increased in FileReaderManager, so a
+   * file is counted once per set. Both sets are created together with the driver and are never
+   * null.
+   */
+  void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
+    Set<TsFileResource> pathSet = isClosed ? closedFilePaths : unClosedFilePaths;
+    if (!pathSet.contains(tsFile)) {
+      pathSet.add(tsFile);
+      FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed);
+    }
+  }
+
+  private ListenableFuture<Void> processInternal() throws IOException {
+    ListenableFuture<Void> blocked = root.isBlocked();
+    if (!blocked.isDone()) {
+      return blocked;
+    }
+    blocked = sinkHandle.isFull();
+    if (!blocked.isDone()) {
+      return blocked;
+    }
+    if (root.hasNext()) {
+      TsBlock tsBlock = root.next();
+      if (tsBlock != null && !tsBlock.isEmpty()) {
+        sinkHandle.send(tsBlock);
+      }
+    }
+    return NOT_BLOCKED;
+  }
+
+  private ListenableFuture<Void> updateDriverBlockedFuture(
+      ListenableFuture<Void> sourceBlockedFuture) {
+    // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
+    // or any of the operators gets a memory revocation request
+    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
+    driverBlockedFuture.set(newDriverBlockedFuture);
+    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
+
+    // TODO Although we don't have memory management for operator now, we should consider it for
+    // future
+    // it's possible that memory revoking is requested for some operator
+    // before we update driverBlockedFuture above and we don't want to miss that
+    // notification, so we check to see whether that's the case before returning.
+
+    return newDriverBlockedFuture;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
new file mode 100644
index 0000000..aa3a265
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.operator.source.SourceOperator;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.util.List;
+
+public class DriverContext { // NOTE(review): nothing in this class assigns the fields below
+  private FragmentInstanceContext fragmentInstanceContext; // provides the instance id
+  private List<PartialPath> paths; // series paths the driver queries
+  private Filter timeFilter; // time filter passed to the data region query
+  private VirtualStorageGroupProcessor dataRegion; // region the driver locks and queries
+  private List<SourceOperator> sourceOperators; // operators that receive the QueryDataSource
+
+  public FragmentInstanceId getId() {
+    return fragmentInstanceContext.getId();
+  }
+
+  public FragmentInstanceContext getFragmentInstanceContext() {
+    return fragmentInstanceContext;
+  }
+
+  public List<PartialPath> getPaths() {
+    return paths;
+  }
+
+  public Filter getTimeFilter() {
+    return timeFilter;
+  }
+
+  public VirtualStorageGroupProcessor getDataRegion() {
+    return dataRegion;
+  }
+
+  public List<SourceOperator> getSourceOperators() {
+    return sourceOperators;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java index 015212f..f9333da 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java @@ -69,4 +69,8 @@ public class FragmentInstanceContext extends QueryContext { public List<OperatorContext> getOperatorContexts() { return operatorContexts; } + + public FragmentInstanceId getId() { + return id; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java index df89218..0821424 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java @@ -48,4 +48,9 @@ public interface Operator extends AutoCloseable { /** This method will always be called before releasing the Operator reference. */ @Override default void close() throws Exception {} + + /** + * Is this operator completely finished processing and no more output TsBlock will be produced. 
+ */ + boolean isFinished() throws IOException; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java index a08fb68..0eda188 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java @@ -49,4 +49,9 @@ public class AggregateOperator implements ProcessOperator { public void close() throws Exception { ProcessOperator.super.close(); } + + @Override + public boolean isFinished() { + return false; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java index ebfb54e..b2439b1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java @@ -48,4 +48,9 @@ public class DeviceMergeOperator implements ProcessOperator { public void close() throws Exception { ProcessOperator.super.close(); } + + @Override + public boolean isFinished() { + return false; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java index 52a3fba..cfbe1e0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java @@ -48,4 +48,9 @@ public class FillOperator implements ProcessOperator { public void close() throws Exception { ProcessOperator.super.close(); } + + @Override + public boolean isFinished() { + return false; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java index 0083ae0..a8c28e6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java @@ -49,4 +49,9 @@ public class FilterNullOperator implements ProcessOperator { public void close() throws Exception { ProcessOperator.super.close(); } + + @Override + public boolean isFinished() { + return false; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java index 285427a..5f87d42 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java @@ -49,4 +49,9 @@ public class GroupByLevelOperator implements ProcessOperator { public void close() throws Exception { ProcessOperator.super.close(); } + + @Override + public boolean isFinished() { + return false; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java index ec2995f..ef54668 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java @@ -74,4 +74,9 @@ public class LimitOperator implements ProcessOperator { public void close() throws Exception { child.close(); } + + @Override + public boolean isFinished() throws IOException { + return remainingLimit == 0 || child.isFinished(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java index a22985c..7cc8102 100644 --- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java @@ -49,4 +49,9 @@ public class OffsetOperator implements ProcessOperator { public void close() throws Exception { ProcessOperator.super.close(); } + + @Override + public boolean isFinished() { + return false; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java index 5e2f0ce..744e5b9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java @@ -49,4 +49,9 @@ public class SortOperator implements ProcessOperator { public void close() throws Exception { ProcessOperator.super.close(); } + + @Override + public boolean isFinished() { + return false; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java index 1020e41..0d0feb6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java @@ -59,6 +59,8 @@ public class TimeJoinOperator implements ProcessOperator { */ private final List<TSDataType> dataTypes; + private boolean finished; + public TimeJoinOperator( OperatorContext operatorContext, List<Operator> children, @@ -154,6 +156,9 @@ public class TimeJoinOperator implements ProcessOperator { @Override public boolean hasNext() throws IOException { + if (finished) { + return false; + } for (int i = 0; i < inputCount; i++) { if (!empty(i)) { return true; @@ -175,6 +180,22 @@ public class TimeJoinOperator implements ProcessOperator { } } + @Override + public boolean isFinished() { + if (finished) { + 
return true; + } + finished = true; + for (int i = 0; i < columnCount; i++) { + // has more tsBlock output from children[i] or has cached tsBlock in inputTsBlocks[i] + if (!noMoreTsBlocks[i] || !empty(i)) { + finished = false; + break; + } + } + return finished; + } + private boolean empty(int columnIndex) { return inputTsBlocks[columnIndex] == null || inputTsBlocks[columnIndex].getPositionCount() == inputIndex[columnIndex]; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java index 7bbccbf..9b67c6c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java @@ -51,6 +51,11 @@ public class FragmentSinkOperator implements SinkOperator { } @Override + public boolean isFinished() { + return false; + } + + @Override public void send(TsBlock tsBlock) {} @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java index 9fd48c7..58cb25e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java @@ -18,7 +18,6 @@ */ package org.apache.iotdb.db.mpp.operator.source; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.metadata.path.AlignedPath; import org.apache.iotdb.db.metadata.path.PartialPath; @@ -48,12 +47,10 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { Set<String> allSensors, TSDataType dataType, FragmentInstanceContext context, - QueryDataSource dataSource, Filter timeFilter, Filter valueFilter, boolean ascending) { - 
super( - seriesPath, allSensors, dataType, context, dataSource, timeFilter, valueFilter, ascending); + super(seriesPath, allSensors, dataType, context, timeFilter, valueFilter, ascending); dataTypes = ((AlignedPath) seriesPath) .getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java index 87151ba..07aa5a4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.mpp.operator.source; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.mpp.operator.OperatorContext; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -51,7 +52,15 @@ public class SeriesAggregateScanOperator implements SourceOperator { } @Override + public boolean isFinished() { + return false; + } + + @Override public PlanNodeId getSourceId() { return null; } + + @Override + public void initQueryDataSource(QueryDataSource dataSource) {} } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java index e84e649..acb40ba 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java @@ -20,8 +20,8 @@ package org.apache.iotdb.db.mpp.operator.source; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.metadata.path.PartialPath; -import org.apache.iotdb.db.mpp.operator.Operator; 
import org.apache.iotdb.db.mpp.operator.OperatorContext; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.filter.basic.Filter; @@ -29,19 +29,19 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import java.io.IOException; import java.util.Set; -public class SeriesScanOperator implements Operator { +public class SeriesScanOperator implements SourceOperator { private final OperatorContext operatorContext; private final SeriesScanUtil seriesScanUtil; private TsBlock tsBlock; private boolean hasCachedTsBlock = false; + private boolean finished = false; public SeriesScanOperator( PartialPath seriesPath, Set<String> allSensors, TSDataType dataType, OperatorContext context, - QueryDataSource dataSource, Filter timeFilter, Filter valueFilter, boolean ascending) { @@ -52,7 +52,6 @@ public class SeriesScanOperator implements Operator { allSensors, dataType, context.getInstanceContext(), - dataSource, timeFilter, valueFilter, ascending); @@ -107,6 +106,11 @@ public class SeriesScanOperator implements Operator { return hasCachedTsBlock; } + @Override + public boolean isFinished() throws IOException { + return finished || (finished = !hasNext()); /* finished only once hasNext() reports no more data; the flag then stays true */ + } + private boolean readChunkData() throws IOException { while (seriesScanUtil.hasNextChunk()) { if (readPageData()) { @@ -129,4 +133,14 @@ public class SeriesScanOperator implements Operator { private boolean isEmpty(TsBlock tsBlock) { return tsBlock == null || tsBlock.isEmpty(); } + + @Override + public PlanNodeId getSourceId() { + return null; + } + + @Override + public void initQueryDataSource(QueryDataSource dataSource) { + seriesScanUtil.initQueryDataSource(dataSource); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java index 3a1041d..80e0a67 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader; import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader; import org.apache.iotdb.db.utils.FileLoaderUtils; +import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata; @@ -73,7 +74,7 @@ public class SeriesScanUtil { private final Filter timeFilter; private final Filter valueFilter; - private final QueryDataSource dataSource; + private QueryDataSource dataSource; /* * file index @@ -117,7 +118,6 @@ public class SeriesScanUtil { Set<String> allSensors, TSDataType dataType, FragmentInstanceContext context, - QueryDataSource dataSource, Filter timeFilter, Filter valueFilter, boolean ascending) { @@ -125,18 +125,15 @@ public class SeriesScanUtil { this.allSensors = allSensors; this.dataType = dataType; this.context = context; - this.dataSource = dataSource; this.timeFilter = timeFilter; this.valueFilter = valueFilter; if (ascending) { this.orderUtils = new AscTimeOrderUtils(); mergeReader = getPriorityMergeReader(); - this.curSeqFileIndex = 0; this.curUnseqFileIndex = 0; } else { this.orderUtils = new DescTimeOrderUtils(); mergeReader = getDescPriorityMergeReader(); - this.curSeqFileIndex = dataSource.getSeqResourcesSize() - 1; this.curUnseqFileIndex = 0; } @@ -154,6 +151,12 @@ public class SeriesScanUtil { versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics()))); } + public void 
initQueryDataSource(QueryDataSource dataSource) { + QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), orderUtils.getAscending()); + this.dataSource = dataSource; + orderUtils.setCurSeqFileIndex(dataSource); + } + protected PriorityMergeReader getPriorityMergeReader() { return new PriorityMergeReader(); } @@ -1162,6 +1165,8 @@ public class SeriesScanUtil { TsFileResource getNextSeqFileResource(boolean isDelete); TsFileResource getNextUnseqFileResource(boolean isDelete); + + void setCurSeqFileIndex(QueryDataSource dataSource); } class DescTimeOrderUtils implements TimeOrderUtils { @@ -1273,6 +1278,11 @@ public class SeriesScanUtil { } return tsFileResource; } + + @Override + public void setCurSeqFileIndex(QueryDataSource dataSource) { + curSeqFileIndex = dataSource.getSeqResourcesSize() - 1; + } } class AscTimeOrderUtils implements TimeOrderUtils { @@ -1384,5 +1394,10 @@ public class SeriesScanUtil { } return tsFileResource; } + + @Override + public void setCurSeqFileIndex(QueryDataSource dataSource) { + curSeqFileIndex = 0; + } } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java index 8454fd6..2e81002 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java @@ -18,10 +18,13 @@ */ package org.apache.iotdb.db.mpp.operator.source; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.mpp.operator.Operator; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; public interface SourceOperator extends Operator { PlanNodeId getSourceId(); + + void initQueryDataSource(QueryDataSource dataSource); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java index 72f8544..b3c60d7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java @@ -18,7 +18,6 @@ */ package org.apache.iotdb.db.mpp.sql.planner; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.mpp.common.filter.QueryFilter; import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext; @@ -66,14 +65,11 @@ public class LocalExecutionPlanner { OperatorContext operatorContext = context.taskContext.addOperatorContext( context.getNextOperatorId(), node.getId(), SeriesScanOperator.class.getSimpleName()); - // TODO should create QueryDataSource in SeriesScanOperator's runtime - QueryDataSource dataSource = null; return new SeriesScanOperator( seriesPath, node.getAllSensors(), seriesPath.getSeriesType(), operatorContext, - dataSource, node.getTimeFilter(), node.getValueFilter(), ascending); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java index 14cbeec..b9b439c 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java @@ -33,7 +33,6 @@ import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy; import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil; -import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ 
-95,18 +94,16 @@ public class LimitOperatorTest { 3, new PlanNodeId("3"), TimeJoinOperator.class.getSimpleName()); fragmentInstanceContext.addOperatorContext( 4, new PlanNodeId("4"), LimitOperator.class.getSimpleName()); - QueryDataSource dataSource = new QueryDataSource(seqResources, unSeqResources); - QueryUtils.fillOrderIndexes(dataSource, measurementPath1.getDevice(), true); SeriesScanOperator seriesScanOperator1 = new SeriesScanOperator( measurementPath1, allSensors, TSDataType.INT32, fragmentInstanceContext.getOperatorContexts().get(0), - dataSource, null, null, true); + seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); MeasurementPath measurementPath2 = new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32); @@ -116,10 +113,10 @@ public class LimitOperatorTest { allSensors, TSDataType.INT32, fragmentInstanceContext.getOperatorContexts().get(1), - dataSource, null, null, true); + seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); TimeJoinOperator timeJoinOperator = new TimeJoinOperator( diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java index c5c3d15..1fd496e 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java @@ -30,7 +30,6 @@ import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext; import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil; -import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import 
org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -83,18 +82,16 @@ public class SeriesScanOperatorTest { new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance")); fragmentInstanceContext.addOperatorContext( 1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName()); - QueryDataSource dataSource = new QueryDataSource(seqResources, unSeqResources); - QueryUtils.fillOrderIndexes(dataSource, measurementPath.getDevice(), true); SeriesScanOperator seriesScanOperator = new SeriesScanOperator( measurementPath, allSensors, TSDataType.INT32, fragmentInstanceContext.getOperatorContexts().get(0), - dataSource, null, null, true); + seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); int count = 0; while (seriesScanOperator.hasNext()) { TsBlock tsBlock = seriesScanOperator.next(); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java index d1722ca..5548b79 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java @@ -32,7 +32,6 @@ import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy; import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil; -import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -89,18 +88,16 @@ public class TimeJoinOperatorTest { 2, new PlanNodeId("2"), SeriesScanOperator.class.getSimpleName()); fragmentInstanceContext.addOperatorContext( 3, new PlanNodeId("3"), TimeJoinOperator.class.getSimpleName()); - QueryDataSource 
dataSource = new QueryDataSource(seqResources, unSeqResources); - QueryUtils.fillOrderIndexes(dataSource, measurementPath1.getDevice(), true); SeriesScanOperator seriesScanOperator1 = new SeriesScanOperator( measurementPath1, allSensors, TSDataType.INT32, fragmentInstanceContext.getOperatorContexts().get(0), - dataSource, null, null, true); + seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); MeasurementPath measurementPath2 = new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32); @@ -110,10 +107,10 @@ public class TimeJoinOperatorTest { allSensors, TSDataType.INT32, fragmentInstanceContext.getOperatorContexts().get(1), - dataSource, null, null, true); + seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); TimeJoinOperator timeJoinOperator = new TimeJoinOperator(
