This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 554665ef95 [IOTDB-3353] Refactor insert exception processing (#6451)
554665ef95 is described below

commit 554665ef958130a1cf0b91ae2dbfc3eec5df63fc
Author: Mrquan <[email protected]>
AuthorDate: Tue Jun 28 11:00:51 2022 +0800

    [IOTDB-3353] Refactor insert exception processing (#6451)
---
 .../statemachine/visitor/DataExecutionVisitor.java | 49 +++++++++++++++++++---
 .../iotdb/db/engine/storagegroup/DataRegion.java   | 14 ++++---
 .../iotdb/db/exception/BatchProcessException.java  |  5 +++
 3 files changed, 56 insertions(+), 12 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
index bbda1bc260..418bb2b5e2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
@@ -33,13 +33,12 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
-import org.apache.iotdb.rpc.RpcUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.Map;
 
 public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DataExecutionVisitor.class);
@@ -69,7 +68,13 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
       LOGGER.error("Error in executing plan node: {}", node, e);
       return StatusUtils.EXECUTE_STATEMENT_ERROR;
     } catch (BatchProcessException e) {
-      return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+      LOGGER.warn(
+          "Batch failure in executing a InsertTabletNode. device: {}, 
startTime: {}, measurements: {}, failing status: {}",
+          node.getDevicePath(),
+          node.getTimes()[0],
+          node.getMeasurements(),
+          e.getFailingStatus());
+      return StatusUtils.EXECUTE_STATEMENT_ERROR;
     }
   }
 
@@ -79,7 +84,18 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
       dataRegion.insert(node);
       return StatusUtils.OK;
     } catch (BatchProcessException e) {
-      return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+      LOGGER.warn("Batch failure in executing a InsertRowsNode.");
+      // for each error
+      for (Map.Entry<Integer, TSStatus> failedEntry : 
node.getResults().entrySet()) {
+        InsertRowNode insertRowNode = 
node.getInsertRowNodeList().get(failedEntry.getKey());
+        LOGGER.warn(
+            "Insert row failed. device: {}, time: {}, measurements: {}, 
failing status: {}",
+            insertRowNode.getDevicePath(),
+            insertRowNode.getTime(),
+            insertRowNode.getMeasurements(),
+            failedEntry.getValue());
+      }
+      return StatusUtils.EXECUTE_STATEMENT_ERROR;
     }
   }
 
@@ -89,7 +105,18 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
       dataRegion.insertTablets(node);
       return StatusUtils.OK;
     } catch (BatchProcessException e) {
-      return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+      LOGGER.warn("Batch failure in executing a InsertMultiTabletsNode.");
+      for (Map.Entry<Integer, TSStatus> failedEntry : 
node.getResults().entrySet()) {
+        InsertTabletNode insertTabletNode =
+            node.getInsertTabletNodeList().get(failedEntry.getKey());
+        LOGGER.warn(
+            "Insert tablet failed. device: {}, startTime: {}, measurements: 
{}, failing status: {}",
+            insertTabletNode.getDevicePath(),
+            insertTabletNode.getTimes()[0],
+            insertTabletNode.getMeasurements(),
+            failedEntry.getValue());
+      }
+      return StatusUtils.EXECUTE_STATEMENT_ERROR;
     }
   }
 
@@ -103,7 +130,17 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
       LOGGER.error("Error in executing plan node: {}", node, e);
       return StatusUtils.EXECUTE_STATEMENT_ERROR;
     } catch (BatchProcessException e) {
-      return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+      LOGGER.warn("Batch failure in executing a InsertRowsOfOneDeviceNode.");
+      for (Map.Entry<Integer, TSStatus> failedEntry : 
node.getResults().entrySet()) {
+        InsertRowNode insertRowNode = 
node.getInsertRowNodeList().get(failedEntry.getKey());
+        LOGGER.warn(
+            "Insert row failed. device: {}, time: {}, measurements: {}, 
failing status: {}",
+            insertRowNode.getDevicePath(),
+            insertRowNode.getTime(),
+            insertRowNode.getMeasurements(),
+            failedEntry.getValue());
+      }
+      return StatusUtils.EXECUTE_STATEMENT_ERROR;
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 4fb701ee0b..3514b38a8c 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1053,7 +1053,7 @@ public class DataRegion {
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   public void insertTablet(InsertTabletNode insertTabletNode)
-      throws BatchProcessException, TriggerExecutionException, 
WriteProcessException {
+      throws TriggerExecutionException, BatchProcessException, 
WriteProcessException {
 
     writeLock("insertTablet");
     try {
@@ -1081,7 +1081,9 @@ public class DataRegion {
       }
       // loc pointing at first legal position
       if (loc == insertTabletNode.getRowCount()) {
-        throw new BatchProcessException(results);
+        throw new OutOfTTLException(
+            insertTabletNode.getTimes()[insertTabletNode.getTimes().length - 
1],
+            (System.currentTimeMillis() - dataTTL));
       }
 
       //      TODO(Trigger)// fire trigger before insertion
@@ -3493,7 +3495,7 @@ public class DataRegion {
       writeUnlock();
     }
     if (!insertRowsOfOneDeviceNode.getResults().isEmpty()) {
-      throw new 
BatchProcessException(insertRowsOfOneDeviceNode.getFailingStatus());
+      throw new BatchProcessException("Partial failed inserting rows of one 
device");
     }
   }
 
@@ -3513,7 +3515,7 @@ public class DataRegion {
     }
 
     if (!insertRowsNode.getResults().isEmpty()) {
-      throw new BatchProcessException(insertRowsNode.getFailingStatus());
+      throw new BatchProcessException("Partial failed inserting rows");
     }
   }
 
@@ -3528,7 +3530,7 @@ public class DataRegion {
       InsertTabletNode insertTabletNode = 
insertMultiTabletsNode.getInsertTabletNodeList().get(i);
       try {
         insertTablet(insertTabletNode);
-      } catch (TriggerExecutionException | BatchProcessException | 
WriteProcessException e) {
+      } catch (TriggerExecutionException | WriteProcessException | 
BatchProcessException e) {
         insertMultiTabletsNode
             .getResults()
             .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
@@ -3536,7 +3538,7 @@ public class DataRegion {
     }
 
     if (!insertMultiTabletsNode.getResults().isEmpty()) {
-      throw new 
BatchProcessException(insertMultiTabletsNode.getFailingStatus());
+      throw new BatchProcessException("Partial failed inserting multi 
tablets");
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/exception/BatchProcessException.java 
b/server/src/main/java/org/apache/iotdb/db/exception/BatchProcessException.java
index ee617f5a23..a0442e53e2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/exception/BatchProcessException.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/exception/BatchProcessException.java
@@ -33,6 +33,11 @@ public class BatchProcessException extends 
QueryProcessException {
     this.failingStatus = failingStatus;
   }
 
+  public BatchProcessException(String message) {
+    super(message);
+    failingStatus = null;
+  }
+
   public TSStatus[] getFailingStatus() {
     return failingStatus;
   }

Reply via email to