This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e61a22  Add several functionalities to the BanyanDBClient (#49)
1e61a22 is described below

commit 1e61a2217f225a280bc7b28d5abd1f970210da0b
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sun Sep 24 23:40:33 2023 +0800

    Add several functionalities to the BanyanDBClient (#49)
---
 CHANGES.md                                         |  3 ++
 README.md                                          | 12 +++++-
 .../banyandb/v1/client/BanyanDBClient.java         | 38 ++++++++---------
 .../skywalking/banyandb/v1/client/Options.java     |  4 ++
 .../v1/client/grpc/channel/ChannelManager.java     | 31 ++++++--------
 .../client/grpc/channel/DefaultChannelFactory.java | 48 ++++++++++++++++++++--
 .../v1/client/AbstractBanyanDBClientTest.java      |  2 +-
 .../banyandb/v1/client/BanyanDBClientTestCI.java   |  2 +-
 8 files changed, 96 insertions(+), 44 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index dbe9ec8..f17ab8f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,9 @@ Release Notes.
 
 * Add mod revision check to write requests
 * Add TTL to property.
+* Support setting multiple server addresses 
+* Support DNS name resolution
+* Support round-robin load balancing
 
 0.4.0
 ------------------
diff --git a/README.md b/README.md
index ab78a18..955a4f3 100644
--- a/README.md
+++ b/README.md
@@ -14,15 +14,22 @@ The client implement for SkyWalking BanyanDB in Java.
 
 ## Create a client
 
-Create a `BanyanDBClient` with host, port and then use `connect()` to 
establish a connection.
+Create a `BanyanDBClient` with the server's several addresses and then use 
`connect()` to establish a connection.
 
 ```java
 // use `default` group
-client = new BanyanDBClient("127.0.0.1", 17912);
+BanyanDBClient client = new BanyanDBClient("banyandb.svc:17912", 
"10.0.12.9:17912");
 // to send any request, a connection to the server must be estabilished
 client.connect();
 ```
 
+These addresses are either IP addresses or DNS names. If DNS names are used, 
the client will resolve the DNS name to
+IP addresses and use them to connect to the server. The client will 
periodically refresh the IP addresses of the DNS
+name. The refresh interval can be configured by `resolveDNSInterval` option.
+
+The client will try to connect to the server in a round-robin manner. The 
client will periodically refresh the server
+addresses. The refresh interval can be configured by `refreshInterval` option.
+
 Besides, you may pass a customized options while building a `BanyanDBClient`. 
Supported
 options are listed below,
 
@@ -32,6 +39,7 @@ options are listed below,
 | maxInboundMessageSize      | Max inbound message size                        
                     | 1024 * 1024 * 50 (~50MB) |
 | deadline                   | Threshold of gRPC blocking query, unit is 
second                     | 30 (seconds)             |
 | refreshInterval            | Refresh interval for the gRPC channel, unit is 
second                | 30 (seconds)             |
+| resolveDNSInterval         | DNS resolve interval, unit is second            
                     | 30 (minutes)             |
 | forceReconnectionThreshold | Threshold of force gRPC reconnection if network 
issue is encountered | 1                        |
 | forceTLS                   | Force use TLS for gRPC                          
                     | false                    |
 | sslTrustCAPath             | SSL: Trusted CA Path                            
                     |                          |
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index 69a04f9..c1d4e3f 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -28,7 +28,6 @@ import io.grpc.stub.StreamObserver;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
 import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
 import org.apache.skywalking.banyandb.measure.v1.MeasureServiceGrpc;
@@ -59,7 +58,9 @@ import 
org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregationMetadata
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -83,14 +84,8 @@ import static 
com.google.common.base.Preconditions.checkState;
  */
 @Slf4j
 public class BanyanDBClient implements Closeable {
-    /**
-     * The hostname of BanyanDB server.
-     */
-    private final String host;
-    /**
-     * The port of BanyanDB server.
-     */
-    private final int port;
+
+    private final String[] targets;
     /**
      * Options for server connection.
      */
@@ -138,23 +133,24 @@ public class BanyanDBClient implements Closeable {
     /**
      * Create a BanyanDB client instance with a default options.
      *
-     * @param host IP or domain name
-     * @param port Server port
+     * @param targets server targets
      */
-    public BanyanDBClient(String host, int port) {
-        this(host, port, new Options());
+    public BanyanDBClient(String... targets) {
+        this(targets, new Options());
     }
 
     /**
      * Create a BanyanDB client instance with a customized options.
      *
-     * @param host    IP or domain name
-     * @param port    Server port
+     * @param targets server targets
      * @param options customized options
      */
-    public BanyanDBClient(String host, int port, Options options) {
-        this.host = host;
-        this.port = port;
+    public BanyanDBClient(String[] targets, Options options) {
+        String[] tt = Preconditions.checkNotNull(targets, "targets");
+        checkState(tt.length > 0, "targets' size must be more than 1");
+        tt = Arrays.stream(tt).filter(t -> 
!Strings.isNullOrEmpty(t)).toArray(size -> new String[size]);
+        checkState(tt.length > 0, "valid targets' size must be more than 1");
+        this.targets = tt;
         this.options = options;
         this.connectionEstablishLock = new ReentrantLock();
         this.metadataCache = new MetadataCache();
@@ -169,8 +165,12 @@ public class BanyanDBClient implements Closeable {
         connectionEstablishLock.lock();
         try {
             if (!isConnected) {
+                URI[] addresses = new URI[this.targets.length];
+                for (int i = 0; i < this.targets.length; i++) {
+                        addresses[i] = URI.create("//" + this.targets[i]);
+                }
                 this.channel = 
ChannelManager.create(this.options.buildChannelManagerSettings(),
-                        new DefaultChannelFactory(this.host, this.port, 
this.options));
+                        new DefaultChannelFactory(addresses, this.options));
                 streamServiceBlockingStub = 
StreamServiceGrpc.newBlockingStub(this.channel);
                 measureServiceBlockingStub = 
MeasureServiceGrpc.newBlockingStub(this.channel);
                 streamServiceStub = StreamServiceGrpc.newStub(this.channel);
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
index 109fd19..512ef2f 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
@@ -45,6 +45,10 @@ public class Options {
      * Threshold of force gRPC reconnection if network issue is encountered
      */
     private long forceReconnectionThreshold = 1;
+    /**
+     * Threshold of resolving the DNS
+     */
+    private long resolveDNSInterval = 30 * 60;
     /**
      * Force use TLS for gRPC
      * Default is false
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/ChannelManager.java
 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/ChannelManager.java
index 5a01994..dd780a1 100644
--- 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/ChannelManager.java
+++ 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/ChannelManager.java
@@ -29,9 +29,7 @@ import io.grpc.ForwardingClientCallListener;
 import io.grpc.ManagedChannel;
 import io.grpc.Metadata;
 import io.grpc.MethodDescriptor;
-import io.grpc.NameResolverRegistry;
 import io.grpc.Status;
-import io.grpc.internal.DnsNameResolverProvider;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
@@ -57,22 +55,18 @@ public class ChannelManager extends ManagedChannel {
     private final ScheduledExecutorService executor;
     @VisibleForTesting
     final AtomicReference<Entry> entryRef = new AtomicReference<>();
-    private final String authority;
 
     public static ChannelManager create(ChannelManagerSettings settings, 
ChannelFactory channelFactory)
             throws IOException {
         return new ChannelManager(settings, channelFactory, 
Executors.newSingleThreadScheduledExecutor());
     }
 
-    public ChannelManager(ChannelManagerSettings settings, ChannelFactory 
channelFactory, ScheduledExecutorService executor) throws IOException {
+    ChannelManager(ChannelManagerSettings settings, ChannelFactory 
channelFactory, ScheduledExecutorService executor) throws IOException {
         this.settings = settings;
         this.channelFactory = channelFactory;
         this.executor = executor;
 
-        NameResolverRegistry.getDefaultRegistry().register(new 
DnsNameResolverProvider());
-
         entryRef.set(new Entry(channelFactory.create()));
-        authority = entryRef.get().channel.authority();
 
         this.executor.scheduleAtFixedRate(
                 this::refreshSafely,
@@ -92,16 +86,17 @@ public class ChannelManager extends ManagedChannel {
 
     void refresh() throws IOException {
         Entry entry = entryRef.get();
-        if (entry.needReconnect) {
-            if (entry.isConnected(entry.reconnectCount.incrementAndGet() > 
this.settings.forceReconnectionThreshold())) {
-                // Reconnect to the same server is automatically done by GRPC
-                // clear the flags
-                entry.reset();
-            } else {
-                Entry replacedEntry = entryRef.getAndSet(new 
Entry(this.channelFactory.create()));
-                replacedEntry.shutdown();
-            }
+        if (!entry.needReconnect) {
+            return;
+        }
+        if (entry.isConnected(entry.reconnectCount.incrementAndGet() > 
this.settings.forceReconnectionThreshold())) {
+            // Reconnect to the same server is automatically done by GRPC
+            // clear the flags
+            entry.reset();
+            return;
         }
+        Entry replacedEntry = entryRef.getAndSet(new 
Entry(this.channelFactory.create()));
+        replacedEntry.shutdown();
     }
 
     @Override
@@ -157,7 +152,7 @@ public class ChannelManager extends ManagedChannel {
 
     @Override
     public String authority() {
-        return this.authority;
+        return this.entryRef.get().channel.authority();
     }
 
     @RequiredArgsConstructor
@@ -192,7 +187,7 @@ public class ChannelManager extends ManagedChannel {
 
         @Override
         public String authority() {
-            return authority;
+            return ChannelManager.this.authority();
         }
     }
 
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/DefaultChannelFactory.java
 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/DefaultChannelFactory.java
index d60392c..d246dda 100644
--- 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/DefaultChannelFactory.java
+++ 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/DefaultChannelFactory.java
@@ -24,6 +24,8 @@ import io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.NegotiationType;
 import io.grpc.netty.NettyChannelBuilder;
 import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.SocketUtils;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.banyandb.v1.client.Options;
@@ -33,17 +35,30 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.stream.Stream;
 
 @Slf4j
 @RequiredArgsConstructor
 public class DefaultChannelFactory implements ChannelFactory {
-    private final String host;
-    private final int port;
+    private final URI[] targets;
     private final Options options;
+    private SocketAddress[] addresses;
+    private long lastTargetsResolvedTime;
 
     @Override
     public ManagedChannel create() throws IOException {
-        NettyChannelBuilder managedChannelBuilder = 
NettyChannelBuilder.forAddress(this.host, this.port)
+        if (this.addresses == null ||
+                System.currentTimeMillis() - this.lastTargetsResolvedTime > 
this.options.getResolveDNSInterval()) {
+            resolveTargets();
+        }
+        NettyChannelBuilder managedChannelBuilder = 
NettyChannelBuilder.forAddress(resolveAddress())
                 .maxInboundMessageSize(options.getMaxInboundMessageSize())
                 .usePlaintext();
 
@@ -75,4 +90,31 @@ public class DefaultChannelFactory implements ChannelFactory 
{
         }
         return managedChannelBuilder.build();
     }
+
+    private void resolveTargets() {
+        this.addresses = Arrays.stream(this.targets)
+                .flatMap(target -> {
+                    try {
+                        return 
Arrays.stream(SocketUtils.allAddressesByName(target.getHost()))
+                                .map(InetAddress::getHostAddress)
+                                .map(ip -> new InetSocketAddress(ip, 
target.getPort()));
+                    } catch (Throwable t) {
+                        log.error("Failed to resolve the BanyanDB server's 
address ", t);
+                    }
+                    return Stream.empty();
+                })
+                .sorted(Comparator.comparing(InetSocketAddress::toString))
+                .distinct()
+                .toArray(InetSocketAddress[]::new);
+        this.lastTargetsResolvedTime = System.currentTimeMillis();
+    }
+
+    private SocketAddress resolveAddress() throws UnknownHostException {
+        int numAddresses = this.addresses.length;
+        if (numAddresses < 1) {
+            throw new UnknownHostException();
+        }
+        int offset = numAddresses == 1 ? 0 : 
PlatformDependent.threadLocalRandom().nextInt(numAddresses);
+        return this.addresses[offset];
+    }
 }
diff --git 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
index 328c0dc..0d9d2e4 100644
--- 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
+++ 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
@@ -354,7 +354,7 @@ public class AbstractBanyanDBClientTest {
         this.channel = grpcCleanup.register(
                 
InProcessChannelBuilder.forName(serverName).directExecutor().build());
 
-        client = new BanyanDBClient("127.0.0.1", s.getPort());
+        client = new BanyanDBClient(String.format("127.0.0.1:%d", 
s.getPort()));
         client.connect(this.channel);
     }
 
diff --git 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientTestCI.java
 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientTestCI.java
index e34c6c1..a6359f1 100644
--- 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientTestCI.java
+++ 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientTestCI.java
@@ -49,7 +49,7 @@ public class BanyanDBClientTestCI {
 
     protected void setUpConnection() throws IOException {
         log.info("create BanyanDB client and try to connect");
-        client = new BanyanDBClient(banyanDB.getHost(), 
banyanDB.getMappedPort(GRPC_PORT));
+        client = new BanyanDBClient(String.format("%s:%d", banyanDB.getHost(), 
banyanDB.getMappedPort(GRPC_PORT)));
         client.connect();
     }
 

Reply via email to