This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-rename-fallback
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-rename-fallback by this push:
     new 2d90b8e3367 Pipe: Make pipe-api compatible with the v1.2.x releases
2d90b8e3367 is described below

commit 2d90b8e33675ccd3ae53b09f97bfe66fca0411c8
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Nov 2 18:32:30 2023 +0800

    Pipe: Make pipe-api compatible with the v1.2.x releases
---
 .../parameter/PipeParameterValidator.java          |  2 +-
 .../api/customizer/parameter/PipeParameters.java   | 41 ++++++++++++++++++----
 .../persistence/pipe/PipePluginInfo.java           |  2 +-
 .../db/pipe/agent/plugin/PipePluginAgent.java      |  2 +-
 .../db/pipe/connector/protocol/IoTDBConnector.java | 11 +++---
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  | 11 +++---
 .../pipe/extractor/IoTDBDataRegionExtractor.java   |  4 +--
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  5 +--
 8 files changed, 55 insertions(+), 23 deletions(-)

diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
index a70151a97fe..7d55b44f37a 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
@@ -61,7 +61,7 @@ public class PipeParameterValidator {
       return this;
     }
 
-    final String actualValue = parameters.getString(key);
+    final String actualValue = parameters.getStringByKeys(key);
     for (String optionalValue : optionalValues) {
       if (actualValue.equals(optionalValue)) {
         return this;
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 2d00ce717b8..c896825f342 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -61,7 +61,36 @@ public class PipeParameters {
     return false;
   }
 
-  public String getString(String... keys) {
+  public String getString(String key) {
+    return attributes.get(key);
+  }
+
+  public Boolean getBoolean(String key) {
+    String value = attributes.get(key);
+    return value == null ? null : Boolean.parseBoolean(value);
+  }
+
+  public Integer getInt(String key) {
+    String value = attributes.get(key);
+    return value == null ? null : Integer.parseInt(value);
+  }
+
+  public Long getLong(String key) {
+    String value = attributes.get(key);
+    return value == null ? null : Long.parseLong(value);
+  }
+
+  public Float getFloat(String key) {
+    String value = attributes.get(key);
+    return value == null ? null : Float.parseFloat(value);
+  }
+
+  public Double getDouble(String key) {
+    String value = attributes.get(key);
+    return value == null ? null : Double.parseDouble(value);
+  }
+
+  public String getStringByKeys(String... keys) {
     for (final String key : keys) {
       final String value = attributes.get(key);
       if (value != null) {
@@ -71,7 +100,7 @@ public class PipeParameters {
     return null;
   }
 
-  public Boolean getBoolean(String... keys) {
+  public Boolean getBooleanByKeys(String... keys) {
     for (final String key : keys) {
       final String value = attributes.get(key);
       if (value != null) {
@@ -81,7 +110,7 @@ public class PipeParameters {
     return null;
   }
 
-  public Integer getInt(String... keys) {
+  public Integer getIntByKeys(String... keys) {
     for (final String key : keys) {
       final String value = attributes.get(key);
       if (value != null) {
@@ -91,7 +120,7 @@ public class PipeParameters {
     return null;
   }
 
-  public Long getLong(String... keys) {
+  public Long getLongByKeys(String... keys) {
     for (final String key : keys) {
       final String value = attributes.get(key);
       if (value != null) {
@@ -101,7 +130,7 @@ public class PipeParameters {
     return null;
   }
 
-  public Float getFloat(String... keys) {
+  public Float getFloatByKeys(String... keys) {
     for (final String key : keys) {
       final String value = attributes.get(key);
       if (value != null) {
@@ -111,7 +140,7 @@ public class PipeParameters {
     return null;
   }
 
-  public Double getDouble(String... keys) {
+  public Double getDoubleByKeys(String... keys) {
     for (final String key : keys) {
       final String value = attributes.get(key);
       if (value != null) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 85ab9f69adc..e4cde84ae6b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -161,7 +161,7 @@ public class PipePluginInfo implements SnapshotProcessor {
       throw new PipeException(exceptionMessage);
     }
     final String connectorPluginName =
-        connectorParameters.getString(
+        connectorParameters.getStringByKeys(
             PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY);
     if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
       final String exceptionMessage =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index a7edab29de8..f3445c7a107 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -226,7 +226,7 @@ public class PipePluginAgent {
     }
     return (PipeConnector)
         reflect(
-            connectorParameters.getString(
+            connectorParameters.getStringByKeys(
                 PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY));
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
index 5f295b550ff..11612db8996 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
@@ -99,27 +99,28 @@ public abstract class IoTDBConnector implements PipeConnector {
         && parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY)) {
       givenNodeUrls.add(
           new TEndPoint(
-              parameters.getString(CONNECTOR_IOTDB_IP_KEY),
-              parameters.getInt(CONNECTOR_IOTDB_PORT_KEY)));
+              parameters.getStringByKeys(CONNECTOR_IOTDB_IP_KEY),
+              parameters.getIntByKeys(CONNECTOR_IOTDB_PORT_KEY)));
     }
 
     if (parameters.hasAttribute(SINK_IOTDB_IP_KEY)
         && parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) {
       givenNodeUrls.add(
           new TEndPoint(
-              parameters.getString(SINK_IOTDB_IP_KEY), parameters.getInt(SINK_IOTDB_PORT_KEY)));
+              parameters.getStringByKeys(SINK_IOTDB_IP_KEY),
+              parameters.getIntByKeys(SINK_IOTDB_PORT_KEY)));
     }
 
     if (parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY)) {
       givenNodeUrls.addAll(
           SessionUtils.parseSeedNodeUrls(
-              Arrays.asList(parameters.getString(CONNECTOR_IOTDB_NODE_URLS_KEY).split(","))));
+              Arrays.asList(parameters.getStringByKeys(CONNECTOR_IOTDB_NODE_URLS_KEY).split(","))));
     }
 
     if (parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY)) {
       givenNodeUrls.addAll(
           SessionUtils.parseSeedNodeUrls(
-              Arrays.asList(parameters.getString(SINK_IOTDB_NODE_URLS_KEY).split(","))));
+              Arrays.asList(parameters.getStringByKeys(SINK_IOTDB_NODE_URLS_KEY).split(","))));
     }
 
     return givenNodeUrls;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index fabf87f4bdf..6fd50881e17 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -140,15 +140,16 @@ public class IoTDBLegacyPipeConnector implements PipeConnector {
         && parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY)) {
       givenNodeUrls.add(
           new TEndPoint(
-              parameters.getString(CONNECTOR_IOTDB_IP_KEY),
-              parameters.getInt(CONNECTOR_IOTDB_PORT_KEY)));
+              parameters.getStringByKeys(CONNECTOR_IOTDB_IP_KEY),
+              parameters.getIntByKeys(CONNECTOR_IOTDB_PORT_KEY)));
     }
 
     if (parameters.hasAttribute(SINK_IOTDB_IP_KEY)
         && parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) {
       givenNodeUrls.add(
           new TEndPoint(
-              parameters.getString(SINK_IOTDB_IP_KEY), parameters.getInt(SINK_IOTDB_PORT_KEY)));
+              parameters.getStringByKeys(SINK_IOTDB_IP_KEY),
+              parameters.getIntByKeys(SINK_IOTDB_PORT_KEY)));
     }
 
     return givenNodeUrls;
@@ -157,8 +158,8 @@ public class IoTDBLegacyPipeConnector implements PipeConnector {
   @Override
   public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
       throws Exception {
-    ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY, SINK_IOTDB_IP_KEY);
-    port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY, SINK_IOTDB_PORT_KEY);
+    ipAddress = parameters.getStringByKeys(CONNECTOR_IOTDB_IP_KEY, SINK_IOTDB_IP_KEY);
+    port = parameters.getIntByKeys(CONNECTOR_IOTDB_PORT_KEY, SINK_IOTDB_PORT_KEY);
 
     user =
         parameters.getStringOrDefault(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index d113b4b984f..15f6696118d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -154,7 +154,7 @@ public class IoTDBDataRegionExtractor implements PipeExtractor {
       return;
     }
 
-    switch (parameters.getString(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
+    switch (parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
       case EXTRACTOR_REALTIME_MODE_FILE_VALUE:
       case EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE:
         realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
@@ -172,7 +172,7 @@ public class IoTDBDataRegionExtractor implements PipeExtractor {
         if (LOGGER.isWarnEnabled()) {
           LOGGER.warn(
               "Unsupported extractor realtime mode: {}, create a hybrid extractor.",
-              parameters.getString(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY));
+              parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY));
         }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 009a9f510c9..b1d5037078a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -132,7 +132,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
                 && parameters.hasAnyAttributes(
                     EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY)
             ? DateTimeUtils.convertDatetimeStrToLong(
-                parameters.getString(
+                parameters.getStringByKeys(
                     EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY),
                 ZoneId.systemDefault())
             : Long.MIN_VALUE;
@@ -141,7 +141,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
                 && parameters.hasAnyAttributes(
                     EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY)
             ? DateTimeUtils.convertDatetimeStrToLong(
-                parameters.getString(EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY),
+                parameters.getStringByKeys(
+                    EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY),
                 ZoneId.systemDefault())
             : Long.MAX_VALUE;
 

Reply via email to