This is an automated email from the ASF dual-hosted git repository. JackieTien97 pushed a commit to branch rc/2.0.10 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cee53408453048a4ccd73a1fd951e66dcb5b7702 Author: Caideyipi <[email protected]> AuthorDate: Mon Jun 22 09:37:45 2026 +0800 Fix old pipe root user compatibility (#17985) * Fix old pipe root user compatibility * Fix ConfigurationFileUtilsTest import * Update AbstractEnv.java * Address pipe compatibility review comments --- .../iotdb/confignode/i18n/ConfigNodeMessages.java | 3 + .../iotdb/confignode/i18n/ConfigNodeMessages.java | 3 + .../iotdb/confignode/manager/ConfigManager.java | 4 +- .../persistence/executor/ConfigPlanExecutor.java | 1 + .../confignode/persistence/pipe/PipeInfo.java | 8 +- .../confignode/persistence/pipe/PipeTaskInfo.java | 55 ++++++- .../pipe/PipeTaskInfoAutoRestartTest.java | 169 ++++++++++++++++++++- .../pipe/agent/task/meta/PipeStaticMeta.java | 60 ++++++++ 8 files changed, 292 insertions(+), 11 deletions(-) diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index c1c3e877de7..e254da50213 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -216,6 +216,9 @@ public final class ConfigNodeMessages { "Failed to drop trigger [%s], this trigger has not been created"; public static final String FAILED_TO_DROP_UDF_THIS_UDF_HAS_NOT_BEEN_CREATED = "Failed to drop UDF [%s], this UDF has not been created"; + public static final String + FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST = + "Failed to enrich pipe %s with root user for compatibility because root user %s does not exist."; public static final String FAILED_TO_FETCH_SCHEMAENGINE_BLACK_LIST_ON_DATANODE = "Failed to fetch schemaengine black list on DataNode {}, {}"; public static final String FAILED_TO_GET_FIELD = "Failed to get field {}"; diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index 6ca847626c8..abae5acc6cd 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -212,6 +212,9 @@ public final class ConfigNodeMessages { "Failed to drop trigger [%s], this trigger has not been created"; public static final String FAILED_TO_DROP_UDF_THIS_UDF_HAS_NOT_BEEN_CREATED = "Failed to drop UDF [%s], this UDF has not been created"; + public static final String + FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST = + "Failed to enrich pipe %s with root user for compatibility because root user %s does not exist."; public static final String FAILED_TO_FETCH_SCHEMAENGINE_BLACK_LIST_ON_DATANODE = "Failed to fetch schemaengine black list on DataNode {}, {}"; public static final String FAILED_TO_GET_FIELD = "Failed to get field {}"; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 3f60b84a74e..74340c97cbd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -365,7 +365,8 @@ public class ConfigManager implements IManager { TriggerInfo triggerInfo = new TriggerInfo(); CQInfo cqInfo = new CQInfo(); ExternalServiceInfo externalServiceInfo = new ExternalServiceInfo(); - PipeInfo pipeInfo = new PipeInfo(); + this.permissionManager = createPermissionManager(authorInfo); + PipeInfo pipeInfo = new PipeInfo(userName -> this.permissionManager.login4Pipe(userName, null)); QuotaInfo quotaInfo = new QuotaInfo(); TTLInfo ttlInfo = new TTLInfo(); SubscriptionInfo subscriptionInfo = new SubscriptionInfo(); @@ -399,7 +400,6 @@ public class ConfigManager implements IManager { new ClusterSchemaQuotaStatistics( COMMON_CONF.getSeriesLimitThreshold(), COMMON_CONF.getDeviceLimitThreshold())); this.partitionManager = new PartitionManager(this, partitionInfo); - this.permissionManager = createPermissionManager(authorInfo); this.procedureManager = createProcedureManager(procedureInfo); this.externalServiceManager = new ExternalServiceManager(this); this.udfManager = new UDFManager(this, udfInfo); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index e37c3ae9960..e2c78b8495d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -796,6 +796,7 @@ public class ConfigPlanExecutor { } }); if (result.get()) { + pipeInfo.getPipeTaskInfo().enrichPipeMetasWithRootUserForCompatibility(); LOGGER.info( ConfigNodeMessages.CONFIGNODESNAPSHOT_LOAD_SNAPSHOT_SUCCESS_LATESTSNAPSHOTROOTDIR, latestSnapshotRootDir); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java index e353398cd9d..bef26386afd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java @@ -49,6 +49,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.function.Function; public class PipeInfo implements SnapshotProcessor { @@ -58,8 +59,13 @@ public class PipeInfo implements SnapshotProcessor { private final PipeTaskInfo pipeTaskInfo; public PipeInfo() throws IOException { + this(null); + } + + public PipeInfo(final Function<String, String> pipeUserCurrentPasswordProvider) + throws IOException { pipePluginInfo = new PipePluginInfo(); - pipeTaskInfo = new PipeTaskInfo(); + pipeTaskInfo = new PipeTaskInfo(pipeUserCurrentPasswordProvider); } public PipePluginInfo getPipePluginInfo() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 3c2e331b0a1..247f803152b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.persistence.pipe; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; @@ -56,7 +57,6 @@ import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceM import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure; import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; -import org.apache.iotdb.confignode.service.ConfigNode; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -76,9 +76,11 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -95,10 +97,17 @@ public class PipeTaskInfo implements SnapshotProcessor { // Pure in-memory object, not involved in snapshot serialization and deserialization. private final PipeTaskInfoVersion pipeTaskInfoVersion; + // Accepts a username and returns its current stored password for pipe authentication. + private final Function<String, String> pipeUserCurrentPasswordProvider; public PipeTaskInfo() { + this(null); + } + + public PipeTaskInfo(final Function<String, String> pipeUserCurrentPasswordProvider) { this.pipeMetaKeeper = new PipeMetaKeeper(); this.pipeTaskInfoVersion = new PipeTaskInfoVersion(); + this.pipeUserCurrentPasswordProvider = pipeUserCurrentPasswordProvider; } /////////////////////////////// Lock /////////////////////////////// @@ -445,6 +454,7 @@ public class PipeTaskInfo implements SnapshotProcessor { public TSStatus createPipe(final CreatePipePlanV2 plan) { acquireWriteLock(); try { + enrichPipeMetaWithRootUserForCompatibility(plan.getPipeStaticMeta()); pipeMetaKeeper.addPipeMeta(new PipeMeta(plan.getPipeStaticMeta(), plan.getPipeRuntimeMeta())); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } finally { @@ -502,6 +512,7 @@ public class PipeTaskInfo implements SnapshotProcessor { public TSStatus alterPipe(final AlterPipePlanV2 plan) { acquireWriteLock(); try { + enrichPipeMetaWithRootUserForCompatibility(plan.getPipeStaticMeta()); final PipeTemporaryMeta temporaryMeta = pipeMetaKeeper.getPipeMeta(plan.getPipeStaticMeta().getPipeName()).getTemporaryMeta(); pipeMetaKeeper.removePipeMeta(plan.getPipeStaticMeta().getPipeName()); @@ -719,6 +730,7 @@ public class PipeTaskInfo implements SnapshotProcessor { plan.getPipeMetaList() .forEach( pipeMeta -> { + enrichPipeMetaWithRootUserForCompatibility(pipeMeta.getStaticMeta()); pipeMetaKeeper.addPipeMeta(pipeMeta); logger.ifPresent(l -> l.debug(ConfigNodeMessages.RECORDING_PIPE_META, pipeMeta)); }); @@ -998,6 +1010,47 @@ public class PipeTaskInfo implements SnapshotProcessor { } } + public void enrichPipeMetasWithRootUserForCompatibility() { + acquireWriteLock(); + try { + pipeMetaKeeper + .getPipeMetaList() + .forEach( + pipeMeta -> enrichPipeMetaWithRootUserForCompatibility(pipeMeta.getStaticMeta())); + } finally { + releaseWriteLock(); + } + } + + private void enrichPipeMetaWithRootUserForCompatibility(final PipeStaticMeta pipeStaticMeta) { + if (pipeUserCurrentPasswordProvider == null) { + return; + } + final boolean shouldEnrichSource = pipeStaticMeta.mayNeedCompatibleRootUserForIoTDBSource(); + final boolean shouldEnrichSink = pipeStaticMeta.mayNeedCompatibleRootUserForWriteBackSink(); + if (!shouldEnrichSource && !shouldEnrichSink) { + return; + } + + final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName(); + final String password = pipeUserCurrentPasswordProvider.apply(rootUserName); + if (Objects.isNull(password)) { + throw new PipeException( + String.format( + ConfigNodeMessages + .FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST, + pipeStaticMeta.getPipeName(), + rootUserName)); + } + + if (shouldEnrichSource) { + pipeStaticMeta.enrichSourceWithRootUserForCompatibility(rootUserName, password); + } + if (shouldEnrichSink) { + pipeStaticMeta.enrichWriteBackSinkWithRootUserForCompatibility(rootUserName, password); + } + } + private void normalizeRecoveredConsensusPipeStatus() { final List<String> restartedConsensusPipes = new ArrayList<>(); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java index 762c3bf045c..7b78f59253d 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java @@ -20,11 +20,15 @@ package org.apache.iotdb.confignode.persistence.pipe; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; +import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; @@ -88,6 +92,145 @@ public class PipeTaskInfoAutoRestartTest { Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get()); } + @Test + public void testEnrichOldUserPipeWithRootUserForCompatibility() { + final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName(); + final String rootPassword = "root-current-password"; + pipeTaskInfo = new PipeTaskInfo(username -> rootPassword); + + createPipe("oldPipe", PipeStatus.STOPPED); + + final Map<String, String> sourceAttributes = + pipeTaskInfo + .getPipeMetaByPipeName("oldPipe") + .getStaticMeta() + .getSourceParameters() + .getAttribute(); + Assert.assertEquals( + String.valueOf(IoTDBConstant.SUPER_USER_ID), + sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USER_ID)); + Assert.assertEquals( + rootUserName, sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)); + Assert.assertEquals( + rootPassword, sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + Assert.assertFalse( + pipeTaskInfo + .getPipeMetaByPipeName("oldPipe") + .getStaticMeta() + .getSinkParameters() + .getAttribute() + .containsKey(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY)); + } + + @Test + public void testDoNotOverwritePipeWithUserForCompatibility() { + pipeTaskInfo = new PipeTaskInfo(username -> "root-current-password"); + + createPipeWithSourceAttributes( + "newPipe", + new HashMap<String, String>() { + { + put("extractor", "iotdb-source"); + put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "user"); + put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, "user-password"); + } + }); + + final Map<String, String> sourceAttributes = + pipeTaskInfo + .getPipeMetaByPipeName("newPipe") + .getStaticMeta() + .getSourceParameters() + .getAttribute(); + Assert.assertEquals("user", sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)); + Assert.assertEquals( + "user-password", sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + } + + @Test + public void testDoNotEnrichSystemPipeForCompatibility() { + pipeTaskInfo = new PipeTaskInfo(username -> "root-current-password"); + + createPipeWithSourceAttributes( + PipeStaticMeta.generateSubscriptionPipeName("topic", "group"), + new HashMap<String, String>() { + { + put("extractor", "iotdb-source"); + } + }); + + final Map<String, String> sourceAttributes = + pipeTaskInfo + .getPipeMetaByPipeName(PipeStaticMeta.generateSubscriptionPipeName("topic", "group")) + .getStaticMeta() + .getSourceParameters() + .getAttribute(); + Assert.assertFalse(sourceAttributes.containsKey(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)); + Assert.assertFalse(sourceAttributes.containsKey(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + } + + @Test + public void testEnrichOldWriteBackSinkWithRootUserForCompatibility() { + final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName(); + final String rootPassword = "root-current-password"; + pipeTaskInfo = new PipeTaskInfo(username -> rootPassword); + + createPipeWithAttributes( + "oldWriteBackPipe", + new HashMap<String, String>() { + { + put("extractor", "iotdb-source"); + put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "source-user"); + put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, "source-password"); + } + }, + new HashMap<String, String>() { + { + put("connector", "write-back-sink"); + } + }); + + final Map<String, String> sinkAttributes = + pipeTaskInfo + .getPipeMetaByPipeName("oldWriteBackPipe") + .getStaticMeta() + .getSinkParameters() + .getAttribute(); + Assert.assertEquals( + String.valueOf(IoTDBConstant.SUPER_USER_ID), + sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_USER_ID)); + Assert.assertEquals(rootUserName, sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY)); + Assert.assertEquals(rootPassword, sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY)); + } + + @Test + public void testEnrichLoadedPipeMetasWithRootUserForCompatibility() { + final String rootPassword = "root-current-password"; + pipeTaskInfo = new PipeTaskInfo(username -> rootPassword); + + createPipeWithSourceAttributes( + "loadedPipe", + new HashMap<String, String>() { + { + put("extractor", "iotdb-source"); + } + }); + final Map<String, String> sourceAttributes = + pipeTaskInfo + .getPipeMetaByPipeName("loadedPipe") + .getStaticMeta() + .getSourceParameters() + .getAttribute(); + sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_USER_ID); + sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY); + sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY); + + pipeTaskInfo.enrichPipeMetasWithRootUserForCompatibility(); + + Assert.assertEquals( + rootPassword, sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + } + private Map<Integer, TPushPipeMetaResp> createErrorRespMap(final String pipeName) { final TPushPipeMetaRespExceptionMessage exceptionMessage = new TPushPipeMetaRespExceptionMessage( @@ -101,11 +244,27 @@ public class PipeTaskInfoAutoRestartTest { private void createPipe(final String pipeName, final PipeStatus initialStatus) { final Map<String, String> extractorAttributes = new HashMap<>(); - final Map<String, String> processorAttributes = new HashMap<>(); - final Map<String, String> connectorAttributes = new HashMap<>(); extractorAttributes.put("extractor", "iotdb-source"); - processorAttributes.put("processor", "do-nothing-processor"); + createPipeWithSourceAttributes(pipeName, extractorAttributes); + + if (PipeStatus.RUNNING.equals(initialStatus)) { + pipeTaskInfo.setPipeStatus(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING)); + } + } + + private void createPipeWithSourceAttributes( + final String pipeName, final Map<String, String> extractorAttributes) { + final Map<String, String> connectorAttributes = new HashMap<>(); connectorAttributes.put("connector", "iotdb-thrift-sink"); + createPipeWithAttributes(pipeName, extractorAttributes, connectorAttributes); + } + + private void createPipeWithAttributes( + final String pipeName, + final Map<String, String> extractorAttributes, + final Map<String, String> connectorAttributes) { + final Map<String, String> processorAttributes = new HashMap<>(); + processorAttributes.put("processor", "do-nothing-processor"); final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID); final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>(); @@ -120,9 +279,5 @@ public class PipeTaskInfoAutoRestartTest { connectorAttributes); final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks); pipeTaskInfo.createPipe(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta)); - - if (PipeStatus.RUNNING.equals(initialStatus)) { - pipeTaskInfo.setPipeStatus(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING)); - } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java index 4b57cd64e43..9855552113c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java @@ -19,7 +19,9 @@ package org.apache.iotdb.commons.pipe.agent.task.meta; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility; import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils; @@ -97,6 +99,64 @@ public class PipeStaticMeta { .toLowerCase()); } + public boolean mayNeedCompatibleRootUserForIoTDBSource() { + final String pluginName = + sourceParameters + .getStringOrDefault( + Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, PipeSourceConstant.SOURCE_KEY), + BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + .toLowerCase(); + + return PipeType.USER.equals(getPipeType()) + && (pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + || pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())) + && !sourceParameters.hasAnyAttributes( + PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY, + PipeSourceConstant.SOURCE_IOTDB_USER_KEY, + PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY, + PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, + PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY, + PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY); + } + + public boolean mayNeedCompatibleRootUserForWriteBackSink() { + final String pluginName = + sinkParameters + .getStringOrDefault( + Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY), + BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName()) + .toLowerCase(); + + return PipeType.USER.equals(getPipeType()) + && (pluginName.equals(BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName()) + || pluginName.equals(BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName())) + && !sinkParameters.hasAnyAttributes( + PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY, + PipeSinkConstant.SINK_IOTDB_USER_KEY, + PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY, + PipeSinkConstant.SINK_IOTDB_USERNAME_KEY, + PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY, + PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY); + } + + public void enrichSourceWithRootUserForCompatibility( + final String rootUserName, final String password) { + sourceParameters + .getAttribute() + .put(PipeSourceConstant.SOURCE_IOTDB_USER_ID, String.valueOf(IoTDBConstant.SUPER_USER_ID)); + sourceParameters.getAttribute().put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, rootUserName); + sourceParameters.getAttribute().put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, password); + } + + public void enrichWriteBackSinkWithRootUserForCompatibility( + final String rootUserName, final String password) { + sinkParameters + .getAttribute() + .put(PipeSinkConstant.SINK_IOTDB_USER_ID, String.valueOf(IoTDBConstant.SUPER_USER_ID)); + sinkParameters.getAttribute().put(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY, rootUserName); + sinkParameters.getAttribute().put(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY, password); + } + public ByteBuffer serialize() throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
