This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch convert-on-type-mismatch
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/convert-on-type-mismatch by this push:
     new 7f06e3b74fa Update PipeStatementDataTypeConvertExecutionVisitor.java
7f06e3b74fa is described below

commit 7f06e3b74fa60e4979612e93a2191e2c21b3cdfe
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Aug 7 18:37:03 2024 +0800

    Update PipeStatementDataTypeConvertExecutionVisitor.java
---
 ...peStatementDataTypeConvertExecutionVisitor.java | 106 ++++++++++++++++-----
 1 file changed, 83 insertions(+), 23 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index fc6e8339b3d..7db2812efc1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement;
+import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
@@ -27,12 +29,14 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsSta
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Optional;
-import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /**
  * This visitor transforms the data type of the statement when the statement is executed and an
@@ -41,12 +45,29 @@ import java.util.function.Consumer;
 public class PipeStatementDataTypeConvertExecutionVisitor
     extends StatementVisitor<Optional<TSStatus>, Exception> {
 
-  private final Consumer<Statement> statementExecutor;
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeStatementDataTypeConvertExecutionVisitor.class);
+
+  @FunctionalInterface
+  public interface StatementExecutor {
+    TSStatus execute(final Statement statement);
+  }
+
+  private final StatementExecutor statementExecutor;
 
-  public PipeStatementDataTypeConvertExecutionVisitor(final Consumer<Statement> statementExecutor) {
+  public PipeStatementDataTypeConvertExecutionVisitor(final StatementExecutor statementExecutor) {
     this.statementExecutor = statementExecutor;
   }
 
+  private Optional<TSStatus> tryExecute(final Statement statement) {
+    try {
+      return Optional.of(statementExecutor.execute(statement));
+    } catch (final Exception e) {
+      LOGGER.warn("Failed to execute statement after data type conversion.", e);
+      return Optional.empty();
+    }
+  }
+
   @Override
   public Optional<TSStatus> visitNode(
       final StatementNode statementNode, final Exception exception) {
@@ -56,42 +77,81 @@ public class PipeStatementDataTypeConvertExecutionVisitor
   @Override
   public Optional<TSStatus> visitLoadFile(
       final LoadTsFileStatement loadTsFileStatement, final Exception exception) {
+    // TODO: judge if the exception is caused by data type mismatch
+    // TODO: convert the data type of the statement
     return visitStatement(loadTsFileStatement, exception);
   }
 
   @Override
-  public Optional<TSStatus> visitInsert(
-      InsertStatement insertStatement, final Exception exception) {
-    return visitStatement(insertStatement, exception);
+  public Optional<TSStatus> visitInsertRow(
+      final InsertRowStatement insertRowStatement, final Exception exception) {
+    // TODO: judge if the exception is caused by data type mismatch
+
+    return tryExecute(new PipeConvertedInsertRowStatement(insertRowStatement));
   }
 
   @Override
-  public Optional<TSStatus> visitInsertTablet(
-      InsertTabletStatement insertTabletStatement, final Exception exception) {
-    return visitStatement(insertTabletStatement, exception);
+  public Optional<TSStatus> visitInsertRows(
+      final InsertRowsStatement insertRowsStatement, final Exception exception) {
+    // TODO: judge if the exception is caused by data type mismatch
+
+    if (insertRowsStatement.getInsertRowStatementList() == null
+        || insertRowsStatement.getInsertRowStatementList().isEmpty()) {
+      return Optional.empty();
+    }
+
+    final InsertRowsStatement convertedInsertRowsStatement = new InsertRowsStatement();
+    convertedInsertRowsStatement.setInsertRowStatementList(
+        insertRowsStatement.getInsertRowStatementList().stream()
+            .map(PipeConvertedInsertRowStatement::new)
+            .collect(Collectors.toList()));
+    return tryExecute(convertedInsertRowsStatement);
   }
 
   @Override
-  public Optional<TSStatus> visitInsertRow(
-      InsertRowStatement insertRowStatement, final Exception exception) {
-    return visitStatement(insertRowStatement, exception);
+  public Optional<TSStatus> visitInsertRowsOfOneDevice(
+      final InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement,
+      final Exception exception) {
+    // TODO: judge if the exception is caused by data type mismatch
+
+    if (insertRowsOfOneDeviceStatement.getInsertRowStatementList() == null
+        || insertRowsOfOneDeviceStatement.getInsertRowStatementList().isEmpty()) {
+      return Optional.empty();
+    }
+
+    final InsertRowsOfOneDeviceStatement convertedInsertRowsOfOneDeviceStatement =
+        new InsertRowsOfOneDeviceStatement();
+    convertedInsertRowsOfOneDeviceStatement.setInsertRowStatementList(
+        insertRowsOfOneDeviceStatement.getInsertRowStatementList().stream()
+            .map(PipeConvertedInsertRowStatement::new)
+            .collect(Collectors.toList()));
+    return tryExecute(convertedInsertRowsOfOneDeviceStatement);
   }
 
   @Override
-  public Optional<TSStatus> visitInsertRows(
-      InsertRowsStatement insertRowsStatement, final Exception exception) {
-    return visitStatement(insertRowsStatement, exception);
+  public Optional<TSStatus> visitInsertTablet(
+      final InsertTabletStatement insertTabletStatement, final Exception exception) {
+    // TODO: judge if the exception is caused by data type mismatch
+
+    return tryExecute(new PipeConvertedInsertTabletStatement(insertTabletStatement));
   }
 
   @Override
   public Optional<TSStatus> visitInsertMultiTablets(
-      InsertMultiTabletsStatement insertMultiTabletsStatement, final Exception exception) {
-    return visitStatement(insertMultiTabletsStatement, exception);
-  }
+      final InsertMultiTabletsStatement insertMultiTabletsStatement, final Exception exception) {
+    // TODO: judge if the exception is caused by data type mismatch
 
-  @Override
-  public Optional<TSStatus> visitInsertRowsOfOneDevice(
-      InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, final Exception exception) {
-    return visitStatement(insertRowsOfOneDeviceStatement, exception);
+    if (insertMultiTabletsStatement.getInsertTabletStatementList() == null
+        || insertMultiTabletsStatement.getInsertTabletStatementList().isEmpty()) {
+      return Optional.empty();
+    }
+
+    final InsertMultiTabletsStatement convertedInsertMultiTabletsStatement =
+        new InsertMultiTabletsStatement();
+    convertedInsertMultiTabletsStatement.setInsertTabletStatementList(
+        insertMultiTabletsStatement.getInsertTabletStatementList().stream()
+            .map(PipeConvertedInsertTabletStatement::new)
+            .collect(Collectors.toList()));
+    return tryExecute(convertedInsertMultiTabletsStatement);
   }
 }

Reply via email to