This is an automated email from the ASF dual-hosted git repository.
wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb-extras.git
The following commit(s) were added to refs/heads/master by this push:
new d859bb6 support nodeurls to multi ip (#84)
d859bb6 is described below
commit d859bb6d3de39f23f6a11a6a9a7e0d22b92ed9b5
Author: ChaoWang <[email protected]>
AuthorDate: Sat Jun 28 10:13:16 2025 +0800
support nodeurls to multi ip (#84)
support nodeurls to multi ip (#84)
---
.../src/main/java/org/apache/iotdb/flink/IoTDBSink.java | 11 +++++++++--
.../org/apache/iotdb/flink/options/IoTDBOptions.java | 16 ++++++++++++++++
.../org/apache/iotdb/flink/options/IoTDBSinkOptions.java | 9 +++++++++
.../org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java | 11 +++++++++++
4 files changed, 45 insertions(+), 2 deletions(-)
diff --git a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
index 5986e80..74b3ad0 100644
--- a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
+++ b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -77,8 +77,15 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
}
void initSession() {
- pool =
- new SessionPool(
+ if (options.getNodeUrls() != null) {
+ pool = new SessionPool(
+ options.getNodeUrls(),
+ options.getUser(),
+ options.getPassword(),
+ sessionPoolSize);
+ return;
+ }
+ pool = new SessionPool(
options.getHost(),
options.getPort(),
options.getUser(),
diff --git a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java
index f62d749..57c81f6 100644
--- a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java
+++ b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.flink.options;
import java.io.Serializable;
+import java.util.List;
public class IoTDBOptions implements Serializable {
@@ -26,6 +27,7 @@ public class IoTDBOptions implements Serializable {
protected int port;
protected String user;
protected String password;
+ protected List<String> nodeUrls;
public IoTDBOptions(String host, int port, String user, String password) {
this.host = host;
@@ -34,6 +36,12 @@ public class IoTDBOptions implements Serializable {
this.password = password;
}
+ public IoTDBOptions(List<String> nodeUrls, String user, String password) {
+ this.nodeUrls = nodeUrls;
+ this.user = user;
+ this.password = password;
+ }
+
public IoTDBOptions() {}
public String getHost() {
@@ -67,4 +75,12 @@ public class IoTDBOptions implements Serializable {
public void setPassword(String password) {
this.password = password;
}
+
+ public List<String> getNodeUrls() {
+ return nodeUrls;
+ }
+
+ public void setNodeUrls(List<String> nodeUrls) {
+ this.nodeUrls = nodeUrls;
+ }
}
diff --git a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java
index c2a6917..347eee9 100644
--- a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java
+++ b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java
@@ -42,6 +42,15 @@ public class IoTDBSinkOptions extends IoTDBOptions {
this.timeseriesOptionList = timeseriesOptionList;
}
+ public IoTDBSinkOptions(
+ List<String> nodeUrls,
+ String user,
+ String password,
+ List<TimeseriesOption> timeseriesOptionList) {
+ super(nodeUrls, user, password);
+ this.timeseriesOptionList = timeseriesOptionList;
+ }
+
public List<TimeseriesOption> getTimeseriesOptionList() {
return timeseriesOptionList;
}
diff --git a/connectors/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java b/connectors/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
index bd592d2..0b3f1de 100644
--- a/connectors/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
+++ b/connectors/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
@@ -25,10 +25,12 @@ import com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -113,4 +115,13 @@ public class IoTDBSinkBatchInsertTest {
any(List.class), any(List.class), any(List.class), any(List.class), any(List.class));
verify(pool).close();
}
+
+ @Test
+ public void testInitUrlNodes() {
+ List<String> nodeUrls = new ArrayList<>();
+ nodeUrls.add("127.0.0.1:6667");
+ nodeUrls.add("127.0.0.2:6667");
+ IoTDBSinkOptions sinkOptions = new IoTDBSinkOptions(nodeUrls, null, null, null);
+ assertEquals(sinkOptions.getNodeUrls(), nodeUrls);
+ }
}