This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch pipe-consensus-index
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-consensus-index by this
push:
new b17b6e4ca5d [IOTDB-5723] Pipe: Consensus Index (#9446)
b17b6e4ca5d is described below
commit b17b6e4ca5dcc240e58109ddb90cb32314556839
Author: yschengzi <[email protected]>
AuthorDate: Thu May 25 22:27:06 2023 +0800
[IOTDB-5723] Pipe: Consensus Index (#9446)
* finish consensus index
* add test
* spotless
* fix serialize
* finish except report
* change dataRegionId from String to TConsensusGroupId
* fix UT for ConsensusIndex
* create reference count for EnrichedEvent
* finish report progress index
* fix UT
* merge heartbeat
* finish datanode id
* add trace
* fix leader change null and updateToMaximum
* delete printTrace
* revert serialize and deserialize in InsertNode
* revert serialize and deserialize in InsertRowsNode
* well define ConsensusIndex
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/runtime/PipeRuntimeCoordinator.java | 5 +-
.../confignode/persistence/pipe/PipeTaskInfo.java | 7 +-
.../runtime/PipeHandleMetaChangeProcedure.java | 10 +-
.../impl/pipe/task/CreatePipeProcedureV2.java | 4 +-
.../request/ConfigPhysicalPlanSerDeTest.java | 9 +-
.../iotdb/confignode/persistence/PipeInfoTest.java | 3 +-
.../runtime/PipeHandleMetaChangeProcedureTest.java | 7 +-
.../index/ComparableConsensusRequest.java | 26 ++++
.../commons/consensus/index/ConsensusIndex.java | 69 ++++++++
.../consensus/index/ConsensusIndexType.java | 79 ++++++++++
.../consensus/index/impl/IoTConsensusIndex.java | 102 ++++++++++++
.../index/impl/MinimumConsensusIndex.java | 78 ++++++++++
.../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 59 ++++---
.../commons/pipe/task/meta/PipeMetaDeSerTest.java | 7 +-
.../db/engine/storagegroup/TsFileProcessor.java | 2 +
.../db/engine/storagegroup/TsFileResource.java | 33 ++++
.../plan/node/write/InsertMultiTabletsNode.java | 7 +
.../plan/planner/plan/node/write/InsertNode.java | 20 ++-
.../planner/plan/node/write/InsertRowsNode.java | 7 +
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 7 +
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 40 +++--
.../core/collector/IoTDBDataRegionCollector.java | 9 +-
.../PipeHistoricalDataRegionTsFileCollector.java | 24 ++-
.../realtime/PipeRealtimeDataRegionCollector.java | 10 ++
.../PipeRealtimeDataRegionHybridCollector.java | 4 +-
.../realtime/assigner/PipeDataRegionAssigner.java | 6 +-
.../iotdb/db/pipe/core/event/EnrichedEvent.java | 60 ++++++-
.../core/event/impl/PipeTabletInsertionEvent.java | 17 +-
.../core/event/impl/PipeTsFileInsertionEvent.java | 27 +++-
.../event/realtime/PipeRealtimeCollectEvent.java | 27 ++--
.../org/apache/iotdb/db/pipe/task/PipeBuilder.java | 25 +--
.../org/apache/iotdb/db/pipe/task/PipeTask.java | 7 +-
.../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 15 +-
.../db/pipe/task/stage/PipeTaskCollectorStage.java | 11 +-
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 3 +-
.../TsFileResourceConsensusIndexTest.java | 173 +++++++++++++++++++++
.../collector/CachedSchemaPatternMatcherTest.java | 14 +-
.../core/collector/PipeRealtimeCollectTest.java | 12 +-
38 files changed, 893 insertions(+), 132 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 3455a6c1369..2992f6dd31e 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -71,7 +71,10 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
.forEach(
(regionId, pair) -> {
if (regionId.getType().equals(TConsensusGroupType.DataRegion)) {
- dataRegionGroupToOldAndNewLeaderPairMap.put(regionId, pair);
+ dataRegionGroupToOldAndNewLeaderPairMap.put(
+ regionId,
+ new Pair<>( // null or -1 means empty origin leader
+ pair.left == null ? -1 : pair.left, pair.right == null
? -1 : pair.right));
}
});
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 76ec9007ffa..5e71d36319f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.persistence.pipe;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
@@ -210,7 +211,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
if (newDataRegionLeader != -1) {
consensusGroupIdToTaskMetaMap
.get(dataRegionGroupId)
- .setRegionLeader(newDataRegionLeader);
+ .setDataNodeId(newDataRegionLeader);
} else {
consensusGroupIdToTaskMetaMap.remove(dataRegionGroupId);
}
@@ -221,7 +222,9 @@ public class PipeTaskInfo implements SnapshotProcessor {
consensusGroupIdToTaskMetaMap.put(
// TODO: the progress index should be passed from the leader
// correctly
- dataRegionGroupId, new PipeTaskMeta(0,
newDataRegionLeader));
+ dataRegionGroupId,
+ new PipeTaskMeta(
+ new MinimumConsensusIndex(),
newDataRegionLeader));
} else {
LOGGER.warn(
"The pipe task meta does not contain the
data region group {} or the data region group has already been removed",
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index db04d909f7d..91e12cda000 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -117,7 +117,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
pipeMetaFromDataNode.getRuntimeMeta().getConsensusGroupIdToTaskMetaMap();
for (final Map.Entry<TConsensusGroupId, PipeTaskMeta>
runtimeMetaOnConfigNode :
pipeTaskMetaMapOnConfigNode.entrySet()) {
- if (runtimeMetaOnConfigNode.getValue().getRegionLeader() !=
dataNodeId) {
+ if (runtimeMetaOnConfigNode.getValue().getDataNodeId() != dataNodeId) {
continue;
}
@@ -132,11 +132,13 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
}
// update progress index
- if (runtimeMetaOnConfigNode.getValue().getProgressIndex()
- < runtimeMetaFromDataNode.getProgressIndex()) {
+ if (!runtimeMetaOnConfigNode
+ .getValue()
+ .getProgressIndex()
+ .isAfter(runtimeMetaFromDataNode.getProgressIndex())) {
runtimeMetaOnConfigNode
.getValue()
- .setProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
+ .updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
needWriteConsensusOnConfigNodes = true;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index e0de835fb07..a21ec8c0f75 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -104,7 +105,8 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
.forEach(
(region, leader) ->
// TODO: make index configurable
- consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0,
leader)));
+ consensusGroupIdToTaskMetaMap.put(
+ region, new PipeTaskMeta(new MinimumConsensusIndex(),
leader)));
pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
}
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 9a83e00acd7..302fa209195 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimedQuota;
import org.apache.iotdb.common.rpc.thrift.ThrottleType;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
@@ -1050,7 +1051,7 @@ public class ConfigPhysicalPlanSerDeTest {
collectorAttributes.put("collector",
"org.apache.iotdb.pipe.collector.DefaultCollector");
processorAttributes.put("processor",
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
connectorAttributes.put("connector",
"org.apache.iotdb.pipe.protocal.ThriftTransporter");
- PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(0, 1);
+ PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(new MinimumConsensusIndex(),
1);
Map<TConsensusGroupId, PipeTaskMeta> pipeTasks = new HashMap<>();
pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
@@ -1163,10 +1164,12 @@ public class ConfigPhysicalPlanSerDeTest {
{
put(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
- new PipeTaskMeta(789, 987));
+ new PipeTaskMeta(
+ new MinimumConsensusIndex(), 987)); // TODO: replace
with IoTConsensus
put(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
- new PipeTaskMeta(456, 789));
+ new PipeTaskMeta(
+ new MinimumConsensusIndex(), 789)); // TODO: replace
with IoTConsensus
}
});
pipeMetaList.add(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta));
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index 0246759c98c..eb53c6cc5a3 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -72,7 +73,7 @@ public class PipeInfoTest {
collectorAttributes.put("collector",
"org.apache.iotdb.pipe.collector.DefaultCollector");
processorAttributes.put("processor",
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
connectorAttributes.put("connector",
"org.apache.iotdb.pipe.protocal.ThriftTransporter");
- PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(0, 1);
+ PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(new MinimumConsensusIndex(),
1);
Map<TConsensusGroupId, PipeTaskMeta> pipeTasks = new HashMap<>();
pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
index 07d74e2d9ff..5674a9a57cf 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -68,10 +69,12 @@ public class PipeHandleMetaChangeProcedureTest {
{
put(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
- new PipeTaskMeta(789, 987));
+ new PipeTaskMeta(
+ new MinimumConsensusIndex(), 987)); // TODO: replace
with IoTConsensus
put(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
- new PipeTaskMeta(456, 789));
+ new PipeTaskMeta(
+ new MinimumConsensusIndex(), 789)); // TODO: replace
with IoTConsensus
}
});
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ComparableConsensusRequest.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ComparableConsensusRequest.java
new file mode 100644
index 00000000000..dd0e5dfe319
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ComparableConsensusRequest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index;
+
+public interface ComparableConsensusRequest {
+ ConsensusIndex getConsensusIndex();
+
+ void setConsensusIndex(ConsensusIndex consensusIndex);
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ConsensusIndex.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ConsensusIndex.java
new file mode 100644
index 00000000000..e3fcc0afefa
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ConsensusIndex.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface ConsensusIndex {
+
+ /** serialize this consensus index to the given byte buffer */
+ void serialize(ByteBuffer byteBuffer);
+
+ /** serialize this consensus index to the given output stream */
+ void serialize(OutputStream stream) throws IOException;
+
+ /**
+ * A.isAfter(B) is true if and only if A is strictly greater than B
+ *
+ * @param consensusIndex the consensus index to be compared
+ * @return true if and only if this consensus index is strictly greater than the given consensus
+ * index
+ */
+ boolean isAfter(ConsensusIndex consensusIndex);
+
+ /**
+ * A.equals(B) is true if and only if A is equal to B
+ *
+ * @param consensusIndex the consensus index to be compared
+ * @return true if and only if this consensus index is equal to the given consensus index
+ */
+ boolean equals(ConsensusIndex consensusIndex);
+
+ /**
+ * C = A.updateToMaximum(B) where C should satisfy:
+ *
+ * <p>(C.equals(A) || C.isAfter(A)) is true
+ *
+ * <p>(C.equals(B) || C.isAfter(B)) is true
+ *
+ * <p>There is no D, such that D satisfies the above conditions and C.isAfter(D) is true
+ *
+ * <p>The implementation of this function should be commutative, that is
+ * A.updateToMaximum(B).equals(B.updateToMaximum(A)) is true
+ *
+ * <p>Note: this function may modify the caller.
+ *
+ * @param consensusIndex the consensus index to be compared
+ * @return the maximum of the two consensus indexes
+ */
+ ConsensusIndex updateToMaximum(ConsensusIndex consensusIndex);
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ConsensusIndexType.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ConsensusIndexType.java
new file mode 100644
index 00000000000..28a81a243db
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ConsensusIndexType.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index;
+
+import org.apache.iotdb.commons.consensus.index.impl.IoTConsensusIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public enum ConsensusIndexType {
+ MINIMUM_CONSENSUS_INDEX((short) 1),
+ IOT_CONSENSUS_INDEX((short) 2),
+ ;
+
+ private final short type;
+
+ ConsensusIndexType(short type) {
+ this.type = type;
+ }
+
+ public short getType() {
+ return type;
+ }
+
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(type, byteBuffer);
+ }
+
+ public void serialize(OutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(type, stream);
+ }
+
+ public static ConsensusIndex deserializeFrom(ByteBuffer byteBuffer) {
+ short indexType = byteBuffer.getShort();
+ switch (indexType) {
+ case 1:
+ return MinimumConsensusIndex.deserializeFrom(byteBuffer);
+ case 2:
+ return IoTConsensusIndex.deserializeFrom(byteBuffer);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported Consensus Index type %s.", indexType));
+ }
+ }
+
+ public static ConsensusIndex deserializeFrom(InputStream stream) throws
IOException {
+ short indexType = ReadWriteIOUtils.readShort(stream);
+ switch (indexType) {
+ case 1:
+ return MinimumConsensusIndex.deserializeFrom(stream);
+ case 2:
+ return IoTConsensusIndex.deserializeFrom(stream);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported Consensus Index type %s.", indexType));
+ }
+ }
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTConsensusIndex.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTConsensusIndex.java
new file mode 100644
index 00000000000..3bd7f11665b
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTConsensusIndex.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndexType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class IoTConsensusIndex implements ConsensusIndex {
+ private final Map<Integer, Long> peerId2SearchIndex;
+
+ public IoTConsensusIndex() {
+ peerId2SearchIndex = new HashMap<>();
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ ConsensusIndexType.IOT_CONSENSUS_INDEX.serialize(byteBuffer);
+ // TODO: impl it
+ }
+
+ @Override
+ public void serialize(OutputStream stream) throws IOException {
+ ConsensusIndexType.IOT_CONSENSUS_INDEX.serialize(stream);
+ // TODO: impl it
+ }
+
+ @Override
+ public boolean isAfter(ConsensusIndex consensusIndex) {
+ if (!(consensusIndex instanceof IoTConsensusIndex)) {
+ return false;
+ }
+
+ return ((IoTConsensusIndex) consensusIndex)
+ .peerId2SearchIndex.entrySet().stream()
+ .noneMatch(
+ entry ->
+ !this.peerId2SearchIndex.containsKey(entry.getKey())
+ || this.peerId2SearchIndex.get(entry.getKey()) <=
entry.getValue());
+ }
+
+ @Override
+ public boolean equals(ConsensusIndex consensusIndex) {
+ if (!(consensusIndex instanceof IoTConsensusIndex)) {
+ return false;
+ }
+
+ return ((IoTConsensusIndex) consensusIndex)
+ .peerId2SearchIndex.entrySet().stream()
+ .allMatch(
+ entry ->
+ this.peerId2SearchIndex.containsKey(entry.getKey())
+ &&
this.peerId2SearchIndex.get(entry.getKey()).equals(entry.getValue()));
+ }
+
+ @Override
+ public ConsensusIndex updateToMaximum(ConsensusIndex consensusIndex) {
+ if (!(consensusIndex instanceof IoTConsensusIndex)) {
+ return this;
+ }
+
+ ((IoTConsensusIndex) consensusIndex)
+ .peerId2SearchIndex.forEach(
+ (thatK, thatV) ->
+ this.peerId2SearchIndex.compute(
+ thatK, (thisK, thisV) -> (thisV == null ? thatV :
Math.max(thisV, thatV))));
+ return this;
+ }
+
+ public static IoTConsensusIndex deserializeFrom(ByteBuffer byteBuffer) {
+ // TODO: impl it
+ return new IoTConsensusIndex();
+ }
+
+ public static IoTConsensusIndex deserializeFrom(InputStream stream) {
+ // TODO: impl it
+ return new IoTConsensusIndex();
+ }
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumConsensusIndex.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumConsensusIndex.java
new file mode 100644
index 00000000000..03ba2ad2498
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumConsensusIndex.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndexType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class MinimumConsensusIndex implements ConsensusIndex {
+ public MinimumConsensusIndex() {}
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ ConsensusIndexType.MINIMUM_CONSENSUS_INDEX.serialize(byteBuffer);
+ }
+
+ @Override
+ public void serialize(OutputStream stream) throws IOException {
+ ConsensusIndexType.MINIMUM_CONSENSUS_INDEX.serialize(stream);
+ }
+
+ @Override
+ public boolean isAfter(ConsensusIndex consensusIndex) {
+ return false;
+ }
+
+ @Override
+ public boolean equals(ConsensusIndex consensusIndex) {
+ return consensusIndex instanceof MinimumConsensusIndex;
+ }
+
+ @Override
+ public ConsensusIndex updateToMaximum(ConsensusIndex consensusIndex) {
+ return consensusIndex == null ? this : consensusIndex;
+ }
+
+ public static MinimumConsensusIndex deserializeFrom(ByteBuffer byteBuffer) {
+ return new MinimumConsensusIndex();
+ }
+
+ public static MinimumConsensusIndex deserializeFrom(InputStream stream) {
+ return new MinimumConsensusIndex();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ return obj != null && getClass() == obj.getClass();
+ }
+
+ @Override
+ public String toString() {
+ return "MinimumConsensusIndex{}";
+ }
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index bb5149fb1f2..a66254b2f3a 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.commons.pipe.task.meta;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndexType;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
@@ -34,28 +36,25 @@ import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
public class PipeTaskMeta {
- // TODO: replace it with consensus index
- private final AtomicLong progressIndex = new AtomicLong(0L);
- private final AtomicInteger regionLeader = new AtomicInteger(0);
+ private final AtomicReference<ConsensusIndex> progressIndex = new
AtomicReference<>();
+ private final AtomicInteger dataNodeId = new AtomicInteger(0);
private final Queue<PipeRuntimeException> exceptionMessages = new
ConcurrentLinkedQueue<>();
- private PipeTaskMeta() {}
-
- public PipeTaskMeta(long progressIndex, int regionLeader) {
+ public PipeTaskMeta(ConsensusIndex progressIndex, int dataNodeId) {
this.progressIndex.set(progressIndex);
- this.regionLeader.set(regionLeader);
+ this.dataNodeId.set(dataNodeId);
}
- public long getProgressIndex() {
+ public ConsensusIndex getProgressIndex() {
return progressIndex.get();
}
- public int getRegionLeader() {
- return regionLeader.get();
+ public int getDataNodeId() {
+ return dataNodeId.get();
}
public Iterable<PipeRuntimeException> getExceptionMessages() {
@@ -70,17 +69,17 @@ public class PipeTaskMeta {
exceptionMessages.clear();
}
- public void setProgressIndex(long progressIndex) {
- this.progressIndex.set(progressIndex);
+ public void updateProgressIndex(ConsensusIndex updateIndex) {
+ progressIndex.updateAndGet(index -> index.updateToMaximum(updateIndex));
}
- public void setRegionLeader(int regionLeader) {
- this.regionLeader.set(regionLeader);
+ public void setDataNodeId(int dataNodeId) {
+ this.dataNodeId.set(dataNodeId);
}
public void serialize(DataOutputStream outputStream) throws IOException {
- ReadWriteIOUtils.write(progressIndex.get(), outputStream);
- ReadWriteIOUtils.write(regionLeader.get(), outputStream);
+ progressIndex.get().serialize(outputStream);
+ ReadWriteIOUtils.write(dataNodeId.get(), outputStream);
ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
for (final PipeRuntimeException exceptionMessage : exceptionMessages) {
ReadWriteIOUtils.write(
@@ -90,8 +89,8 @@ public class PipeTaskMeta {
}
public void serialize(FileOutputStream outputStream) throws IOException {
- ReadWriteIOUtils.write(progressIndex.get(), outputStream);
- ReadWriteIOUtils.write(regionLeader.get(), outputStream);
+ progressIndex.get().serialize(outputStream);
+ ReadWriteIOUtils.write(dataNodeId.get(), outputStream);
ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
for (final PipeRuntimeException exceptionMessage : exceptionMessages) {
ReadWriteIOUtils.write(
@@ -101,9 +100,9 @@ public class PipeTaskMeta {
}
public static PipeTaskMeta deserialize(ByteBuffer byteBuffer) {
- final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta();
- PipeTaskMeta.progressIndex.set(ReadWriteIOUtils.readLong(byteBuffer));
- PipeTaskMeta.regionLeader.set(ReadWriteIOUtils.readInt(byteBuffer));
+ final PipeTaskMeta PipeTaskMeta =
+ new PipeTaskMeta(
+ ConsensusIndexType.deserializeFrom(byteBuffer),
ReadWriteIOUtils.readInt(byteBuffer));
final int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
final boolean critical = ReadWriteIOUtils.readBool(byteBuffer);
@@ -117,9 +116,9 @@ public class PipeTaskMeta {
}
public static PipeTaskMeta deserialize(InputStream inputStream) throws
IOException {
- final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta();
- PipeTaskMeta.progressIndex.set(ReadWriteIOUtils.readLong(inputStream));
- PipeTaskMeta.regionLeader.set(ReadWriteIOUtils.readInt(inputStream));
+ final PipeTaskMeta PipeTaskMeta =
+ new PipeTaskMeta(
+ ConsensusIndexType.deserializeFrom(inputStream),
ReadWriteIOUtils.readInt(inputStream));
final int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
final boolean critical = ReadWriteIOUtils.readBool(inputStream);
@@ -141,14 +140,14 @@ public class PipeTaskMeta {
return false;
}
PipeTaskMeta that = (PipeTaskMeta) obj;
- return progressIndex.get() == that.progressIndex.get()
- && regionLeader.get() == that.regionLeader.get()
+ return progressIndex.get().equals(that.progressIndex.get())
+ && dataNodeId.get() == that.dataNodeId.get()
&& Arrays.equals(exceptionMessages.toArray(),
that.exceptionMessages.toArray());
}
@Override
public int hashCode() {
- return Objects.hash(progressIndex, regionLeader, exceptionMessages);
+ return Objects.hash(progressIndex.get(), dataNodeId.get(),
Arrays.hashCode(exceptionMessages.toArray())); // hash the values equals() compares, not the Atomic wrappers
}
@Override
@@ -157,8 +156,8 @@ public class PipeTaskMeta {
+ "progressIndex='"
+ progressIndex
+ '\''
- + ", regionLeader='"
- + regionLeader
+ + ", dataNodeId='"
+ + dataNodeId
+ '\''
+ ", exceptionMessages="
+ exceptionMessages
diff --git
a/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
index 08e7f86007c..2ec09c7fe1f 100644
---
a/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
+++
b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.task.meta;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
import org.junit.Assert;
import org.junit.Test;
@@ -59,10 +60,12 @@ public class PipeMetaDeSerTest {
{
put(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
- new PipeTaskMeta(789, 987));
+ new PipeTaskMeta(
+ new MinimumConsensusIndex(), 987)); // TODO: replace
with IoTConsensusIndex;
put(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
- new PipeTaskMeta(456, 789));
+ new PipeTaskMeta(
+ new MinimumConsensusIndex(), 789)); // TODO: replace
with IoTConsensusIndex;
}
});
ByteBuffer runtimeByteBuffer = pipeRuntimeMeta.serialize();
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 55f06629258..6cce0662917 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -297,6 +297,7 @@ public class TsFileProcessor {
tsFileResource.updateEndTime(
insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime());
}
+ tsFileResource.updateConsensusIndex(insertRowNode.getConsensusIndex());
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(System.nanoTime()
- startTime);
}
@@ -406,6 +407,7 @@ public class TsFileProcessor {
tsFileResource.updateEndTime(
insertTabletNode.getDeviceID().toStringID(),
insertTabletNode.getTimes()[end - 1]);
}
+ tsFileResource.updateConsensusIndex(insertTabletNode.getConsensusIndex());
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(System.nanoTime()
- startTime);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 0e5949f41b4..5b1a3de846a 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndexType;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -153,6 +155,8 @@ public class TsFileResource {
*/
private TsFileResource originTsFileResource;
+ private ConsensusIndex maxConsensusIndex;
+
public TsFileResource() {}
public TsFileResource(TsFileResource other) throws IOException {
@@ -170,6 +174,7 @@ public class TsFileResource {
this.minPlanIndex = other.minPlanIndex;
this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
this.tsFileSize = other.tsFileSize;
+ this.maxConsensusIndex = other.maxConsensusIndex;
}
/** for sealed TsFile, call setClosed to close TsFileResource */
@@ -243,6 +248,9 @@ public class TsFileResource {
String modFileName = new File(modFile.getFilePath()).getName();
ReadWriteIOUtils.write(modFileName, outputStream);
}
+ if (maxConsensusIndex != null) {
+ maxConsensusIndex.serialize(outputStream);
+ }
}
File src = fsFactory.getFile(file + RESOURCE_SUFFIX + TEMP_SUFFIX);
File dest = fsFactory.getFile(file + RESOURCE_SUFFIX);
@@ -265,6 +273,9 @@ public class TsFileResource {
modFile = new ModificationFile(modF.getPath());
}
}
+ if (inputStream.available() > 0) {
+ maxConsensusIndex = ConsensusIndexType.deserializeFrom(inputStream);
+ }
}
// upgrade from v0.12 to v0.13, we need to rewrite the TsFileResource if
the previous time index
@@ -310,6 +321,9 @@ public class TsFileResource {
modFile = new ModificationFile(modF.getPath());
}
}
+ if (inputStream.available() > 0) {
+ maxConsensusIndex = ConsensusIndexType.deserializeFrom(inputStream);
+ }
}
}
@@ -1122,4 +1136,23 @@ public class TsFileResource {
public boolean isFileInList() {
return prev != null || next != null;
}
+
+ public void updateConsensusIndex(ConsensusIndex consensusIndex) {
+ if (consensusIndex == null) {
+ return;
+ }
+
+ maxConsensusIndex =
+ (maxConsensusIndex == null
+ ? consensusIndex
+ : maxConsensusIndex.updateToMaximum(consensusIndex));
+ }
+
+ public ConsensusIndex getMaxConsensusIndexAfterClose() throws
IllegalStateException {
+ if (status.equals(TsFileResourceStatus.UNCLOSED)) {
+ throw new IllegalStateException(
+ "Should not get consensus index from a unclosing TsFileResource.");
+ }
+ return maxConsensusIndex;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 5a4e198b77c..e4b9817da9d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -263,4 +264,10 @@ public class InsertMultiTabletsNode extends InsertNode {
public long getMinTime() {
throw new NotImplementedException();
}
+
+ @Override
+ public void setConsensusIndex(ConsensusIndex consensusIndex) {
+ this.consensusIndex = consensusIndex;
+ insertTabletNodeList.forEach(node ->
node.setConsensusIndex(consensusIndex));
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index c191f03837a..c60057e7194 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
@@ -40,7 +42,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
-public abstract class InsertNode extends WritePlanNode {
+public abstract class InsertNode extends WritePlanNode implements
ComparableConsensusRequest {
/** this insert node doesn't need to participate in iot consensus */
public static final long NO_CONSENSUS_INDEX =
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
@@ -73,6 +75,8 @@ public abstract class InsertNode extends WritePlanNode {
/** Physical address of data region after splitting */
protected TRegionReplicaSet dataRegionReplicaSet;
+ protected ConsensusIndex consensusIndex;
+
protected InsertNode(PlanNodeId id) {
super(id);
}
@@ -267,6 +271,20 @@ public abstract class InsertNode extends WritePlanNode {
}
// endregion
+ // region consensus index
+
+ @Override
+ public ConsensusIndex getConsensusIndex() {
+ return consensusIndex;
+ }
+
+ @Override
+ public void setConsensusIndex(ConsensusIndex consensusIndex) {
+ this.consensusIndex = consensusIndex;
+ }
+
+ // endregion
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index 050839f6428..dcfe98e72ae 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -240,4 +241,10 @@ public class InsertRowsNode extends InsertNode {
public long getMinTime() {
throw new NotImplementedException();
}
+
+ @Override
+ public void setConsensusIndex(ConsensusIndex consensusIndex) {
+ this.consensusIndex = consensusIndex; // the same index is then pushed to every child row node
+ insertRowNodeList.forEach(insertRowNode ->
insertRowNode.setConsensusIndex(consensusIndex));
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index dd788ea4db0..45d28ab5005 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.StatusUtils;
@@ -292,4 +293,10 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
public long getMinTime() {
throw new NotImplementedException();
}
+
+ @Override
+ public void setConsensusIndex(ConsensusIndex consensusIndex) {
+ this.consensusIndex = consensusIndex;
+ insertRowNodeList.forEach(insertRowNode ->
insertRowNode.setConsensusIndex(consensusIndex));
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 2ff33b450c7..9206d5cd474 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -26,6 +26,8 @@ import
org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.task.PipeBuilder;
import org.apache.iotdb.db.pipe.task.PipeTask;
@@ -65,6 +67,7 @@ import java.util.stream.Collectors;
public class PipeTaskAgent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTaskAgent.class);
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private final PipeMetaKeeper pipeMetaKeeper;
private final PipeTaskManager pipeTaskManager;
@@ -156,11 +159,7 @@ public class PipeTaskAgent {
// if task meta does not exist on data node, create a new task
if (taskMetaOnDataNode == null) {
- createPipeTask(
- consensusGroupIdFromConfigNode,
- pipeStaticMeta,
- taskMetaFromConfigNode.getProgressIndex(),
- taskMetaFromConfigNode.getRegionLeader());
+ createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta,
taskMetaFromConfigNode);
// we keep the new created task's status consistent with the status
recorded in data node's
// pipe runtime meta. please note that the status recorded in data
node's pipe runtime meta
// is not reliable, but we will have a check later to make sure the
status is correct.
@@ -171,16 +170,12 @@ public class PipeTaskAgent {
}
// if task meta exists on data node, check if it has changed
- final int regionLeaderFromConfigNode =
taskMetaFromConfigNode.getRegionLeader();
- final int regionLeaderOnDataNode = taskMetaOnDataNode.getRegionLeader();
+ final int dataNodeIdFromConfigNode =
taskMetaFromConfigNode.getDataNodeId();
+ final int dataNodeIdOnDataNode = taskMetaOnDataNode.getDataNodeId();
- if (regionLeaderFromConfigNode != regionLeaderOnDataNode) {
+ if (dataNodeIdFromConfigNode != dataNodeIdOnDataNode) {
dropPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
- createPipeTask(
- consensusGroupIdFromConfigNode,
- pipeStaticMeta,
- taskMetaFromConfigNode.getProgressIndex(),
- taskMetaFromConfigNode.getRegionLeader());
+ createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta,
taskMetaFromConfigNode);
// we keep the new created task's status consistent with the status
recorded in data node's
// pipe runtime meta. please note that the status recorded in data
node's pipe runtime meta
// is not reliable, but we will have a check later to make sure the
status is correct.
@@ -511,19 +506,22 @@ public class PipeTaskAgent {
///////////////////////// Manage by dataRegionGroupId
/////////////////////////
private void createPipeTask(
- TConsensusGroupId dataRegionGroupId,
+ TConsensusGroupId consensusGroupId,
PipeStaticMeta pipeStaticMeta,
- long progressIndex,
- int dataRegionId) {
- final PipeTask pipeTask =
- new PipeTaskBuilder(Integer.toString(dataRegionId),
pipeStaticMeta).build();
- pipeTask.create();
- pipeTaskManager.addPipeTask(pipeStaticMeta, dataRegionGroupId, pipeTask);
+ PipeTaskMeta pipeTaskMeta) {
+ if (pipeTaskMeta.getDataNodeId() == CONFIG.getDataNodeId()) {
+ final PipeTask pipeTask =
+ new PipeTaskBuilder(consensusGroupId, pipeTaskMeta,
pipeStaticMeta).build();
+ pipeTask.create();
+ pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, pipeTask);
+ }
pipeMetaKeeper
.getPipeMeta(pipeStaticMeta.getPipeName())
.getRuntimeMeta()
.getConsensusGroupIdToTaskMetaMap()
- .put(dataRegionGroupId, new PipeTaskMeta(progressIndex, dataRegionId));
+ .put(
+ consensusGroupId,
+ new PipeTaskMeta(pipeTaskMeta.getProgressIndex(),
pipeTaskMeta.getDataNodeId()));
}
private void dropPipeTask(TConsensusGroupId dataRegionGroupId,
PipeStaticMeta pipeStaticMeta) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index fe1c92638be..fa7b07c182b 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.core.collector;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import
org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
@@ -44,10 +45,12 @@ public class IoTDBDataRegionCollector implements
PipeCollector {
private int dataRegionId;
- public IoTDBDataRegionCollector(ListenableUnblockingPendingQueue<Event>
collectorPendingQueue) {
+ public IoTDBDataRegionCollector(
+ PipeTaskMeta pipeTaskMeta, ListenableUnblockingPendingQueue<Event>
collectorPendingQueue) {
hasBeenStarted = new AtomicBoolean(false);
- realtimeCollector = new
PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
- historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
+ realtimeCollector =
+ new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta,
collectorPendingQueue);
+ historicalCollector = new
PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index ed0d37c6c14..1235f010b8b 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.pipe.core.collector.historical;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
@@ -37,10 +39,17 @@ import java.util.stream.Collectors;
public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
+ private final PipeTaskMeta pipeTaskMeta;
+ private final ConsensusIndex startIndex;
private int dataRegionId;
private Queue<PipeTsFileInsertionEvent> pendingQueue;
+ public PipeHistoricalDataRegionTsFileCollector(PipeTaskMeta pipeTaskMeta) {
+ this.pipeTaskMeta = pipeTaskMeta;
+ this.startIndex = pipeTaskMeta.getProgressIndex();
+ }
+
@Override
public void validate(PipeParameterValidator validator) throws Exception {
validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
@@ -71,16 +80,25 @@ public class PipeHistoricalDataRegionTsFileCollector
implements PipeCollector {
pendingQueue = new ArrayDeque<>(tsFileManager.size(true) +
tsFileManager.size(false));
pendingQueue.addAll(
tsFileManager.getTsFileList(true).stream()
+ .filter(
+ resource ->
+ resource.getMaxConsensusIndexAfterClose() == null
+ ||
!startIndex.isAfter(resource.getMaxConsensusIndexAfterClose()))
.map(PipeTsFileInsertionEvent::new)
.collect(Collectors.toList()));
pendingQueue.addAll(
tsFileManager.getTsFileList(false).stream()
+ .filter(
+ resource ->
+ resource.getMaxConsensusIndexAfterClose() == null
+ ||
!startIndex.isAfter(resource.getMaxConsensusIndexAfterClose()))
.map(PipeTsFileInsertionEvent::new)
.collect(Collectors.toList()));
pendingQueue.forEach(
- event ->
- event.increaseReferenceCount(
- PipeHistoricalDataRegionTsFileCollector.class.getName()));
+ event -> {
+ event.reportProgressIndexToPipeTaskMetaWhenFinish(pipeTaskMeta);
+
event.increaseReferenceCount(PipeHistoricalDataRegionTsFileCollector.class.getName());
+ });
} finally {
tsFileManager.readUnlock();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
index 41e9f5d76fb..b84f86040fb 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.core.collector.realtime;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import
org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -31,6 +32,11 @@ public abstract class PipeRealtimeDataRegionCollector
implements PipeCollector {
protected String pattern;
protected String dataRegionId;
+ protected final PipeTaskMeta pipeTaskMeta;
+
+ public PipeRealtimeDataRegionCollector(PipeTaskMeta pipeTaskMeta) {
+ this.pipeTaskMeta = pipeTaskMeta;
+ }
@Override
public void validate(PipeParameterValidator validator) throws Exception {
@@ -64,6 +70,10 @@ public abstract class PipeRealtimeDataRegionCollector
implements PipeCollector {
return pattern;
}
+ public final PipeTaskMeta getPipeTaskMeta() {
+ return pipeTaskMeta;
+ }
+
@Override
public String toString() {
return "PipeRealtimeDataRegionCollector{"
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 898b93cb22d..4ec7ed92261 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.core.collector.realtime;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -44,7 +45,8 @@ public class PipeRealtimeDataRegionHybridCollector extends
PipeRealtimeDataRegio
private final ListenableUnblockingPendingQueue<Event> pendingQueue;
public PipeRealtimeDataRegionHybridCollector(
- ListenableUnblockingPendingQueue<Event> pendingQueue) {
+ PipeTaskMeta pipeTaskMeta, ListenableUnblockingPendingQueue<Event>
pendingQueue) {
+ super(pipeTaskMeta);
this.pendingQueue = pendingQueue;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
index 2d805310f30..49509f0b961 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
@@ -53,8 +53,10 @@ public class PipeDataRegionAssigner {
.match(event)
.forEach(
collector -> {
-
event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
- collector.collect(event);
+ PipeRealtimeCollectEvent copiedEvent = event.shallowCopySelf();
+
copiedEvent.reportProgressIndexToPipeTaskMetaWhenFinish(collector.getPipeTaskMeta());
+
copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
+ collector.collect(copiedEvent);
});
event.gcSchemaInfo();
event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName());
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
index fa9e765ca25..7ffa1caf4ef 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
@@ -19,11 +19,34 @@
package org.apache.iotdb.db.pipe.core.event;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* EnrichedEvent is an event that can be enriched with additional runtime
information. The
* additional information mainly includes the reference count of the event.
*/
-public interface EnrichedEvent {
+public abstract class EnrichedEvent implements Event {
+ private final AtomicInteger referenceCount = new AtomicInteger(0);
+ private PipeTaskMeta pipeTaskMeta;
+
+ public EnrichedEvent() {}
+
+ public boolean increaseReferenceCount(String holderMessage) {
+ AtomicBoolean success = new AtomicBoolean(true);
+ referenceCount.getAndUpdate(
+ count -> {
+ if (count == 0) {
+ success.set(increaseResourceReferenceCount(holderMessage));
+ }
+ return count + 1;
+ });
+ return success.get();
+ }
/**
* Increase the reference count of this event.
@@ -32,7 +55,20 @@ public interface EnrichedEvent {
* @return true if the reference count is increased successfully, false if
the event is not
* controlled by the invoker, which means the data stored in the event
is not safe to use
*/
- boolean increaseReferenceCount(String holderMessage);
+ public abstract boolean increaseResourceReferenceCount(String holderMessage);
+
+ public boolean decreaseReferenceCount(String holderMessage) {
+ AtomicBoolean success = new AtomicBoolean(true);
+ referenceCount.getAndUpdate(
+ count -> {
+ if (count == 1) {
+ success.set(decreaseResourceReferenceCount(holderMessage));
+ reportProgress();
+ }
+ return count - 1;
+ });
+ return success.get();
+ }
/**
* Decrease the reference count of this event. If the reference count is
decreased to 0, the event
@@ -41,14 +77,28 @@ public interface EnrichedEvent {
* @param holderMessage the message of the invoker
* @return true if the reference count is decreased successfully, false
otherwise
*/
- boolean decreaseReferenceCount(String holderMessage);
+ public abstract boolean decreaseResourceReferenceCount(String holderMessage);
+
+ private void reportProgress() {
+ if (pipeTaskMeta != null) {
+ pipeTaskMeta.updateProgressIndex(getConsensusIndex());
+ }
+ }
/**
* Get the reference count of this event.
*
* @return the reference count
*/
- int getReferenceCount();
+ public int getReferenceCount() {
+ return referenceCount.get();
+ }
+
+ public abstract ConsensusIndex getConsensusIndex();
+
+ public void reportProgressIndexToPipeTaskMetaWhenFinish(PipeTaskMeta
pipeTaskMeta) {
+ this.pipeTaskMeta = pipeTaskMeta;
+ }
- // TODO: ConsensusIndex getConsensusIndex();
+ public abstract EnrichedEvent shallowCopySelf();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index 62c599b66c1..b6018b2cc6f 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.core.event.impl;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.access.Row;
@@ -30,7 +31,7 @@ import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
-public class PipeTabletInsertionEvent implements TabletInsertionEvent,
EnrichedEvent {
+public class PipeTabletInsertionEvent extends EnrichedEvent implements
TabletInsertionEvent {
private final InsertNode insertNode;
@@ -61,23 +62,27 @@ public class PipeTabletInsertionEvent implements
TabletInsertionEvent, EnrichedE
}
@Override
- public boolean increaseReferenceCount(String holderMessage) {
+ public boolean increaseResourceReferenceCount(String holderMessage) {
// TODO: use WALPipeHandler pinMemtable
referenceCount.incrementAndGet();
return true;
}
@Override
- public boolean decreaseReferenceCount(String holderMessage) {
+ public boolean decreaseResourceReferenceCount(String holderMessage) {
// TODO: use WALPipeHandler unpinMemetable
referenceCount.decrementAndGet();
return true;
}
@Override
- public int getReferenceCount() {
- // TODO: use WALPipeHandler unpinMemetable
- return referenceCount.get();
+ public ConsensusIndex getConsensusIndex() {
+ return insertNode.getConsensusIndex();
+ }
+
+ @Override
+ public PipeTabletInsertionEvent shallowCopySelf() {
+ return new PipeTabletInsertionEvent(this.insertNode);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index 65f78cebac2..93e4232d0b4 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.core.event.impl;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
@@ -32,14 +34,16 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.concurrent.atomic.AtomicBoolean;
-public class PipeTsFileInsertionEvent implements TsFileInsertionEvent,
EnrichedEvent {
+public class PipeTsFileInsertionEvent extends EnrichedEvent implements
TsFileInsertionEvent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
+ private final TsFileResource resource;
private File tsFile;
private final AtomicBoolean isClosed;
public PipeTsFileInsertionEvent(TsFileResource resource) {
+ this.resource = resource;
tsFile = resource.getTsFile();
isClosed = new AtomicBoolean(resource.isClosed());
@@ -83,7 +87,7 @@ public class PipeTsFileInsertionEvent implements
TsFileInsertionEvent, EnrichedE
}
@Override
- public boolean increaseReferenceCount(String holderMessage) {
+ public boolean increaseResourceReferenceCount(String holderMessage) {
try {
// TODO: increase reference count for mods & resource files
tsFile = PipeResourceManager.file().increaseFileReference(tsFile, true);
@@ -99,7 +103,7 @@ public class PipeTsFileInsertionEvent implements
TsFileInsertionEvent, EnrichedE
}
@Override
- public boolean decreaseReferenceCount(String holderMessage) {
+ public boolean decreaseResourceReferenceCount(String holderMessage) {
try {
PipeResourceManager.file().decreaseFileReference(tsFile);
return true;
@@ -114,8 +118,21 @@ public class PipeTsFileInsertionEvent implements
TsFileInsertionEvent, EnrichedE
}
@Override
- public int getReferenceCount() {
- return PipeResourceManager.file().getFileReferenceCount(tsFile);
+ public ConsensusIndex getConsensusIndex() {
+ try {
+ waitForTsFileClose();
+ return resource.getMaxConsensusIndexAfterClose();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt status cleared by the throw
+ LOGGER.warn(
+ "Interrupted when waiting for closing TsFile {}.",
resource.getTsFilePath());
+ return new MinimumConsensusIndex();
+ }
+ }
+
+ @Override
+ public PipeTsFileInsertionEvent shallowCopySelf() {
+ return new PipeTsFileInsertionEvent(this.resource);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index 0c63131e6e9..09a1ff0bdd1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -19,20 +19,21 @@
package org.apache.iotdb.db.pipe.core.event.realtime;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.event.Event;
import java.util.Map;
-public class PipeRealtimeCollectEvent implements Event, EnrichedEvent {
+public class PipeRealtimeCollectEvent extends EnrichedEvent {
- private final Event event;
+ private final EnrichedEvent event;
private final TsFileEpoch tsFileEpoch;
private Map<String, String[]> device2Measurements;
public PipeRealtimeCollectEvent(
- Event event, TsFileEpoch tsFileEpoch, Map<String, String[]> device2Measurements) {
+ EnrichedEvent event, TsFileEpoch tsFileEpoch, Map<String, String[]> device2Measurements) {
this.event = event;
this.tsFileEpoch = tsFileEpoch;
this.device2Measurements = device2Measurements;
@@ -55,20 +56,24 @@ public class PipeRealtimeCollectEvent implements Event, EnrichedEvent {
}
@Override
- public boolean increaseReferenceCount(String holderMessage) {
- return !(event instanceof EnrichedEvent)
- || ((EnrichedEvent) event).increaseReferenceCount(holderMessage);
+ public boolean increaseResourceReferenceCount(String holderMessage) {
+ return event.increaseResourceReferenceCount(holderMessage);
}
@Override
- public boolean decreaseReferenceCount(String holderMessage) {
- return !(event instanceof EnrichedEvent)
- || ((EnrichedEvent) event).decreaseReferenceCount(holderMessage);
+ public boolean decreaseResourceReferenceCount(String holderMessage) {
+ return event.decreaseResourceReferenceCount(holderMessage);
}
@Override
- public int getReferenceCount() {
- return event instanceof EnrichedEvent ? ((EnrichedEvent) event).getReferenceCount() : 0;
+ public ConsensusIndex getConsensusIndex() {
+ return event.getConsensusIndex();
+ }
+
+ @Override
+ public PipeRealtimeCollectEvent shallowCopySelf() {
+ return new PipeRealtimeCollectEvent(
+ event.shallowCopySelf(), this.tsFileEpoch, this.device2Measurements);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
index 447ae5bdf09..d9dbe4dc1f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
@@ -24,12 +24,15 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import java.util.HashMap;
import java.util.Map;
public class PipeBuilder {
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
private final PipeMeta pipeMeta;
@@ -49,16 +52,18 @@ public class PipeBuilder {
final PipeRuntimeMeta pipeRuntimeMeta = pipeMeta.getRuntimeMeta();
for (Map.Entry<TConsensusGroupId, PipeTaskMeta>
consensusGroupIdToPipeTaskMeta :
pipeRuntimeMeta.getConsensusGroupIdToTaskMetaMap().entrySet()) {
- consensusGroupIdToPipeTaskMap.put(
- consensusGroupIdToPipeTaskMeta.getKey(),
- new PipeTaskBuilder(
- pipeName,
- Integer.toString(consensusGroupIdToPipeTaskMeta.getKey().getId()),
- // TODO: consensusGroupIdToPipeTaskMeta.getValue().getProgressIndex() is not used
- collectorParameters,
- processorParameters,
- connectorParameters)
- .build());
+ if (consensusGroupIdToPipeTaskMeta.getValue().getDataNodeId() == CONFIG.getDataNodeId()) {
+ consensusGroupIdToPipeTaskMap.put(
+ consensusGroupIdToPipeTaskMeta.getKey(),
+ new PipeTaskBuilder(
+ pipeName,
+ consensusGroupIdToPipeTaskMeta.getKey(),
+ consensusGroupIdToPipeTaskMeta.getValue(),
+ collectorParameters,
+ processorParameters,
+ connectorParameters)
+ .build());
+ }
}
return consensusGroupIdToPipeTaskMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
index 26475c5c4b4..887b31a6e3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
@@ -19,12 +19,13 @@
package org.apache.iotdb.db.pipe.task;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskStage;
public class PipeTask {
private final String pipeName;
- private final String dataRegionId;
+ private final TConsensusGroupId dataRegionId;
private final PipeTaskStage collectorStage;
private final PipeTaskStage processorStage;
@@ -32,7 +33,7 @@ public class PipeTask {
PipeTask(
String pipeName,
- String dataRegionId,
+ TConsensusGroupId dataRegionId,
PipeTaskStage collectorStage,
PipeTaskStage processorStage,
PipeTaskStage connectorStage) {
@@ -68,7 +69,7 @@ public class PipeTask {
connectorStage.stop();
}
- public String getDataRegionId() {
+ public TConsensusGroupId getDataRegionId() {
return dataRegionId;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 51e429b0455..d42d62e5f98 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.pipe.task;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskCollectorStage;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
@@ -28,28 +30,33 @@ import org.apache.iotdb.pipe.api.customizer.PipeParameters;
public class PipeTaskBuilder {
private final String pipeName;
- private final String dataRegionId;
+ private final TConsensusGroupId dataRegionId;
+ private final PipeTaskMeta pipeTaskMeta;
private final PipeParameters pipeCollectorParameters;
private final PipeParameters pipeProcessorParameters;
private final PipeParameters pipeConnectorParameters;
PipeTaskBuilder(
String pipeName,
- String dataRegionId,
+ TConsensusGroupId dataRegionId,
+ PipeTaskMeta pipeTaskMeta,
PipeParameters pipeCollectorParameters,
PipeParameters pipeProcessorParameters,
PipeParameters pipeConnectorParameters) {
this.pipeName = pipeName;
this.dataRegionId = dataRegionId;
+ this.pipeTaskMeta = pipeTaskMeta;
this.pipeCollectorParameters = pipeCollectorParameters;
this.pipeProcessorParameters = pipeProcessorParameters;
this.pipeConnectorParameters = pipeConnectorParameters;
}
- public PipeTaskBuilder(String dataRegionId, PipeStaticMeta pipeStaticMeta) {
+ public PipeTaskBuilder(
+ TConsensusGroupId dataRegionId, PipeTaskMeta pipeTaskMeta, PipeStaticMeta pipeStaticMeta) {
this(
pipeStaticMeta.getPipeName(),
dataRegionId,
+ pipeTaskMeta,
pipeStaticMeta.getCollectorParameters(),
pipeStaticMeta.getProcessorParameters(),
pipeStaticMeta.getConnectorParameters());
@@ -60,7 +67,7 @@ public class PipeTaskBuilder {
// we first build the collector and connector, then build the processor.
final PipeTaskCollectorStage collectorStage =
- new PipeTaskCollectorStage(dataRegionId, pipeCollectorParameters);
+ new PipeTaskCollectorStage(dataRegionId, pipeTaskMeta, pipeCollectorParameters);
final PipeTaskConnectorStage connectorStage =
new PipeTaskConnectorStage(pipeConnectorParameters);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index a4866e87e8e..3dd445c7489 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.pipe.task.stage;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
@@ -53,7 +55,10 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
private final PipeCollector pipeCollector;
- public PipeTaskCollectorStage(String dataRegionId, PipeParameters collectorParameters) {
+ public PipeTaskCollectorStage(
+ TConsensusGroupId dataRegionId,
+ PipeTaskMeta pipeTaskMeta,
+ PipeParameters collectorParameters) {
// TODO: avoid if-else, use reflection to create collector all the time
if (collectorParameters
.getStringOrDefault(
@@ -69,10 +74,10 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
// collector
this.collectorParameters
.getAttribute()
- .put(PipeCollectorConstant.DATA_REGION_KEY, dataRegionId);
+ .put(PipeCollectorConstant.DATA_REGION_KEY, String.valueOf(dataRegionId.getId()));
collectorPendingQueue = new ListenableUnblockingPendingQueue<>();
- this.pipeCollector = new IoTDBDataRegionCollector(collectorPendingQueue);
+ this.pipeCollector = new IoTDBDataRegionCollector(pipeTaskMeta, collectorPendingQueue);
} else {
this.collectorParameters = collectorParameters;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 6bbec4480c0..5542b943d59 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.task.stage;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
@@ -60,7 +61,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
*/
public PipeTaskProcessorStage(
String pipeName,
- String dataRegionId,
+ TConsensusGroupId dataRegionId,
EventSupplier pipeCollectorInputEventSupplier,
@Nullable ListenablePendingQueue<Event> pipeCollectorInputPendingQueue,
PipeParameters pipeProcessorParameters,
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceConsensusIndexTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceConsensusIndexTest.java
new file mode 100644
index 00000000000..f1a676b9c24
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceConsensusIndexTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+public class TsFileResourceConsensusIndexTest {
+ private final File file =
+ new File(
+ TsFileNameGenerator.generateNewTsFilePath(TestConstant.BASE_OUTPUT_PATH, 1, 1, 1, 1));
+ private final TsFileResource tsFileResource = new TsFileResource(file);
+ private final Map<String, Integer> deviceToIndex = new HashMap<>();
+ private final long[] startTimes = new long[DEVICE_NUM];
+ private final long[] endTimes = new long[DEVICE_NUM];
+ private static final int DEVICE_NUM = 100;
+
+ private final List<ConsensusIndex> indexList = new ArrayList<>();
+ private static final int INDEX_NUM = 1000;
+
+ @Before
+ public void setUp() {
+ IntStream.range(0, DEVICE_NUM).forEach(i -> deviceToIndex.put("root.sg.d" + i, i));
+ DeviceTimeIndex deviceTimeIndex = new DeviceTimeIndex(deviceToIndex, startTimes, endTimes);
+ IntStream.range(0, DEVICE_NUM)
+ .forEach(
+ i -> {
+ deviceTimeIndex.updateStartTime("root.sg.d" + i, i);
+ deviceTimeIndex.updateEndTime("root.sg.d" + i, i + 1);
+ });
+ tsFileResource.setTimeIndex(deviceTimeIndex);
+ tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+
+ IntStream.range(0, INDEX_NUM).forEach(i -> indexList.add(new MockConsensusIndex(i)));
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ // clean fake file
+ if (file.exists()) {
+ FileUtils.delete(file);
+ }
+ File resourceFile = new File(file.getName() + TsFileResource.RESOURCE_SUFFIX);
+ if (resourceFile.exists()) {
+ FileUtils.delete(resourceFile);
+ }
+ }
+
+ @Test
+ public void testConsensusIndexRecorder() {
+ Assert.assertTrue(
+ new MockConsensusIndex(0).isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+
+ indexList.forEach(tsFileResource::updateConsensusIndex);
+
+ Assert.assertFalse(
+ new MockConsensusIndex(-1).isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+ Assert.assertFalse(
+ new MockConsensusIndex(0).isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+ Assert.assertFalse(
+ new MockConsensusIndex(1).isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+ Assert.assertFalse(
+ new MockConsensusIndex(INDEX_NUM - 1)
+ .isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+
+ Assert.assertTrue(
+ new MockConsensusIndex(INDEX_NUM).isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+ Assert.assertTrue(
+ new MockConsensusIndex(Integer.MAX_VALUE)
+ .isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+
+ Assert.assertFalse(
+ new MockConsensusIndex(1, INDEX_NUM - 1)
+ .isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+ }
+
+ @Test
+ public void testConsensusIndexRecorderSerialize() {
+ // TODO: wait for implements of ConsensusIndex.deserializeFrom
+ }
+
+ public static class MockConsensusIndex implements ConsensusIndex {
+ private final int type;
+ private int val;
+
+ public MockConsensusIndex(int val) {
+ this(0, val);
+ }
+
+ public MockConsensusIndex(int type, int val) {
+ this.type = type;
+ this.val = val;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(val, byteBuffer);
+ }
+
+ @Override
+ public void serialize(OutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(val, stream);
+ }
+
+ @Override
+ public boolean isAfter(ConsensusIndex consensusIndex) {
+ if (!(consensusIndex instanceof MockConsensusIndex)) {
+ return true;
+ }
+
+ MockConsensusIndex that = (MockConsensusIndex) consensusIndex;
+ return this.type == that.type && this.val > that.val;
+ }
+
+ @Override
+ public boolean equals(ConsensusIndex consensusIndex) {
+ if (!(consensusIndex instanceof MockConsensusIndex)) {
+ return false;
+ }
+
+ MockConsensusIndex that = (MockConsensusIndex) consensusIndex;
+ return this.type == that.type && this.val == that.val;
+ }
+
+ @Override
+ public ConsensusIndex updateToMaximum(ConsensusIndex consensusIndex) {
+ if (!(consensusIndex instanceof MockConsensusIndex)) {
+ throw new IllegalStateException("Mock update error.");
+ }
+
+ MockConsensusIndex that = (MockConsensusIndex) consensusIndex;
+ if (that.type == this.type) {
+ this.val = Math.max(this.val, that.val);
+ }
+ return this;
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index d833e2761a0..198761eee3b 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.core.collector;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.matcher.CachedSchemaPatternMatcher;
@@ -64,7 +65,8 @@ public class CachedSchemaPatternMatcherTest {
@Test
public void testCachedMatcher() throws ExecutionException,
InterruptedException {
- PipeRealtimeDataRegionCollector databaseCollector = new PipeRealtimeDataRegionFakeCollector();
+ PipeRealtimeDataRegionCollector databaseCollector =
+ new PipeRealtimeDataRegionFakeCollector(null);
databaseCollector.customize(
new PipeParameters(
new HashMap<String, String>() {
@@ -79,7 +81,8 @@ public class CachedSchemaPatternMatcherTest {
int deviceCollectorNum = 10;
int seriesCollectorNum = 10;
for (int i = 0; i < deviceCollectorNum; i++) {
- PipeRealtimeDataRegionCollector deviceCollector = new PipeRealtimeDataRegionFakeCollector();
+ PipeRealtimeDataRegionCollector deviceCollector =
+ new PipeRealtimeDataRegionFakeCollector(null);
int finalI1 = i;
deviceCollector.customize(
new PipeParameters(
@@ -92,7 +95,8 @@ public class CachedSchemaPatternMatcherTest {
null);
collectorList.add(deviceCollector);
for (int j = 0; j < seriesCollectorNum; j++) {
- PipeRealtimeDataRegionCollector seriesCollector = new PipeRealtimeDataRegionFakeCollector();
+ PipeRealtimeDataRegionCollector seriesCollector =
+ new PipeRealtimeDataRegionFakeCollector(null);
int finalI = i;
int finalJ = j;
seriesCollector.customize(
@@ -149,6 +153,10 @@ public class CachedSchemaPatternMatcherTest {
public static class PipeRealtimeDataRegionFakeCollector extends
PipeRealtimeDataRegionCollector {
+ public PipeRealtimeDataRegionFakeCollector(PipeTaskMeta pipeTaskMeta) {
+ super(pipeTaskMeta);
+ }
+
@Override
public Event supply() {
return null;
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 7cd705af588..09ee2a13a8b 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -99,13 +99,17 @@ public class PipeRealtimeCollectTest {
// set up realtime collector
try (PipeRealtimeDataRegionHybridCollector collector1 =
- new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
+ new PipeRealtimeDataRegionHybridCollector(
+ null, new ListenableUnblockingPendingQueue<>());
PipeRealtimeDataRegionHybridCollector collector2 =
- new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
+ new PipeRealtimeDataRegionHybridCollector(
+ null, new ListenableUnblockingPendingQueue<>());
PipeRealtimeDataRegionHybridCollector collector3 =
- new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
+ new PipeRealtimeDataRegionHybridCollector(
+ null, new ListenableUnblockingPendingQueue<>());
PipeRealtimeDataRegionHybridCollector collector4 =
- new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>())) {
+ new PipeRealtimeDataRegionHybridCollector(
+ null, new ListenableUnblockingPendingQueue<>())) {
collector1.customize(
new PipeParameters(