This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/FixIntoOperator1.0 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f6204fc62e761a58f0963663c9e4cffb454dd0bc Author: liuminghui233 <[email protected]> AuthorDate: Fri Nov 25 17:12:26 2022 +0800 refactor IntoOperator --- .../operator/process/AbstractIntoOperator.java | 101 +++++++++++++++++---- .../operator/process/DeviceViewIntoOperator.java | 79 +++++++++++----- .../execution/operator/process/IntoOperator.java | 44 ++++++--- 3 files changed, 171 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java index f331b2ed87..9ec9b0d5bc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java @@ -38,6 +38,11 @@ import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -45,19 +50,33 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import static com.google.common.util.concurrent.Futures.successfulAsList; + public abstract class AbstractIntoOperator implements ProcessOperator { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIntoOperator.class); + protected final OperatorContext operatorContext; protected final Operator child; + protected TsBlock cachedTsBlock; + protected List<InsertTabletStatementGenerator> insertTabletStatementGenerators; protected final Map<String, InputLocation> sourceColumnToInputLocationMap; private DataNodeInternalClient client; + private ListenableFuture<?> isBlocked = NOT_BLOCKED; + + private final ListeningExecutorService writeOperationExecutor = + MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); + private ListenableFuture<TSStatus> writeOperationFuture; + public AbstractIntoOperator( OperatorContext operatorContext, Operator child, @@ -87,10 +106,21 @@ public abstract class AbstractIntoOperator implements ProcessOperator { return insertTabletStatementGenerators; } - protected void insertMultiTabletsInternally(boolean needCheck) { + protected boolean insertMultiTabletsInternally(boolean needCheck) { + InsertMultiTabletsStatement insertMultiTabletsStatement = + constructInsertMultiTabletsStatement(needCheck); + if (insertMultiTabletsStatement == null) { + return false; + } + + executeInsertMultiTabletsStatement(insertMultiTabletsStatement); + return true; + } + + protected InsertMultiTabletsStatement constructInsertMultiTabletsStatement(boolean needCheck) { if (insertTabletStatementGenerators == null || (needCheck && !existFullStatement(insertTabletStatementGenerators))) { - return; + return null; } List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>(); @@ -100,28 +130,64 @@ public abstract class AbstractIntoOperator implements ProcessOperator { } } if (insertTabletStatementList.isEmpty()) { - return; + return null; } InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement(); insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList); + return insertMultiTabletsStatement; + } + protected void executeInsertMultiTabletsStatement( + InsertMultiTabletsStatement insertMultiTabletsStatement) { if (client == null) { client = new DataNodeInternalClient(operatorContext.getSessionInfo()); } - TSStatus executionStatus = client.insertTablets(insertMultiTabletsStatement); - if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - String message = - String.format( - "Error occurred while inserting tablets in SELECT INTO: %s", - executionStatus.getMessage()); - throw new IntoProcessException(message); - } - for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { - generator.reset(); + isBlocked = SettableFuture.create(); + writeOperationFuture = + writeOperationExecutor.submit( + () -> { + LOGGER.info(""); + return client.insertTablets(insertMultiTabletsStatement); + }); + + writeOperationFuture.addListener( + () -> { + LOGGER.info(""); + ((SettableFuture<Void>) isBlocked).set(null); + }, + writeOperationExecutor); + } + + protected boolean handleFuture() { + if (writeOperationFuture != null) { + if (writeOperationFuture.isDone()) { + try { + TSStatus executionStatus = writeOperationFuture.get(); + if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + String message = + String.format( + "Error occurred while inserting tablets in SELECT INTO: %s", + executionStatus.getMessage()); + throw new IntoProcessException(message); + } + + for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + generator.reset(); + } + + writeOperationFuture = null; + return true; + } catch (ExecutionException | InterruptedException e) { + throw new IntoProcessException(e.getMessage()); + } + } else { + return false; + } } + return true; } private boolean existFullStatement( @@ -164,12 +230,14 @@ public abstract class AbstractIntoOperator implements ProcessOperator { @Override public ListenableFuture<?> isBlocked() { - return child.isBlocked(); + return successfulAsList(Arrays.asList(isBlocked, child.isBlocked())); } @Override public boolean hasNext() { - return existNonEmptyStatement(insertTabletStatementGenerators) || child.hasNext(); + return cachedTsBlock != null + || existNonEmptyStatement(insertTabletStatementGenerators) + || child.hasNext(); } @Override @@ -177,6 +245,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator { if (client != null) { client.close(); } + writeOperationExecutor.shutdown(); child.close(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java index 68329d56e1..7ef65c9e16 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java @@ -20,12 +20,12 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.exception.IntoProcessException; import org.apache.iotdb.db.mpp.common.header.ColumnHeader; import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; +import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; @@ -75,39 +75,70 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { } @Override - public TsBlock next() throws IntoProcessException { - TsBlock inputTsBlock = child.next(); - if (inputTsBlock != null) { - String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0)); - if (!Objects.equals(device, currentDevice)) { - insertMultiTabletsInternally(false); - updateResultTsBlock(); - - insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device); - currentDevice = device; - } - int readIndex = 0; - while (readIndex < inputTsBlock.getPositionCount()) { - int lastReadIndex = readIndex; - for (IntoOperator.InsertTabletStatementGenerator generator : - insertTabletStatementGenerators) { - lastReadIndex = - Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex)); - } - readIndex = lastReadIndex; - insertMultiTabletsInternally(true); - } + public TsBlock next() { + if (!handleFuture()) { + return null; + } + + if (!processTsBlock(cachedTsBlock)) { + return null; } + cachedTsBlock = null; if (child.hasNext()) { + processTsBlock(child.next()); return null; } else { - insertMultiTabletsInternally(false); + InsertMultiTabletsStatement insertMultiTabletsStatement = + constructInsertMultiTabletsStatement(false); updateResultTsBlock(); + currentDevice = null; + + if (insertMultiTabletsStatement != null) { + executeInsertMultiTabletsStatement(insertMultiTabletsStatement); + return null; + } return resultTsBlockBuilder.build(); } } + private boolean processTsBlock(TsBlock inputTsBlock) { + if (inputTsBlock == null || inputTsBlock.isEmpty()) { + return true; + } + + String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0)); + if (!Objects.equals(device, currentDevice)) { + InsertMultiTabletsStatement insertMultiTabletsStatement = + constructInsertMultiTabletsStatement(false); + updateResultTsBlock(); + + insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device); + currentDevice = device; + + if (insertMultiTabletsStatement != null) { + executeInsertMultiTabletsStatement(insertMultiTabletsStatement); + cachedTsBlock = inputTsBlock; + return false; + } + } + + int readIndex = 0; + while (readIndex < inputTsBlock.getPositionCount()) { + int lastReadIndex = readIndex; + for (IntoOperator.InsertTabletStatementGenerator generator : + insertTabletStatementGenerators) { + lastReadIndex = Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex)); + } + readIndex = lastReadIndex; + if (insertMultiTabletsInternally(true)) { + cachedTsBlock = inputTsBlock.subTsBlock(readIndex); + return false; + } + } + return true; + } + private void updateResultTsBlock() { if (currentDevice == null) { return; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java index 4a34488779..9e5c46e9fa 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java @@ -60,28 +60,46 @@ public class IntoOperator extends AbstractIntoOperator { @Override public TsBlock next() { - TsBlock inputTsBlock = child.next(); - if (inputTsBlock != null) { - int readIndex = 0; - while (readIndex < inputTsBlock.getPositionCount()) { - int lastReadIndex = readIndex; - for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { - lastReadIndex = - Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex)); - } - readIndex = lastReadIndex; - insertMultiTabletsInternally(true); - } + if (!handleFuture()) { + return null; } + if (!processTsBlock(cachedTsBlock)) { + return null; + } + cachedTsBlock = null; + if (child.hasNext()) { + processTsBlock(child.next()); return null; } else { - insertMultiTabletsInternally(false); + if (insertMultiTabletsInternally(false)) { + return null; + } return constructResultTsBlock(); } } + private boolean processTsBlock(TsBlock inputTsBlock) { + if (inputTsBlock == null || inputTsBlock.isEmpty()) { + return true; + } + + int readIndex = 0; + while (readIndex < inputTsBlock.getPositionCount()) { + int lastReadIndex = readIndex; + for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + lastReadIndex = Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex)); + } + readIndex = lastReadIndex; + if (insertMultiTabletsInternally(true)) { + cachedTsBlock = inputTsBlock.subTsBlock(readIndex); + return false; + } + } + return true; + } + private TsBlock constructResultTsBlock() { List<TSDataType> outputDataTypes = ColumnHeaderConstant.selectIntoColumnHeaders.stream()
