This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f6204fc62e761a58f0963663c9e4cffb454dd0bc
Author: liuminghui233 <[email protected]>
AuthorDate: Fri Nov 25 17:12:26 2022 +0800

    refactor IntoOperator
---
 .../operator/process/AbstractIntoOperator.java     | 101 +++++++++++++++++----
 .../operator/process/DeviceViewIntoOperator.java   |  79 +++++++++++-----
 .../execution/operator/process/IntoOperator.java   |  44 ++++++---
 3 files changed, 171 insertions(+), 53 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index f331b2ed87..9ec9b0d5bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -38,6 +38,11 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,19 +50,33 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
 public abstract class AbstractIntoOperator implements ProcessOperator {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractIntoOperator.class);
+
   protected final OperatorContext operatorContext;
   protected final Operator child;
 
+  protected TsBlock cachedTsBlock;
+
   protected List<InsertTabletStatementGenerator> 
insertTabletStatementGenerators;
 
   protected final Map<String, InputLocation> sourceColumnToInputLocationMap;
 
   private DataNodeInternalClient client;
 
+  private ListenableFuture<?> isBlocked = NOT_BLOCKED;
+
+  private final ListeningExecutorService writeOperationExecutor =
+      MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+  private ListenableFuture<TSStatus> writeOperationFuture;
+
   public AbstractIntoOperator(
       OperatorContext operatorContext,
       Operator child,
@@ -87,10 +106,21 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     return insertTabletStatementGenerators;
   }
 
-  protected void insertMultiTabletsInternally(boolean needCheck) {
+  protected boolean insertMultiTabletsInternally(boolean needCheck) {
+    InsertMultiTabletsStatement insertMultiTabletsStatement =
+        constructInsertMultiTabletsStatement(needCheck);
+    if (insertMultiTabletsStatement == null) {
+      return false;
+    }
+
+    executeInsertMultiTabletsStatement(insertMultiTabletsStatement);
+    return true;
+  }
+
+  protected InsertMultiTabletsStatement 
constructInsertMultiTabletsStatement(boolean needCheck) {
     if (insertTabletStatementGenerators == null
         || (needCheck && 
!existFullStatement(insertTabletStatementGenerators))) {
-      return;
+      return null;
     }
 
     List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
@@ -100,28 +130,64 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
       }
     }
     if (insertTabletStatementList.isEmpty()) {
-      return;
+      return null;
     }
 
     InsertMultiTabletsStatement insertMultiTabletsStatement = new 
InsertMultiTabletsStatement();
     
insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+    return insertMultiTabletsStatement;
+  }
 
+  protected void executeInsertMultiTabletsStatement(
+      InsertMultiTabletsStatement insertMultiTabletsStatement) {
     if (client == null) {
       client = new DataNodeInternalClient(operatorContext.getSessionInfo());
     }
-    TSStatus executionStatus = 
client.insertTablets(insertMultiTabletsStatement);
-    if (executionStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && executionStatus.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-      String message =
-          String.format(
-              "Error occurred while inserting tablets in SELECT INTO: %s",
-              executionStatus.getMessage());
-      throw new IntoProcessException(message);
-    }
 
-    for (InsertTabletStatementGenerator generator : 
insertTabletStatementGenerators) {
-      generator.reset();
+    isBlocked = SettableFuture.create();
+    writeOperationFuture =
+        writeOperationExecutor.submit(
+            () -> {
+              LOGGER.info("");
+              return client.insertTablets(insertMultiTabletsStatement);
+            });
+
+    writeOperationFuture.addListener(
+        () -> {
+          LOGGER.info("");
+          ((SettableFuture<Void>) isBlocked).set(null);
+        },
+        writeOperationExecutor);
+  }
+
+  protected boolean handleFuture() {
+    if (writeOperationFuture != null) {
+      if (writeOperationFuture.isDone()) {
+        try {
+          TSStatus executionStatus = writeOperationFuture.get();
+          if (executionStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()
+              && executionStatus.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+            String message =
+                String.format(
+                    "Error occurred while inserting tablets in SELECT INTO: 
%s",
+                    executionStatus.getMessage());
+            throw new IntoProcessException(message);
+          }
+
+          for (InsertTabletStatementGenerator generator : 
insertTabletStatementGenerators) {
+            generator.reset();
+          }
+
+          writeOperationFuture = null;
+          return true;
+        } catch (ExecutionException | InterruptedException e) {
+          throw new IntoProcessException(e.getMessage());
+        }
+      } else {
+        return false;
+      }
     }
+    return true;
   }
 
   private boolean existFullStatement(
@@ -164,12 +230,14 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
+    return successfulAsList(Arrays.asList(isBlocked, child.isBlocked()));
   }
 
   @Override
   public boolean hasNext() {
-    return existNonEmptyStatement(insertTabletStatementGenerators) || 
child.hasNext();
+    return cachedTsBlock != null
+        || existNonEmptyStatement(insertTabletStatementGenerators)
+        || child.hasNext();
   }
 
   @Override
@@ -177,6 +245,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     if (client != null) {
       client.close();
     }
+    writeOperationExecutor.shutdown();
     child.close();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 68329d56e1..7ef65c9e16 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -20,12 +20,12 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.IntoProcessException;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -75,39 +75,70 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
   }
 
   @Override
-  public TsBlock next() throws IntoProcessException {
-    TsBlock inputTsBlock = child.next();
-    if (inputTsBlock != null) {
-      String device = 
String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
-      if (!Objects.equals(device, currentDevice)) {
-        insertMultiTabletsInternally(false);
-        updateResultTsBlock();
-
-        insertTabletStatementGenerators = 
constructInsertTabletStatementGeneratorsByDevice(device);
-        currentDevice = device;
-      }
-      int readIndex = 0;
-      while (readIndex < inputTsBlock.getPositionCount()) {
-        int lastReadIndex = readIndex;
-        for (IntoOperator.InsertTabletStatementGenerator generator :
-            insertTabletStatementGenerators) {
-          lastReadIndex =
-              Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, 
readIndex));
-        }
-        readIndex = lastReadIndex;
-        insertMultiTabletsInternally(true);
-      }
+  public TsBlock next() {
+    if (!handleFuture()) {
+      return null;
+    }
+
+    if (!processTsBlock(cachedTsBlock)) {
+      return null;
     }
+    cachedTsBlock = null;
 
     if (child.hasNext()) {
+      processTsBlock(child.next());
       return null;
     } else {
-      insertMultiTabletsInternally(false);
+      InsertMultiTabletsStatement insertMultiTabletsStatement =
+          constructInsertMultiTabletsStatement(false);
       updateResultTsBlock();
+      currentDevice = null;
+
+      if (insertMultiTabletsStatement != null) {
+        executeInsertMultiTabletsStatement(insertMultiTabletsStatement);
+        return null;
+      }
       return resultTsBlockBuilder.build();
     }
   }
 
+  private boolean processTsBlock(TsBlock inputTsBlock) {
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return true;
+    }
+
+    String device = 
String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
+    if (!Objects.equals(device, currentDevice)) {
+      InsertMultiTabletsStatement insertMultiTabletsStatement =
+          constructInsertMultiTabletsStatement(false);
+      updateResultTsBlock();
+
+      insertTabletStatementGenerators = 
constructInsertTabletStatementGeneratorsByDevice(device);
+      currentDevice = device;
+
+      if (insertMultiTabletsStatement != null) {
+        executeInsertMultiTabletsStatement(insertMultiTabletsStatement);
+        cachedTsBlock = inputTsBlock;
+        return false;
+      }
+    }
+
+    int readIndex = 0;
+    while (readIndex < inputTsBlock.getPositionCount()) {
+      int lastReadIndex = readIndex;
+      for (IntoOperator.InsertTabletStatementGenerator generator :
+          insertTabletStatementGenerators) {
+        lastReadIndex = Math.max(lastReadIndex, 
generator.processTsBlock(inputTsBlock, readIndex));
+      }
+      readIndex = lastReadIndex;
+      if (insertMultiTabletsInternally(true)) {
+        cachedTsBlock = inputTsBlock.subTsBlock(readIndex);
+        return false;
+      }
+    }
+    return true;
+  }
+
   private void updateResultTsBlock() {
     if (currentDevice == null) {
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 4a34488779..9e5c46e9fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -60,28 +60,46 @@ public class IntoOperator extends AbstractIntoOperator {
 
   @Override
   public TsBlock next() {
-    TsBlock inputTsBlock = child.next();
-    if (inputTsBlock != null) {
-      int readIndex = 0;
-      while (readIndex < inputTsBlock.getPositionCount()) {
-        int lastReadIndex = readIndex;
-        for (InsertTabletStatementGenerator generator : 
insertTabletStatementGenerators) {
-          lastReadIndex =
-              Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, 
readIndex));
-        }
-        readIndex = lastReadIndex;
-        insertMultiTabletsInternally(true);
-      }
+    if (!handleFuture()) {
+      return null;
     }
 
+    if (!processTsBlock(cachedTsBlock)) {
+      return null;
+    }
+    cachedTsBlock = null;
+
     if (child.hasNext()) {
+      processTsBlock(child.next());
       return null;
     } else {
-      insertMultiTabletsInternally(false);
+      if (insertMultiTabletsInternally(false)) {
+        return null;
+      }
       return constructResultTsBlock();
     }
   }
 
+  private boolean processTsBlock(TsBlock inputTsBlock) {
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return true;
+    }
+
+    int readIndex = 0;
+    while (readIndex < inputTsBlock.getPositionCount()) {
+      int lastReadIndex = readIndex;
+      for (InsertTabletStatementGenerator generator : 
insertTabletStatementGenerators) {
+        lastReadIndex = Math.max(lastReadIndex, 
generator.processTsBlock(inputTsBlock, readIndex));
+      }
+      readIndex = lastReadIndex;
+      if (insertMultiTabletsInternally(true)) {
+        cachedTsBlock = inputTsBlock.subTsBlock(readIndex);
+        return false;
+      }
+    }
+    return true;
+  }
+
   private TsBlock constructResultTsBlock() {
     List<TSDataType> outputDataTypes =
         ColumnHeaderConstant.selectIntoColumnHeaders.stream()

Reply via email to