This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 54ca8252a1a Pipe: Fixed the table deletion transfer logic & pipe meta
IT & interval manager in general model (#16268)
54ca8252a1a is described below
commit 54ca8252a1a9f1cbd7cee72f38acfb215d4ee81b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 26 18:31:13 2025 +0800
Pipe: Fixed the table deletion transfer logic & pipe meta IT & interval
manager in general model (#16268)
* ft
* remove ancient check
* refactor
* generic-fix
* Update IntervalManager.java
* fix
* ancient-bug
* push
---
.../manual/enhanced/IoTDBPipeMetaIT.java | 25 +++++++++++-----------
.../visitor/PipePlanToStatementVisitor.java | 6 +++++-
.../source/dataregion/IoTDBDataRegionSource.java | 16 ++------------
.../listener/PipeInsertionDataNodeListener.java | 5 ++++-
.../plan/node/pipe/PipeEnrichedDeleteDataNode.java | 14 ++++++------
.../plan/relational/sql/ast/Delete.java | 3 ++-
.../task/progress/interval/PipeCommitInterval.java | 6 +++---
.../pipe/datastructure/interval/Interval.java | 6 +++---
.../datastructure/interval/IntervalManager.java | 13 +++++++----
9 files changed, 48 insertions(+), 46 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java
index 355c36db2aa..b9cf79c34ef 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.pipe.it.dual.tablemodel.manual.enhanced;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -99,12 +100,12 @@ public class IoTDBPipeMetaIT extends
AbstractPipeTableModelDualManualIT {
BaseEnv.TABLE_SQL_DIALECT,
senderEnv,
Arrays.asList(
- "create table table1(a id, b attribute, c int32) with
(ttl=3000)",
+ "create table table1(a tag, b attribute, c int32) with
(ttl=3000)",
"alter table table1 add column d int64",
"alter table table1 drop column c",
"alter table table1 set properties ttl=default",
"insert into table1 (a, b, d) values(1, 1, 1)",
- "create table noTransferTable(a id, b attribute, c int32) with
(ttl=3000)"),
+ "create table noTransferTable(a tag, b attribute, c int32) with
(ttl=3000)"),
null)) {
return;
}
@@ -126,7 +127,7 @@ public class IoTDBPipeMetaIT extends
AbstractPipeTableModelDualManualIT {
dbName,
BaseEnv.TABLE_SQL_DIALECT,
senderEnv,
- "insert into table1 (a, b) values(1, 2)",
+ "insert into table1 (a, b, d) values(1, 2, 1)",
null)) {
return;
}
@@ -148,7 +149,7 @@ public class IoTDBPipeMetaIT extends
AbstractPipeTableModelDualManualIT {
}
TestUtils.assertDataEventuallyOnEnv(
- receiverEnv, "select * from table1", "a,b,d,",
Collections.emptySet(), dbName);
+ receiverEnv, "select * from table1", "time,a,b,d,",
Collections.emptySet(), dbName);
if (!TestUtils.tryExecuteNonQueryWithRetry(
dbName,
@@ -177,9 +178,9 @@ public class IoTDBPipeMetaIT extends
AbstractPipeTableModelDualManualIT {
new HashSet<>(
Arrays.asList(
"time,TIMESTAMP,TIME,",
- "a,STRING,ID,",
+ "a,STRING,TAG,",
"b,STRING,ATTRIBUTE,",
- "d,INT64,MEASUREMENT,")),
+ "d,INT64,FIELD,")),
dbName);
if (!TestUtils.tryExecuteNonQueryWithRetry(
@@ -300,7 +301,7 @@ public class IoTDBPipeMetaIT extends
AbstractPipeTableModelDualManualIT {
BaseEnv.TABLE_SQL_DIALECT,
senderEnv,
Arrays.asList(
- "create table table1(a id, b attribute, c int32) with
(ttl=3000)",
+ "create table table1(a tag, b attribute, c int32) with
(ttl=3000)",
"alter table table1 add column d int64",
"alter table table1 drop column b",
"alter table table1 set properties ttl=default"),
@@ -334,9 +335,8 @@ public class IoTDBPipeMetaIT extends
AbstractPipeTableModelDualManualIT {
return;
}
- final String dbName = "test";
if (!TestUtils.tryExecuteNonQueriesWithRetry(
- dbName,
+ null,
BaseEnv.TABLE_SQL_DIALECT,
senderEnv,
Arrays.asList(
@@ -370,10 +370,11 @@ public class IoTDBPipeMetaIT extends
AbstractPipeTableModelDualManualIT {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client.startPipeExtended(new
TStartPipeReq("testPipe").setIsTableModel(true)).getCode());
if (!TestUtils.tryExecuteNonQueryWithRetry(
- dbName,
+ null,
BaseEnv.TABLE_SQL_DIALECT,
senderEnv,
"grant alter on any to user testUser with grant option",
@@ -392,7 +393,7 @@ public class IoTDBPipeMetaIT extends
AbstractPipeTableModelDualManualIT {
",,MAINTAIN,false,",
",*.*,ALTER,true,",
",test.*,DROP,false,")),
- dbName);
+ (String) null);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
index bd2852a14b3..fc073226043 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
@@ -315,7 +315,11 @@ public class PipePlanToStatementVisitor extends
PlanVisitor<Object, Void> {
@Override
public Delete visitDeleteData(final RelationalDeleteDataNode node, final
Void context) {
- final Delete statement = new Delete();
+ final Delete statement =
+ new Delete(
+ new Table(
+ QualifiedName.of(
+ node.getDatabaseName(),
node.getModEntries().get(0).getTableName())));
statement.setDatabaseName(node.getDatabaseName());
statement.setTableDeletionEntries(node.getModEntries());
return statement;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index 72dc0b51754..4dc5fef859f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
-import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.source.IoTDBSource;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -237,11 +236,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE);
// Validate tree pattern and table pattern
- final TreePattern treePattern =
-
TreePattern.parsePipePatternFromSourceParameters(validator.getParameters());
- final TablePattern tablePattern =
-
TablePattern.parsePipePatternFromSourceParameters(validator.getParameters());
- validatePattern(treePattern, tablePattern);
+
validatePattern(TreePattern.parsePipePatternFromSourceParameters(validator.getParameters()));
// Validate extractor.history.enable and extractor.realtime.enable
validator
@@ -302,7 +297,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
realtimeExtractor.validate(validator);
}
- private void validatePattern(final TreePattern treePattern, final
TablePattern tablePattern) {
+ private void validatePattern(final TreePattern treePattern) {
if (!treePattern.isLegal()) {
throw new IllegalArgumentException(String.format("Pattern \"%s\" is
illegal.", treePattern));
}
@@ -316,13 +311,6 @@ public class IoTDBDataRegionSource extends IoTDBSource {
"The path pattern %s is not valid for the source. Only prefix or
full path is allowed.",
treePattern));
}
-
- if (shouldExtractDeletion &&
tablePattern.hasUserSpecifiedDatabasePatternOrTablePattern()) {
- throw new IllegalArgumentException(
- String.format(
- "The table model pattern %s can not be specified when deletion
capture is enabled.",
- tablePattern));
- }
}
private void checkInvalidParameters(final PipeParameterValidator validator) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index dff966e72e8..8c49dd0a3dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegio
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeDataRegionAssigner;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import java.util.Objects;
@@ -141,7 +142,9 @@ public class PipeInsertionDataNodeListener {
final String regionId, final AbstractDeleteDataNode node) {
final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(regionId);
// only events from registered data region will be extracted
- if (assigner == null) {
+ if (assigner == null
+ || node instanceof RelationalDeleteDataNode
+ && ((RelationalDeleteDataNode) node).getModEntries().isEmpty()) {
return null;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
index 973e71e97b5..11d70e0daa7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
@@ -30,7 +30,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
@@ -114,19 +113,19 @@ public class PipeEnrichedDeleteDataNode extends
AbstractDeleteDataNode {
@Override
public PlanNode clone() {
- return new PipeEnrichedDeleteDataNode((DeleteDataNode)
deleteDataNode.clone());
+ return new PipeEnrichedDeleteDataNode((AbstractDeleteDataNode)
deleteDataNode.clone());
}
@Override
public PlanNode createSubNode(final int subNodeId, final int startIndex,
final int endIndex) {
return new PipeEnrichedDeleteDataNode(
- (DeleteDataNode) deleteDataNode.createSubNode(subNodeId, startIndex,
endIndex));
+ (AbstractDeleteDataNode) deleteDataNode.createSubNode(subNodeId,
startIndex, endIndex));
}
@Override
public PlanNode cloneWithChildren(final List<PlanNode> children) {
return new PipeEnrichedDeleteDataNode(
- (DeleteDataNode) deleteDataNode.cloneWithChildren(children));
+ (AbstractDeleteDataNode) deleteDataNode.cloneWithChildren(children));
}
@Override
@@ -157,7 +156,8 @@ public class PipeEnrichedDeleteDataNode extends
AbstractDeleteDataNode {
}
public static PipeEnrichedDeleteDataNode deserialize(final ByteBuffer
buffer) {
- return new PipeEnrichedDeleteDataNode((DeleteDataNode)
PlanNodeType.deserialize(buffer));
+ return new PipeEnrichedDeleteDataNode(
+ (AbstractDeleteDataNode) PlanNodeType.deserialize(buffer));
}
@Override
@@ -183,7 +183,7 @@ public class PipeEnrichedDeleteDataNode extends
AbstractDeleteDataNode {
plan ->
plan instanceof PipeEnrichedDeleteDataNode
? plan
- : new PipeEnrichedDeleteDataNode((DeleteDataNode) plan))
+ : new PipeEnrichedDeleteDataNode((AbstractDeleteDataNode)
plan))
.collect(Collectors.toList());
}
@@ -206,6 +206,6 @@ public class PipeEnrichedDeleteDataNode extends
AbstractDeleteDataNode {
(SearchNode) ((PipeEnrichedDeleteDataNode)
searchNode).getDeleteDataNode())
.collect(Collectors.toList());
return new PipeEnrichedDeleteDataNode(
- (DeleteDataNode) deleteDataNode.merge(unrichedDeleteDataNodes));
+ (AbstractDeleteDataNode)
deleteDataNode.merge(unrichedDeleteDataNodes));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Delete.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Delete.java
index 72b8a865589..788b5ddffcd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Delete.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Delete.java
@@ -44,8 +44,9 @@ public class Delete extends Statement {
private String databaseName;
private Collection<TRegionReplicaSet> replicaSets;
- public Delete() {
+ public Delete(final Table table) {
super(null);
+ this.table = requireNonNull(table, "table is null");
}
public Delete(final NodeLocation location, final Table table) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
index 7ee8008c0e0..46a3d3e2a86 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
@@ -33,12 +33,12 @@ public class PipeCommitInterval extends
Interval<PipeCommitInterval> {
private final PipeTaskMeta pipeTaskMeta;
public PipeCommitInterval(
- final long s,
- final long e,
+ final long start,
+ final long end,
final ProgressIndex currentIndex,
final List<Runnable> onCommittedHooks,
final PipeTaskMeta pipeTaskMeta) {
- super(s, e);
+ super(start, end);
this.pipeTaskMeta = pipeTaskMeta;
this.currentIndex = currentIndex;
this.onCommittedHooks = onCommittedHooks;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
index 67ecaa66d6e..45b51e3ee29 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
@@ -23,9 +23,9 @@ public class Interval<T extends Interval<T>> implements
Comparable<Interval<?>>
public long start;
public long end;
- public Interval(final long s, final long e) {
- start = s;
- end = e;
+ public Interval(final long start, final long end) {
+ this.start = start;
+ this.end = end;
}
public void onMerged(final T another) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
index 179a379b7c3..cecdef312de 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
@@ -19,33 +19,38 @@
package org.apache.iotdb.commons.pipe.datastructure.interval;
+import javax.annotation.concurrent.NotThreadSafe;
+
import java.util.TreeSet;
+@NotThreadSafe
public class IntervalManager<T extends Interval<T>> {
private final TreeSet<T> intervals = new TreeSet<>();
// insert into new interval and merge
public void addInterval(final T newInterval) {
// Left closest
- final T left = intervals.floor(newInterval);
+ T left = intervals.floor(newInterval);
// Right closest
- final T right = intervals.ceiling(newInterval);
+ T right = intervals.ceiling(newInterval);
// Merge left ([0,1] + [2,3] → [0,3])
- if (left != null && left.end >= newInterval.start - 1) {
+ while (left != null && left.end >= newInterval.start - 1) {
newInterval.start = Math.min(left.start, newInterval.start);
newInterval.end = Math.max(left.end, newInterval.end);
newInterval.onMerged(left);
intervals.remove(left);
+ left = intervals.floor(newInterval);
}
// Merge right ([2,3] + [3,4] → [2,4])
- if (right != null && newInterval.end >= right.start - 1) {
+ while (right != null && newInterval.end >= right.start - 1) {
newInterval.start = Math.min(newInterval.start, right.start);
newInterval.end = Math.max(newInterval.end, right.end);
newInterval.onMerged(right);
intervals.remove(right);
+ right = intervals.ceiling(newInterval);
}
intervals.add(newInterval);