This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fe80a48e2c Fix pipe consensus compatibility during rolling upgrade (#17428)
9fe80a48e2c is described below

commit 9fe80a48e2cab8419ef62e2df71d9403eeef2542
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Apr 3 10:48:36 2026 +0800

    Fix pipe consensus compatibility during rolling upgrade (#17428)
    
    * Fix pipe consensus compatibility during rolling upgrade
    
    * Apply spotless formatting
    
    * Fix pipe consensus upgrade compatibility
---
 .../confignode/conf/SystemPropertiesUtils.java     | 35 ++++++++++++++++++----
 .../apache/iotdb/consensus/ConsensusFactory.java   | 19 +++++++++++-
 .../org/apache/iotdb/db/conf/IoTDBStartCheck.java  | 24 +++++++++++++--
 .../PipeDataRegionProcessorConstructor.java        |  4 +++
 .../dataregion/PipeDataRegionSinkConstructor.java  |  8 +++++
 .../agent/plugin/PipeDataNodePluginAgentTest.java  | 30 +++++++++++++++++++
 .../agent/plugin/builtin/BuiltinPipePlugin.java    | 11 ++++++-
 7 files changed, 122 insertions(+), 9 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
index 529f15d06cd..a3ee036f6ab 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.file.SystemPropertiesHandler;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.consensus.ConsensusFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -124,10 +125,15 @@ public class SystemPropertiesUtils {
       }
     }
 
+    // Only the data region protocol could have been persisted as the old PipeConsensus name
+    // during a jar-only upgrade, so only that field needs compatibility normalization.
     // Consensus protocol configuration
+    boolean needRewriteConsensusProtocol = false;
+
     String configNodeConsensusProtocolClass =
         systemProperties.getProperty(CN_CONSENSUS_PROTOCOL, null);
-    if (!configNodeConsensusProtocolClass.equals(conf.getConfigNodeConsensusProtocolClass())) {
+    if (!Objects.equals(
+        configNodeConsensusProtocolClass, conf.getConfigNodeConsensusProtocolClass())) {
       LOGGER.warn(
           format,
           CN_CONSENSUS_PROTOCOL,
@@ -136,9 +142,22 @@ public class SystemPropertiesUtils {
       conf.setConfigNodeConsensusProtocolClass(configNodeConsensusProtocolClass);
     }
 
-    String dataRegionConsensusProtocolClass =
+    String persistedDataRegionConsensusProtocolClass =
         systemProperties.getProperty(DATA_CONSENSUS_PROTOCOL, null);
-    if (!dataRegionConsensusProtocolClass.equals(conf.getDataRegionConsensusProtocolClass())) {
+    String dataRegionConsensusProtocolClass =
+        ConsensusFactory.normalizeConsensusProtocolClass(persistedDataRegionConsensusProtocolClass);
+    if (!Objects.equals(
+        persistedDataRegionConsensusProtocolClass, dataRegionConsensusProtocolClass)) {
+      systemProperties.setProperty(DATA_CONSENSUS_PROTOCOL, dataRegionConsensusProtocolClass);
+      needRewriteConsensusProtocol = true;
+      LOGGER.warn(
+          "[SystemProperties] Normalize {} from {} to {} for compatibility.",
+          DATA_CONSENSUS_PROTOCOL,
+          persistedDataRegionConsensusProtocolClass,
+          dataRegionConsensusProtocolClass);
+    }
+    if (!Objects.equals(
+        dataRegionConsensusProtocolClass, conf.getDataRegionConsensusProtocolClass())) {
       LOGGER.warn(
           format,
           DATA_CONSENSUS_PROTOCOL,
@@ -149,7 +168,8 @@ public class SystemPropertiesUtils {
 
     String schemaRegionConsensusProtocolClass =
         systemProperties.getProperty(SCHEMA_CONSENSUS_PROTOCOL, null);
-    if (!schemaRegionConsensusProtocolClass.equals(conf.getSchemaRegionConsensusProtocolClass())) {
+    if (!Objects.equals(
+        schemaRegionConsensusProtocolClass, conf.getSchemaRegionConsensusProtocolClass())) {
       LOGGER.warn(
           format,
           SCHEMA_CONSENSUS_PROTOCOL,
@@ -157,6 +177,9 @@ public class SystemPropertiesUtils {
           schemaRegionConsensusProtocolClass);
       conf.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass);
     }
+    if (needRewriteConsensusProtocol) {
+      systemPropertiesHandler.overwrite(systemProperties);
+    }
 
     // PartitionSlot configuration
     if (systemProperties.getProperty(SERIES_PARTITION_SLOT_NUM, null) != null) {
@@ -265,7 +288,9 @@ public class SystemPropertiesUtils {
     // Consensus protocol configuration
     systemProperties.setProperty(CN_CONSENSUS_PROTOCOL, conf.getConfigNodeConsensusProtocolClass());
     systemProperties.setProperty(
-        DATA_CONSENSUS_PROTOCOL, conf.getDataRegionConsensusProtocolClass());
+        DATA_CONSENSUS_PROTOCOL,
+        ConsensusFactory.normalizeConsensusProtocolClass(
+            conf.getDataRegionConsensusProtocolClass()));
     systemProperties.setProperty(
         SCHEMA_CONSENSUS_PROTOCOL, conf.getSchemaRegionConsensusProtocolClass());
 
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index 896a877e3e7..d24955aba73 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -37,6 +37,10 @@ public class ConsensusFactory {
   public static final String SIMPLE_CONSENSUS = "org.apache.iotdb.consensus.simple.SimpleConsensus";
   public static final String RATIS_CONSENSUS = "org.apache.iotdb.consensus.ratis.RatisConsensus";
   public static final String IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTConsensus";
+  // Keep the pre-rename class name for stale system properties / snapshots restored after a
+  // jar-only upgrade.
+  public static final String LEGACY_IOT_CONSENSUS_V2 =
+      "org.apache.iotdb.consensus.pipe.PipeConsensus";
   public static final String REAL_IOT_CONSENSUS_V2 =
       "org.apache.iotdb.consensus.pipe.IoTConsensusV2";
   public static final String IOT_CONSENSUS_V2 = "org.apache.iotdb.consensus.iot.IoTConsensusV2";
@@ -49,11 +53,24 @@ public class ConsensusFactory {
     throw new IllegalStateException("Utility class ConsensusFactory");
   }
 
+  // Downstream code compares against IOT_CONSENSUS_V2 directly, so persisted legacy names must be
+  // normalized to the canonical constant before they fan out.
+  public static String normalizeConsensusProtocolClass(String className) {
+    if (className == null) {
+      return null;
+    }
+    if (LEGACY_IOT_CONSENSUS_V2.equals(className) || REAL_IOT_CONSENSUS_V2.equals(className)) {
+      return IOT_CONSENSUS_V2;
+    }
+    return className;
+  }
+
   public static Optional<IConsensus> getConsensusImpl(
       String className, ConsensusConfig config, IStateMachine.Registry registry) {
     try {
+      className = normalizeConsensusProtocolClass(className);
       // special judge for IoTConsensusV2
-      if (className.equals(IOT_CONSENSUS_V2)) {
+      if (IOT_CONSENSUS_V2.equals(className)) {
         className = REAL_IOT_CONSENSUS_V2;
         // initialize iotConsensusV2's thrift component
         IoTV2GlobalComponentContainer.build();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
index e2aa1950af6..d85444a56d4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
@@ -282,13 +282,33 @@ public class IoTDBStartCheck {
     if (properties.containsKey(CLUSTER_ID)) {
       config.setClusterId(properties.getProperty(CLUSTER_ID));
     }
+    // Only the data region protocol could have been persisted as the old PipeConsensus name
+    // during a jar-only upgrade, so only that field needs compatibility normalization.
+    boolean needRewriteConsensusProtocol = false;
     if (properties.containsKey(SCHEMA_REGION_CONSENSUS_PROTOCOL)) {
       config.setSchemaRegionConsensusProtocolClass(
           properties.getProperty(SCHEMA_REGION_CONSENSUS_PROTOCOL));
     }
     if (properties.containsKey(DATA_REGION_CONSENSUS_PROTOCOL)) {
-      config.setDataRegionConsensusProtocolClass(
-          properties.getProperty(DATA_REGION_CONSENSUS_PROTOCOL));
+      final String persistedDataRegionConsensusProtocolClass =
+          properties.getProperty(DATA_REGION_CONSENSUS_PROTOCOL);
+      final String dataRegionConsensusProtocolClass =
+          ConsensusFactory.normalizeConsensusProtocolClass(
+              persistedDataRegionConsensusProtocolClass);
+      if (!Objects.equals(
+          persistedDataRegionConsensusProtocolClass, dataRegionConsensusProtocolClass)) {
+        properties.setProperty(DATA_REGION_CONSENSUS_PROTOCOL, dataRegionConsensusProtocolClass);
+        needRewriteConsensusProtocol = true;
+        logger.warn(
+            "[SystemProperties] Normalize {} from {} to {} for compatibility.",
+            DATA_REGION_CONSENSUS_PROTOCOL,
+            persistedDataRegionConsensusProtocolClass,
+            dataRegionConsensusProtocolClass);
+      }
+      config.setDataRegionConsensusProtocolClass(dataRegionConsensusProtocolClass);
+    }
+    if (needRewriteConsensusProtocol) {
+      systemPropertiesHandler.overwrite(properties);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index 5bd2ef17509..a6160e6b900 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -69,6 +69,10 @@ class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
     pluginConstructors.put(
         BuiltinPipePlugin.IOT_CONSENSUS_V2_PROCESSOR.getPipePluginName(),
         IoTConsensusV2Processor::new);
+    // Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta.
+    pluginConstructors.put(
+        BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(),
+        IoTConsensusV2Processor::new);
     pluginConstructors.put(
         BuiltinPipePlugin.RENAME_DATABASE_PROCESSOR.getPipePluginName(),
         RenameDatabaseProcessor::new);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
index cbd77ebc902..c8e87890afd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
@@ -56,6 +56,10 @@ class PipeDataRegionSinkConstructor extends PipeSinkConstructor {
     pluginConstructors.put(
         BuiltinPipePlugin.IOT_CONSENSUS_V2_ASYNC_CONNECTOR.getPipePluginName(),
         IoTConsensusV2AsyncSink::new);
+    // Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta.
+    pluginConstructors.put(
+        BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName(),
+        IoTConsensusV2AsyncSink::new);
     pluginConstructors.put(
         BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
         IoTDBLegacyPipeSink::new);
@@ -97,5 +101,9 @@ class PipeDataRegionSinkConstructor extends PipeSinkConstructor {
     pluginConstructors.put(
         BuiltinPipePlugin.IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName(),
         IoTConsensusV2AsyncSink::new);
+    // Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta.
+    pluginConstructors.put(
+        BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName(),
+        IoTConsensusV2AsyncSink::new);
   }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
index 5d4d6a640b6..b0f0b34e92a 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableMa
 import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
+import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink;
 import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
 import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -118,6 +120,20 @@ public class PipeDataNodePluginAgentTest {
                       }
                     }))
             .getClass());
+    Assert.assertEquals(
+        IoTConsensusV2Processor.class,
+        agent
+            .dataRegion()
+            .reflectProcessor(
+                new PipeParameters(
+                    new HashMap<String, String>() {
+                      {
+                        put(
+                            PipeProcessorConstant.PROCESSOR_KEY,
+                            BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName());
+                      }
+                    }))
+            .getClass());
     Assert.assertEquals(
         IoTDBDataRegionAsyncSink.class,
         agent
@@ -132,5 +148,19 @@ public class PipeDataNodePluginAgentTest {
                       }
                     }))
             .getClass());
+    Assert.assertEquals(
+        IoTConsensusV2AsyncSink.class,
+        agent
+            .dataRegion()
+            .reflectSink(
+                new PipeParameters(
+                    new HashMap<String, String>() {
+                      {
+                        put(
+                            PipeSinkConstant.CONNECTOR_KEY,
+                            BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName());
+                      }
+                    }))
+            .getClass());
   }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
index e95ddb61e1a..76b45d1e798 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
@@ -74,6 +74,8 @@ public enum BuiltinPipePlugin {
   STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class),
   TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", TumblingWindowingProcessor.class),
   IOT_CONSENSUS_V2_PROCESSOR("iot-consensus-v2-processor", IoTConsensusV2Processor.class),
+  // Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename.
+  PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", IoTConsensusV2Processor.class),
   RENAME_DATABASE_PROCESSOR("rename-database-processor", RenameDatabaseProcessor.class),
 
   // connectors
@@ -86,6 +88,8 @@ public enum BuiltinPipePlugin {
   IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapSink.class),
   IOT_CONSENSUS_V2_ASYNC_CONNECTOR(
       "iot-consensus-v2-async-connector", IoTConsensusV2AsyncSink.class),
+  // Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename.
+  PIPE_CONSENSUS_ASYNC_CONNECTOR("pipe-consensus-async-connector", IoTConsensusV2AsyncSink.class),
 
   WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class),
   OPC_UA_CONNECTOR("opc-ua-connector", OpcUaSink.class),
@@ -105,6 +109,8 @@ public enum BuiltinPipePlugin {
   WRITE_BACK_SINK("write-back-sink", WriteBackSink.class),
   SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class),
   IOT_CONSENSUS_V2_ASYNC_SINK("iot-consensus-v2-async-sink", IoTConsensusV2AsyncSink.class),
+  // Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename.
+  PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", IoTConsensusV2AsyncSink.class),
   ;
 
   private final String pipePluginName;
@@ -158,6 +164,7 @@ public enum BuiltinPipePlugin {
                   STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(),
                   TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(),
                   IOT_CONSENSUS_V2_PROCESSOR.getPipePluginName().toUpperCase(),
+                  PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(),
                   RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(),
                   // Connectors
                   DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
@@ -172,6 +179,7 @@ public enum BuiltinPipePlugin {
                   OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(),
                   WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(),
                   IOT_CONSENSUS_V2_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
+                  PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
                   // Sinks
                   IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(),
                   IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(),
@@ -180,5 +188,6 @@ public enum BuiltinPipePlugin {
                   OPC_UA_SINK.getPipePluginName().toUpperCase(),
                   OPC_DA_SINK.getPipePluginName().toUpperCase(),
                   SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(),
-                  IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName().toUpperCase())));
+                  IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName().toUpperCase(),
+                  PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase())));
 }

Reply via email to