This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 54ca8252a1a Pipe: Fixed the table deletion transfer logic & pipe meta IT & interval manager in general model (#16268)
54ca8252a1a is described below

commit 54ca8252a1a9f1cbd7cee72f38acfb215d4ee81b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 26 18:31:13 2025 +0800

    Pipe: Fixed the table deletion transfer logic & pipe meta IT & interval manager in general model (#16268)
    
    * ft
    
    * remove ancient check
    
    * refactor
    
    * generic-fix
    
    * Update IntervalManager.java
    
    * fix
    
    * ancient-bug
    
    * push
---
 .../manual/enhanced/IoTDBPipeMetaIT.java           | 25 +++++++++++-----------
 .../visitor/PipePlanToStatementVisitor.java        |  6 +++++-
 .../source/dataregion/IoTDBDataRegionSource.java   | 16 ++------------
 .../listener/PipeInsertionDataNodeListener.java    |  5 ++++-
 .../plan/node/pipe/PipeEnrichedDeleteDataNode.java | 14 ++++++------
 .../plan/relational/sql/ast/Delete.java            |  3 ++-
 .../task/progress/interval/PipeCommitInterval.java |  6 +++---
 .../pipe/datastructure/interval/Interval.java      |  6 +++---
 .../datastructure/interval/IntervalManager.java    | 13 +++++++----
 9 files changed, 48 insertions(+), 46 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java
index 355c36db2aa..b9cf79c34ef 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.pipe.it.dual.tablemodel.manual.enhanced;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -99,12 +100,12 @@ public class IoTDBPipeMetaIT extends 
AbstractPipeTableModelDualManualIT {
           BaseEnv.TABLE_SQL_DIALECT,
           senderEnv,
           Arrays.asList(
-              "create table table1(a id, b attribute, c int32) with 
(ttl=3000)",
+              "create table table1(a tag, b attribute, c int32) with 
(ttl=3000)",
               "alter table table1 add column d int64",
               "alter table table1 drop column c",
               "alter table table1 set properties ttl=default",
               "insert into table1 (a, b, d) values(1, 1, 1)",
-              "create table noTransferTable(a id, b attribute, c int32) with 
(ttl=3000)"),
+              "create table noTransferTable(a tag, b attribute, c int32) with 
(ttl=3000)"),
           null)) {
         return;
       }
@@ -126,7 +127,7 @@ public class IoTDBPipeMetaIT extends 
AbstractPipeTableModelDualManualIT {
           dbName,
           BaseEnv.TABLE_SQL_DIALECT,
           senderEnv,
-          "insert into table1 (a, b) values(1, 2)",
+          "insert into table1 (a, b, d) values(1, 2, 1)",
           null)) {
         return;
       }
@@ -148,7 +149,7 @@ public class IoTDBPipeMetaIT extends 
AbstractPipeTableModelDualManualIT {
       }
 
       TestUtils.assertDataEventuallyOnEnv(
-          receiverEnv, "select * from table1", "a,b,d,", 
Collections.emptySet(), dbName);
+          receiverEnv, "select * from table1", "time,a,b,d,", 
Collections.emptySet(), dbName);
 
       if (!TestUtils.tryExecuteNonQueryWithRetry(
           dbName,
@@ -177,9 +178,9 @@ public class IoTDBPipeMetaIT extends 
AbstractPipeTableModelDualManualIT {
           new HashSet<>(
               Arrays.asList(
                   "time,TIMESTAMP,TIME,",
-                  "a,STRING,ID,",
+                  "a,STRING,TAG,",
                   "b,STRING,ATTRIBUTE,",
-                  "d,INT64,MEASUREMENT,")),
+                  "d,INT64,FIELD,")),
           dbName);
 
       if (!TestUtils.tryExecuteNonQueryWithRetry(
@@ -300,7 +301,7 @@ public class IoTDBPipeMetaIT extends 
AbstractPipeTableModelDualManualIT {
           BaseEnv.TABLE_SQL_DIALECT,
           senderEnv,
           Arrays.asList(
-              "create table table1(a id, b attribute, c int32) with 
(ttl=3000)",
+              "create table table1(a tag, b attribute, c int32) with 
(ttl=3000)",
               "alter table table1 add column d int64",
               "alter table table1 drop column b",
               "alter table table1 set properties ttl=default"),
@@ -334,9 +335,8 @@ public class IoTDBPipeMetaIT extends 
AbstractPipeTableModelDualManualIT {
         return;
       }
 
-      final String dbName = "test";
       if (!TestUtils.tryExecuteNonQueriesWithRetry(
-          dbName,
+          null,
           BaseEnv.TABLE_SQL_DIALECT,
           senderEnv,
           Arrays.asList(
@@ -370,10 +370,11 @@ public class IoTDBPipeMetaIT extends 
AbstractPipeTableModelDualManualIT {
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
 
       Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client.startPipeExtended(new 
TStartPipeReq("testPipe").setIsTableModel(true)).getCode());
 
       if (!TestUtils.tryExecuteNonQueryWithRetry(
-          dbName,
+          null,
           BaseEnv.TABLE_SQL_DIALECT,
           senderEnv,
           "grant alter on any to user testUser with grant option",
@@ -392,7 +393,7 @@ public class IoTDBPipeMetaIT extends 
AbstractPipeTableModelDualManualIT {
                   ",,MAINTAIN,false,",
                   ",*.*,ALTER,true,",
                   ",test.*,DROP,false,")),
-          dbName);
+          (String) null);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
index bd2852a14b3..fc073226043 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
@@ -315,7 +315,11 @@ public class PipePlanToStatementVisitor extends 
PlanVisitor<Object, Void> {
 
   @Override
   public Delete visitDeleteData(final RelationalDeleteDataNode node, final 
Void context) {
-    final Delete statement = new Delete();
+    final Delete statement =
+        new Delete(
+            new Table(
+                QualifiedName.of(
+                    node.getDatabaseName(), 
node.getModEntries().get(0).getTableName())));
     statement.setDatabaseName(node.getDatabaseName());
     statement.setTableDeletionEntries(node.getModEntries());
     return statement;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index 72dc0b51754..4dc5fef859f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
-import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.source.IoTDBSource;
 import org.apache.iotdb.consensus.ConsensusFactory;
@@ -237,11 +236,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
             EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE);
 
     // Validate tree pattern and table pattern
-    final TreePattern treePattern =
-        
TreePattern.parsePipePatternFromSourceParameters(validator.getParameters());
-    final TablePattern tablePattern =
-        
TablePattern.parsePipePatternFromSourceParameters(validator.getParameters());
-    validatePattern(treePattern, tablePattern);
+    
validatePattern(TreePattern.parsePipePatternFromSourceParameters(validator.getParameters()));
 
     // Validate extractor.history.enable and extractor.realtime.enable
     validator
@@ -302,7 +297,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
     realtimeExtractor.validate(validator);
   }
 
-  private void validatePattern(final TreePattern treePattern, final 
TablePattern tablePattern) {
+  private void validatePattern(final TreePattern treePattern) {
     if (!treePattern.isLegal()) {
       throw new IllegalArgumentException(String.format("Pattern \"%s\" is 
illegal.", treePattern));
     }
@@ -316,13 +311,6 @@ public class IoTDBDataRegionSource extends IoTDBSource {
               "The path pattern %s is not valid for the source. Only prefix or 
full path is allowed.",
               treePattern));
     }
-
-    if (shouldExtractDeletion && 
tablePattern.hasUserSpecifiedDatabasePatternOrTablePattern()) {
-      throw new IllegalArgumentException(
-          String.format(
-              "The table model pattern %s can not be specified when deletion 
capture is enabled.",
-              tablePattern));
-    }
   }
 
   private void checkInvalidParameters(final PipeParameterValidator validator) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index dff966e72e8..8c49dd0a3dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegio
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeDataRegionAssigner;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import java.util.Objects;
@@ -141,7 +142,9 @@ public class PipeInsertionDataNodeListener {
       final String regionId, final AbstractDeleteDataNode node) {
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(regionId);
     // only events from registered data region will be extracted
-    if (assigner == null) {
+    if (assigner == null
+        || node instanceof RelationalDeleteDataNode
+            && ((RelationalDeleteDataNode) node).getModEntries().isEmpty()) {
       return null;
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
index 973e71e97b5..11d70e0daa7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
@@ -30,7 +30,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 
@@ -114,19 +113,19 @@ public class PipeEnrichedDeleteDataNode extends 
AbstractDeleteDataNode {
 
   @Override
   public PlanNode clone() {
-    return new PipeEnrichedDeleteDataNode((DeleteDataNode) 
deleteDataNode.clone());
+    return new PipeEnrichedDeleteDataNode((AbstractDeleteDataNode) 
deleteDataNode.clone());
   }
 
   @Override
   public PlanNode createSubNode(final int subNodeId, final int startIndex, 
final int endIndex) {
     return new PipeEnrichedDeleteDataNode(
-        (DeleteDataNode) deleteDataNode.createSubNode(subNodeId, startIndex, 
endIndex));
+        (AbstractDeleteDataNode) deleteDataNode.createSubNode(subNodeId, 
startIndex, endIndex));
   }
 
   @Override
   public PlanNode cloneWithChildren(final List<PlanNode> children) {
     return new PipeEnrichedDeleteDataNode(
-        (DeleteDataNode) deleteDataNode.cloneWithChildren(children));
+        (AbstractDeleteDataNode) deleteDataNode.cloneWithChildren(children));
   }
 
   @Override
@@ -157,7 +156,8 @@ public class PipeEnrichedDeleteDataNode extends 
AbstractDeleteDataNode {
   }
 
   public static PipeEnrichedDeleteDataNode deserialize(final ByteBuffer 
buffer) {
-    return new PipeEnrichedDeleteDataNode((DeleteDataNode) 
PlanNodeType.deserialize(buffer));
+    return new PipeEnrichedDeleteDataNode(
+        (AbstractDeleteDataNode) PlanNodeType.deserialize(buffer));
   }
 
   @Override
@@ -183,7 +183,7 @@ public class PipeEnrichedDeleteDataNode extends 
AbstractDeleteDataNode {
             plan ->
                 plan instanceof PipeEnrichedDeleteDataNode
                     ? plan
-                    : new PipeEnrichedDeleteDataNode((DeleteDataNode) plan))
+                    : new PipeEnrichedDeleteDataNode((AbstractDeleteDataNode) 
plan))
         .collect(Collectors.toList());
   }
 
@@ -206,6 +206,6 @@ public class PipeEnrichedDeleteDataNode extends 
AbstractDeleteDataNode {
                     (SearchNode) ((PipeEnrichedDeleteDataNode) 
searchNode).getDeleteDataNode())
             .collect(Collectors.toList());
     return new PipeEnrichedDeleteDataNode(
-        (DeleteDataNode) deleteDataNode.merge(unrichedDeleteDataNodes));
+        (AbstractDeleteDataNode) 
deleteDataNode.merge(unrichedDeleteDataNodes));
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Delete.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Delete.java
index 72b8a865589..788b5ddffcd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Delete.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Delete.java
@@ -44,8 +44,9 @@ public class Delete extends Statement {
   private String databaseName;
   private Collection<TRegionReplicaSet> replicaSets;
 
-  public Delete() {
+  public Delete(final Table table) {
     super(null);
+    this.table = requireNonNull(table, "table is null");
   }
 
   public Delete(final NodeLocation location, final Table table) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
index 7ee8008c0e0..46a3d3e2a86 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
@@ -33,12 +33,12 @@ public class PipeCommitInterval extends 
Interval<PipeCommitInterval> {
   private final PipeTaskMeta pipeTaskMeta;
 
   public PipeCommitInterval(
-      final long s,
-      final long e,
+      final long start,
+      final long end,
       final ProgressIndex currentIndex,
       final List<Runnable> onCommittedHooks,
       final PipeTaskMeta pipeTaskMeta) {
-    super(s, e);
+    super(start, end);
     this.pipeTaskMeta = pipeTaskMeta;
     this.currentIndex = currentIndex;
     this.onCommittedHooks = onCommittedHooks;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
index 67ecaa66d6e..45b51e3ee29 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
@@ -23,9 +23,9 @@ public class Interval<T extends Interval<T>> implements 
Comparable<Interval<?>>
   public long start;
   public long end;
 
-  public Interval(final long s, final long e) {
-    start = s;
-    end = e;
+  public Interval(final long start, final long end) {
+    this.start = start;
+    this.end = end;
   }
 
   public void onMerged(final T another) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
index 179a379b7c3..cecdef312de 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
@@ -19,33 +19,38 @@
 
 package org.apache.iotdb.commons.pipe.datastructure.interval;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import java.util.TreeSet;
 
+@NotThreadSafe
 public class IntervalManager<T extends Interval<T>> {
   private final TreeSet<T> intervals = new TreeSet<>();
 
   // insert into new interval and merge
   public void addInterval(final T newInterval) {
     // Left closest
-    final T left = intervals.floor(newInterval);
+    T left = intervals.floor(newInterval);
 
     // Right closest
-    final T right = intervals.ceiling(newInterval);
+    T right = intervals.ceiling(newInterval);
 
     // Merge left ([0,1] + [2,3] → [0,3])
-    if (left != null && left.end >= newInterval.start - 1) {
+    while (left != null && left.end >= newInterval.start - 1) {
       newInterval.start = Math.min(left.start, newInterval.start);
       newInterval.end = Math.max(left.end, newInterval.end);
       newInterval.onMerged(left);
       intervals.remove(left);
+      left = intervals.floor(newInterval);
     }
 
     // Merge right ([2,3] + [3,4] → [2,4])
-    if (right != null && newInterval.end >= right.start - 1) {
+    while (right != null && newInterval.end >= right.start - 1) {
       newInterval.start = Math.min(newInterval.start, right.start);
       newInterval.end = Math.max(newInterval.end, right.end);
       newInterval.onMerged(right);
       intervals.remove(right);
+      right = intervals.ceiling(newInterval);
     }
 
     intervals.add(newInterval);

Reply via email to