This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6d2c4408cd7 [IOTDB-5978] Pipe: ignore data in region "root.__system" (#10093)
6d2c4408cd7 is described below
commit 6d2c4408cd79a81f239d5f42a0b502fb4bc763fe
Author: yschengzi <[email protected]>
AuthorDate: Fri Jun 9 13:13:41 2023 +0800
[IOTDB-5978] Pipe: ignore data in region "root.__system" (#10093)
---
.../manager/pipe/runtime/PipeRuntimeCoordinator.java | 8 +++++++-
.../procedure/impl/pipe/task/CreatePipeProcedureV2.java | 12 ++++++++++--
2 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 2992f6dd31e..1382b90851e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
import
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent;
import
org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent;
+import org.apache.iotdb.metrics.utils.IoTDBMetricsUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -70,7 +71,12 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
.getLeaderMap()
.forEach(
(regionId, pair) -> {
- if (regionId.getType().equals(TConsensusGroupType.DataRegion)) {
+ if (regionId.getType().equals(TConsensusGroupType.DataRegion)
+ && !configManager
+ .getPartitionManager()
+ .getRegionStorageGroup(regionId)
+ .equals(IoTDBMetricsUtils.DATABASE)) {
+ // pipe only collect user's data, filter metric database here.
dataRegionGroupToOldAndNewLeaderPairMap.put(
regionId,
new Pair<>( // null or -1 means empty origin leader
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 0ed89dfe536..f8abb067054 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.metrics.utils.IoTDBMetricsUtils;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -103,9 +104,16 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
.getLoadManager()
.getRegionLeaderMap()
.forEach(
- (regionGroup, regionLeaderNodeId) ->
+ (regionGroup, regionLeaderNodeId) -> {
+ if (!env.getConfigManager()
+ .getPartitionManager()
+ .getRegionStorageGroup(regionGroup)
+ .equals(IoTDBMetricsUtils.DATABASE)) {
+ // pipe only collect user's data, filter metric database here.
consensusGroupIdToTaskMetaMap.put(
- regionGroup, new PipeTaskMeta(new MinimumProgressIndex(), regionLeaderNodeId)));
+ regionGroup, new PipeTaskMeta(new MinimumProgressIndex(), regionLeaderNodeId));
+ }
+ });
pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
}