This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9fe80a48e2c Fix pipe consensus compatibility during rolling upgrade (#17428)
9fe80a48e2c is described below
commit 9fe80a48e2cab8419ef62e2df71d9403eeef2542
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Apr 3 10:48:36 2026 +0800
Fix pipe consensus compatibility during rolling upgrade (#17428)
* Fix pipe consensus compatibility during rolling upgrade
* Apply spotless formatting
* Fix pipe consensus upgrade compatibility
---
.../confignode/conf/SystemPropertiesUtils.java | 35 ++++++++++++++++++----
.../apache/iotdb/consensus/ConsensusFactory.java | 19 +++++++++++-
.../org/apache/iotdb/db/conf/IoTDBStartCheck.java | 24 +++++++++++++--
.../PipeDataRegionProcessorConstructor.java | 4 +++
.../dataregion/PipeDataRegionSinkConstructor.java | 8 +++++
.../agent/plugin/PipeDataNodePluginAgentTest.java | 30 +++++++++++++++++++
.../agent/plugin/builtin/BuiltinPipePlugin.java | 11 ++++++-
7 files changed, 122 insertions(+), 9 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
index 529f15d06cd..a3ee036f6ab 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.file.SystemPropertiesHandler;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -124,10 +125,15 @@ public class SystemPropertiesUtils {
}
}
+ // Only the data region protocol could have been persisted as the old PipeConsensus name
+ // during a jar-only upgrade, so only that field needs compatibility normalization.
// Consensus protocol configuration
+ boolean needRewriteConsensusProtocol = false;
+
String configNodeConsensusProtocolClass =
systemProperties.getProperty(CN_CONSENSUS_PROTOCOL, null);
- if
(!configNodeConsensusProtocolClass.equals(conf.getConfigNodeConsensusProtocolClass()))
{
+ if (!Objects.equals(
+ configNodeConsensusProtocolClass,
conf.getConfigNodeConsensusProtocolClass())) {
LOGGER.warn(
format,
CN_CONSENSUS_PROTOCOL,
@@ -136,9 +142,22 @@ public class SystemPropertiesUtils {
conf.setConfigNodeConsensusProtocolClass(configNodeConsensusProtocolClass);
}
- String dataRegionConsensusProtocolClass =
+ String persistedDataRegionConsensusProtocolClass =
systemProperties.getProperty(DATA_CONSENSUS_PROTOCOL, null);
- if
(!dataRegionConsensusProtocolClass.equals(conf.getDataRegionConsensusProtocolClass()))
{
+ String dataRegionConsensusProtocolClass =
+
ConsensusFactory.normalizeConsensusProtocolClass(persistedDataRegionConsensusProtocolClass);
+ if (!Objects.equals(
+ persistedDataRegionConsensusProtocolClass,
dataRegionConsensusProtocolClass)) {
+ systemProperties.setProperty(DATA_CONSENSUS_PROTOCOL,
dataRegionConsensusProtocolClass);
+ needRewriteConsensusProtocol = true;
+ LOGGER.warn(
+ "[SystemProperties] Normalize {} from {} to {} for compatibility.",
+ DATA_CONSENSUS_PROTOCOL,
+ persistedDataRegionConsensusProtocolClass,
+ dataRegionConsensusProtocolClass);
+ }
+ if (!Objects.equals(
+ dataRegionConsensusProtocolClass,
conf.getDataRegionConsensusProtocolClass())) {
LOGGER.warn(
format,
DATA_CONSENSUS_PROTOCOL,
@@ -149,7 +168,8 @@ public class SystemPropertiesUtils {
String schemaRegionConsensusProtocolClass =
systemProperties.getProperty(SCHEMA_CONSENSUS_PROTOCOL, null);
- if
(!schemaRegionConsensusProtocolClass.equals(conf.getSchemaRegionConsensusProtocolClass()))
{
+ if (!Objects.equals(
+ schemaRegionConsensusProtocolClass,
conf.getSchemaRegionConsensusProtocolClass())) {
LOGGER.warn(
format,
SCHEMA_CONSENSUS_PROTOCOL,
@@ -157,6 +177,9 @@ public class SystemPropertiesUtils {
schemaRegionConsensusProtocolClass);
conf.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass);
}
+ if (needRewriteConsensusProtocol) {
+ systemPropertiesHandler.overwrite(systemProperties);
+ }
// PartitionSlot configuration
if (systemProperties.getProperty(SERIES_PARTITION_SLOT_NUM, null) != null)
{
@@ -265,7 +288,9 @@ public class SystemPropertiesUtils {
// Consensus protocol configuration
systemProperties.setProperty(CN_CONSENSUS_PROTOCOL,
conf.getConfigNodeConsensusProtocolClass());
systemProperties.setProperty(
- DATA_CONSENSUS_PROTOCOL, conf.getDataRegionConsensusProtocolClass());
+ DATA_CONSENSUS_PROTOCOL,
+ ConsensusFactory.normalizeConsensusProtocolClass(
+ conf.getDataRegionConsensusProtocolClass()));
systemProperties.setProperty(
SCHEMA_CONSENSUS_PROTOCOL,
conf.getSchemaRegionConsensusProtocolClass());
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index 896a877e3e7..d24955aba73 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -37,6 +37,10 @@ public class ConsensusFactory {
public static final String SIMPLE_CONSENSUS =
"org.apache.iotdb.consensus.simple.SimpleConsensus";
public static final String RATIS_CONSENSUS =
"org.apache.iotdb.consensus.ratis.RatisConsensus";
public static final String IOT_CONSENSUS =
"org.apache.iotdb.consensus.iot.IoTConsensus";
+ // Keep the pre-rename class name for stale system properties / snapshots restored after a
+ // jar-only upgrade.
+ public static final String LEGACY_IOT_CONSENSUS_V2 =
+ "org.apache.iotdb.consensus.pipe.PipeConsensus";
public static final String REAL_IOT_CONSENSUS_V2 =
"org.apache.iotdb.consensus.pipe.IoTConsensusV2";
public static final String IOT_CONSENSUS_V2 =
"org.apache.iotdb.consensus.iot.IoTConsensusV2";
@@ -49,11 +53,24 @@ public class ConsensusFactory {
throw new IllegalStateException("Utility class ConsensusFactory");
}
+ // Downstream code compares against IOT_CONSENSUS_V2 directly, so persisted legacy names must be
+ // normalized to the canonical constant before they fan out.
+ public static String normalizeConsensusProtocolClass(String className) {
+ if (className == null) {
+ return null;
+ }
+ if (LEGACY_IOT_CONSENSUS_V2.equals(className) ||
REAL_IOT_CONSENSUS_V2.equals(className)) {
+ return IOT_CONSENSUS_V2;
+ }
+ return className;
+ }
+
public static Optional<IConsensus> getConsensusImpl(
String className, ConsensusConfig config, IStateMachine.Registry
registry) {
try {
+ className = normalizeConsensusProtocolClass(className);
// special judge for IoTConsensusV2
- if (className.equals(IOT_CONSENSUS_V2)) {
+ if (IOT_CONSENSUS_V2.equals(className)) {
className = REAL_IOT_CONSENSUS_V2;
// initialize iotConsensusV2's thrift component
IoTV2GlobalComponentContainer.build();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
index e2aa1950af6..d85444a56d4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
@@ -282,13 +282,33 @@ public class IoTDBStartCheck {
if (properties.containsKey(CLUSTER_ID)) {
config.setClusterId(properties.getProperty(CLUSTER_ID));
}
+ // Only the data region protocol could have been persisted as the old PipeConsensus name
+ // during a jar-only upgrade, so only that field needs compatibility normalization.
+ boolean needRewriteConsensusProtocol = false;
if (properties.containsKey(SCHEMA_REGION_CONSENSUS_PROTOCOL)) {
config.setSchemaRegionConsensusProtocolClass(
properties.getProperty(SCHEMA_REGION_CONSENSUS_PROTOCOL));
}
if (properties.containsKey(DATA_REGION_CONSENSUS_PROTOCOL)) {
- config.setDataRegionConsensusProtocolClass(
- properties.getProperty(DATA_REGION_CONSENSUS_PROTOCOL));
+ final String persistedDataRegionConsensusProtocolClass =
+ properties.getProperty(DATA_REGION_CONSENSUS_PROTOCOL);
+ final String dataRegionConsensusProtocolClass =
+ ConsensusFactory.normalizeConsensusProtocolClass(
+ persistedDataRegionConsensusProtocolClass);
+ if (!Objects.equals(
+ persistedDataRegionConsensusProtocolClass,
dataRegionConsensusProtocolClass)) {
+ properties.setProperty(DATA_REGION_CONSENSUS_PROTOCOL,
dataRegionConsensusProtocolClass);
+ needRewriteConsensusProtocol = true;
+ logger.warn(
+ "[SystemProperties] Normalize {} from {} to {} for compatibility.",
+ DATA_REGION_CONSENSUS_PROTOCOL,
+ persistedDataRegionConsensusProtocolClass,
+ dataRegionConsensusProtocolClass);
+ }
+
config.setDataRegionConsensusProtocolClass(dataRegionConsensusProtocolClass);
+ }
+ if (needRewriteConsensusProtocol) {
+ systemPropertiesHandler.overwrite(properties);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index 5bd2ef17509..a6160e6b900 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -69,6 +69,10 @@ class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
pluginConstructors.put(
BuiltinPipePlugin.IOT_CONSENSUS_V2_PROCESSOR.getPipePluginName(),
IoTConsensusV2Processor::new);
+ // Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta.
+ pluginConstructors.put(
+ BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(),
+ IoTConsensusV2Processor::new);
pluginConstructors.put(
BuiltinPipePlugin.RENAME_DATABASE_PROCESSOR.getPipePluginName(),
RenameDatabaseProcessor::new);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
index cbd77ebc902..c8e87890afd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
@@ -56,6 +56,10 @@ class PipeDataRegionSinkConstructor extends PipeSinkConstructor {
pluginConstructors.put(
BuiltinPipePlugin.IOT_CONSENSUS_V2_ASYNC_CONNECTOR.getPipePluginName(),
IoTConsensusV2AsyncSink::new);
+ // Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta.
+ pluginConstructors.put(
+ BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName(),
+ IoTConsensusV2AsyncSink::new);
pluginConstructors.put(
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
IoTDBLegacyPipeSink::new);
@@ -97,5 +101,9 @@ class PipeDataRegionSinkConstructor extends PipeSinkConstructor {
pluginConstructors.put(
BuiltinPipePlugin.IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName(),
IoTConsensusV2AsyncSink::new);
+ // Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta.
+ pluginConstructors.put(
+ BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName(),
+ IoTConsensusV2AsyncSink::new);
}
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
index 5d4d6a640b6..b0f0b34e92a 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableMa
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import
org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
+import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -118,6 +120,20 @@ public class PipeDataNodePluginAgentTest {
}
}))
.getClass());
+ Assert.assertEquals(
+ IoTConsensusV2Processor.class,
+ agent
+ .dataRegion()
+ .reflectProcessor(
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(
+ PipeProcessorConstant.PROCESSOR_KEY,
+
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName());
+ }
+ }))
+ .getClass());
Assert.assertEquals(
IoTDBDataRegionAsyncSink.class,
agent
@@ -132,5 +148,19 @@ public class PipeDataNodePluginAgentTest {
}
}))
.getClass());
+ Assert.assertEquals(
+ IoTConsensusV2AsyncSink.class,
+ agent
+ .dataRegion()
+ .reflectSink(
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(
+ PipeSinkConstant.CONNECTOR_KEY,
+
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName());
+ }
+ }))
+ .getClass());
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
index e95ddb61e1a..76b45d1e798 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
@@ -74,6 +74,8 @@ public enum BuiltinPipePlugin {
STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor",
StandardStatisticsProcessor.class),
TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor",
TumblingWindowingProcessor.class),
IOT_CONSENSUS_V2_PROCESSOR("iot-consensus-v2-processor",
IoTConsensusV2Processor.class),
+ // Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename.
+ PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor",
IoTConsensusV2Processor.class),
RENAME_DATABASE_PROCESSOR("rename-database-processor",
RenameDatabaseProcessor.class),
// connectors
@@ -86,6 +88,8 @@ public enum BuiltinPipePlugin {
IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapSink.class),
IOT_CONSENSUS_V2_ASYNC_CONNECTOR(
"iot-consensus-v2-async-connector", IoTConsensusV2AsyncSink.class),
+ // Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename.
+ PIPE_CONSENSUS_ASYNC_CONNECTOR("pipe-consensus-async-connector",
IoTConsensusV2AsyncSink.class),
WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class),
OPC_UA_CONNECTOR("opc-ua-connector", OpcUaSink.class),
@@ -105,6 +109,8 @@ public enum BuiltinPipePlugin {
WRITE_BACK_SINK("write-back-sink", WriteBackSink.class),
SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class),
IOT_CONSENSUS_V2_ASYNC_SINK("iot-consensus-v2-async-sink",
IoTConsensusV2AsyncSink.class),
+ // Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename.
+ PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink",
IoTConsensusV2AsyncSink.class),
;
private final String pipePluginName;
@@ -158,6 +164,7 @@ public enum BuiltinPipePlugin {
STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(),
TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(),
IOT_CONSENSUS_V2_PROCESSOR.getPipePluginName().toUpperCase(),
+ PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(),
RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(),
// Connectors
DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
@@ -172,6 +179,7 @@ public enum BuiltinPipePlugin {
OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(),
WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(),
IOT_CONSENSUS_V2_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
+
PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
// Sinks
IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(),
IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(),
@@ -180,5 +188,6 @@ public enum BuiltinPipePlugin {
OPC_UA_SINK.getPipePluginName().toUpperCase(),
OPC_DA_SINK.getPipePluginName().toUpperCase(),
SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(),
-
IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName().toUpperCase())));
+
IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName().toUpperCase(),
+
PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase())));
}