This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch pipe-rename-fallback
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-rename-fallback by this
push:
new 2d90b8e3367 Pipe: Make pipe-api compatible with the v1.2.x releases
2d90b8e3367 is described below
commit 2d90b8e33675ccd3ae53b09f97bfe66fca0411c8
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Nov 2 18:32:30 2023 +0800
Pipe: Make pipe-api compatible with the v1.2.x releases
---
.../parameter/PipeParameterValidator.java | 2 +-
.../api/customizer/parameter/PipeParameters.java | 41 ++++++++++++++++++----
.../persistence/pipe/PipePluginInfo.java | 2 +-
.../db/pipe/agent/plugin/PipePluginAgent.java | 2 +-
.../db/pipe/connector/protocol/IoTDBConnector.java | 11 +++---
.../protocol/legacy/IoTDBLegacyPipeConnector.java | 11 +++---
.../pipe/extractor/IoTDBDataRegionExtractor.java | 4 +--
.../PipeHistoricalDataRegionTsFileExtractor.java | 5 +--
8 files changed, 55 insertions(+), 23 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
index a70151a97fe..7d55b44f37a 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
@@ -61,7 +61,7 @@ public class PipeParameterValidator {
return this;
}
- final String actualValue = parameters.getString(key);
+ final String actualValue = parameters.getStringByKeys(key);
for (String optionalValue : optionalValues) {
if (actualValue.equals(optionalValue)) {
return this;
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 2d00ce717b8..c896825f342 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -61,7 +61,36 @@ public class PipeParameters {
return false;
}
- public String getString(String... keys) {
+ public String getString(String key) {
+ return attributes.get(key);
+ }
+
+ public Boolean getBoolean(String key) {
+ String value = attributes.get(key);
+ return value == null ? null : Boolean.parseBoolean(value);
+ }
+
+ public Integer getInt(String key) {
+ String value = attributes.get(key);
+ return value == null ? null : Integer.parseInt(value);
+ }
+
+ public Long getLong(String key) {
+ String value = attributes.get(key);
+ return value == null ? null : Long.parseLong(value);
+ }
+
+ public Float getFloat(String key) {
+ String value = attributes.get(key);
+ return value == null ? null : Float.parseFloat(value);
+ }
+
+ public Double getDouble(String key) {
+ String value = attributes.get(key);
+ return value == null ? null : Double.parseDouble(value);
+ }
+
+ public String getStringByKeys(String... keys) {
for (final String key : keys) {
final String value = attributes.get(key);
if (value != null) {
@@ -71,7 +100,7 @@ public class PipeParameters {
return null;
}
- public Boolean getBoolean(String... keys) {
+ public Boolean getBooleanByKeys(String... keys) {
for (final String key : keys) {
final String value = attributes.get(key);
if (value != null) {
@@ -81,7 +110,7 @@ public class PipeParameters {
return null;
}
- public Integer getInt(String... keys) {
+ public Integer getIntByKeys(String... keys) {
for (final String key : keys) {
final String value = attributes.get(key);
if (value != null) {
@@ -91,7 +120,7 @@ public class PipeParameters {
return null;
}
- public Long getLong(String... keys) {
+ public Long getLongByKeys(String... keys) {
for (final String key : keys) {
final String value = attributes.get(key);
if (value != null) {
@@ -101,7 +130,7 @@ public class PipeParameters {
return null;
}
- public Float getFloat(String... keys) {
+ public Float getFloatByKeys(String... keys) {
for (final String key : keys) {
final String value = attributes.get(key);
if (value != null) {
@@ -111,7 +140,7 @@ public class PipeParameters {
return null;
}
- public Double getDouble(String... keys) {
+ public Double getDoubleByKeys(String... keys) {
for (final String key : keys) {
final String value = attributes.get(key);
if (value != null) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 85ab9f69adc..e4cde84ae6b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -161,7 +161,7 @@ public class PipePluginInfo implements SnapshotProcessor {
throw new PipeException(exceptionMessage);
}
final String connectorPluginName =
- connectorParameters.getString(
+ connectorParameters.getStringByKeys(
PipeConnectorConstant.CONNECTOR_KEY,
PipeConnectorConstant.SINK_KEY);
if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
final String exceptionMessage =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index a7edab29de8..f3445c7a107 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -226,7 +226,7 @@ public class PipePluginAgent {
}
return (PipeConnector)
reflect(
- connectorParameters.getString(
+ connectorParameters.getStringByKeys(
PipeConnectorConstant.CONNECTOR_KEY,
PipeConnectorConstant.SINK_KEY));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
index 5f295b550ff..11612db8996 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
@@ -99,27 +99,28 @@ public abstract class IoTDBConnector implements
PipeConnector {
&& parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY)) {
givenNodeUrls.add(
new TEndPoint(
- parameters.getString(CONNECTOR_IOTDB_IP_KEY),
- parameters.getInt(CONNECTOR_IOTDB_PORT_KEY)));
+ parameters.getStringByKeys(CONNECTOR_IOTDB_IP_KEY),
+ parameters.getIntByKeys(CONNECTOR_IOTDB_PORT_KEY)));
}
if (parameters.hasAttribute(SINK_IOTDB_IP_KEY)
&& parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) {
givenNodeUrls.add(
new TEndPoint(
- parameters.getString(SINK_IOTDB_IP_KEY), parameters.getInt(SINK_IOTDB_PORT_KEY)));
+ parameters.getStringByKeys(SINK_IOTDB_IP_KEY),
+ parameters.getIntByKeys(SINK_IOTDB_PORT_KEY)));
}
if (parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY)) {
givenNodeUrls.addAll(
SessionUtils.parseSeedNodeUrls(
- Arrays.asList(parameters.getString(CONNECTOR_IOTDB_NODE_URLS_KEY).split(","))));
+ Arrays.asList(parameters.getStringByKeys(CONNECTOR_IOTDB_NODE_URLS_KEY).split(","))));
}
if (parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY)) {
givenNodeUrls.addAll(
SessionUtils.parseSeedNodeUrls(
- Arrays.asList(parameters.getString(SINK_IOTDB_NODE_URLS_KEY).split(","))));
+ Arrays.asList(parameters.getStringByKeys(SINK_IOTDB_NODE_URLS_KEY).split(","))));
}
return givenNodeUrls;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index fabf87f4bdf..6fd50881e17 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -140,15 +140,16 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
&& parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY)) {
givenNodeUrls.add(
new TEndPoint(
- parameters.getString(CONNECTOR_IOTDB_IP_KEY),
- parameters.getInt(CONNECTOR_IOTDB_PORT_KEY)));
+ parameters.getStringByKeys(CONNECTOR_IOTDB_IP_KEY),
+ parameters.getIntByKeys(CONNECTOR_IOTDB_PORT_KEY)));
}
if (parameters.hasAttribute(SINK_IOTDB_IP_KEY)
&& parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) {
givenNodeUrls.add(
new TEndPoint(
- parameters.getString(SINK_IOTDB_IP_KEY), parameters.getInt(SINK_IOTDB_PORT_KEY)));
+ parameters.getStringByKeys(SINK_IOTDB_IP_KEY),
+ parameters.getIntByKeys(SINK_IOTDB_PORT_KEY)));
}
return givenNodeUrls;
@@ -157,8 +158,8 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
@Override
public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
throws Exception {
- ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY, SINK_IOTDB_IP_KEY);
- port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY, SINK_IOTDB_PORT_KEY);
+ ipAddress = parameters.getStringByKeys(CONNECTOR_IOTDB_IP_KEY, SINK_IOTDB_IP_KEY);
+ port = parameters.getIntByKeys(CONNECTOR_IOTDB_PORT_KEY, SINK_IOTDB_PORT_KEY);
user =
parameters.getStringOrDefault(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index d113b4b984f..15f6696118d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -154,7 +154,7 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
return;
}
- switch (parameters.getString(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
+ switch (parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
case EXTRACTOR_REALTIME_MODE_FILE_VALUE:
case EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE:
realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
@@ -172,7 +172,7 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(
"Unsupported extractor realtime mode: {}, create a hybrid extractor.",
- parameters.getString(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY));
+ parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY));
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 009a9f510c9..b1d5037078a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -132,7 +132,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
&& parameters.hasAnyAttributes(
EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY)
? DateTimeUtils.convertDatetimeStrToLong(
- parameters.getString(
+ parameters.getStringByKeys(
EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY),
ZoneId.systemDefault())
: Long.MIN_VALUE;
@@ -141,7 +141,8 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
&& parameters.hasAnyAttributes(
EXTRACTOR_HISTORY_END_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY)
? DateTimeUtils.convertDatetimeStrToLong(
- parameters.getString(EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY),
+ parameters.getStringByKeys(
+ EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY),
ZoneId.systemDefault())
: Long.MAX_VALUE;