This is an automated email from the ASF dual-hosted git repository.

wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb-extras.git


The following commit(s) were added to refs/heads/master by this push:
     new d859bb6  support nodeurls to multi ip (#84)
d859bb6 is described below

commit d859bb6d3de39f23f6a11a6a9a7e0d22b92ed9b5
Author: ChaoWang <[email protected]>
AuthorDate: Sat Jun 28 10:13:16 2025 +0800

    support nodeurls to multi ip (#84)
    
    support nodeurls to multi ip (#84)
---
 .../src/main/java/org/apache/iotdb/flink/IoTDBSink.java  | 11 +++++++++--
 .../org/apache/iotdb/flink/options/IoTDBOptions.java     | 16 ++++++++++++++++
 .../org/apache/iotdb/flink/options/IoTDBSinkOptions.java |  9 +++++++++
 .../org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java | 11 +++++++++++
 4 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
index 5986e80..74b3ad0 100644
--- a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
+++ b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -77,8 +77,15 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
   }
 
   void initSession() {
-    pool =
-        new SessionPool(
+    if (options.getNodeUrls() != null) {
+      pool = new SessionPool(
+              options.getNodeUrls(),
+              options.getUser(),
+              options.getPassword(),
+              sessionPoolSize);
+      return;
+    }
+    pool = new SessionPool(
             options.getHost(),
             options.getPort(),
             options.getUser(),
diff --git a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java
index f62d749..57c81f6 100644
--- a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java
+++ b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.flink.options;
 
 import java.io.Serializable;
+import java.util.List;
 
 public class IoTDBOptions implements Serializable {
 
@@ -26,6 +27,7 @@ public class IoTDBOptions implements Serializable {
   protected int port;
   protected String user;
   protected String password;
+  protected List<String> nodeUrls;
 
   public IoTDBOptions(String host, int port, String user, String password) {
     this.host = host;
@@ -34,6 +36,12 @@ public class IoTDBOptions implements Serializable {
     this.password = password;
   }
 
+  public IoTDBOptions(List<String> nodeUrls, String user, String password) {
+    this.nodeUrls = nodeUrls;
+    this.user = user;
+    this.password = password;
+  }
+
   public IoTDBOptions() {}
 
   public String getHost() {
@@ -67,4 +75,12 @@ public class IoTDBOptions implements Serializable {
   public void setPassword(String password) {
     this.password = password;
   }
+
+  public List<String> getNodeUrls() {
+    return nodeUrls;
+  }
+
+  public void setNodeUrls(List<String> nodeUrls) {
+    this.nodeUrls = nodeUrls;
+  }
 }
diff --git a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java
index c2a6917..347eee9 100644
--- a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java
+++ b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java
@@ -42,6 +42,15 @@ public class IoTDBSinkOptions extends IoTDBOptions {
     this.timeseriesOptionList = timeseriesOptionList;
   }
 
+  public IoTDBSinkOptions(
+          List<String> nodeUrls,
+          String user,
+          String password,
+          List<TimeseriesOption> timeseriesOptionList) {
+    super(nodeUrls, user, password);
+    this.timeseriesOptionList = timeseriesOptionList;
+  }
+
   public List<TimeseriesOption> getTimeseriesOptionList() {
     return timeseriesOptionList;
   }
diff --git a/connectors/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java b/connectors/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
index bd592d2..0b3f1de 100644
--- a/connectors/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
+++ b/connectors/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
@@ -25,10 +25,12 @@ import com.google.common.collect.Lists;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -113,4 +115,13 @@ public class IoTDBSinkBatchInsertTest {
             any(List.class), any(List.class), any(List.class), any(List.class), any(List.class));
     verify(pool).close();
   }
+
+  @Test
+  public void testInitUrlNodes() {
+    List<String> nodeUrls = new ArrayList<>();
+    nodeUrls.add("127.0.0.1:6667");
+    nodeUrls.add("127.0.0.2:6667");
+    IoTDBSinkOptions sinkOptions = new IoTDBSinkOptions(nodeUrls, null, null, null);
+    assertEquals(sinkOptions.getNodeUrls(), nodeUrls);
+  }
 }

Reply via email to