This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git
The following commit(s) were added to refs/heads/main by this push: new 1e61a22 Add several functionalities to the BanyanDBClient (#49) 1e61a22 is described below commit 1e61a2217f225a280bc7b28d5abd1f970210da0b Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sun Sep 24 23:40:33 2023 +0800 Add several functionalities to the BanyanDBClient (#49) --- CHANGES.md | 3 ++ README.md | 12 +++++- .../banyandb/v1/client/BanyanDBClient.java | 38 ++++++++--------- .../skywalking/banyandb/v1/client/Options.java | 4 ++ .../v1/client/grpc/channel/ChannelManager.java | 31 ++++++-------- .../client/grpc/channel/DefaultChannelFactory.java | 48 ++++++++++++++++++++-- .../v1/client/AbstractBanyanDBClientTest.java | 2 +- .../banyandb/v1/client/BanyanDBClientTestCI.java | 2 +- 8 files changed, 96 insertions(+), 44 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index dbe9ec8..f17ab8f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -9,6 +9,9 @@ Release Notes. * Add mod revision check to write requests * Add TTL to property. +* Support setting multiple server addresses +* Support DNS name resolution +* Support round-robin load balancing 0.4.0 ------------------ diff --git a/README.md b/README.md index ab78a18..955a4f3 100644 --- a/README.md +++ b/README.md @@ -14,15 +14,22 @@ The client implement for SkyWalking BanyanDB in Java. ## Create a client -Create a `BanyanDBClient` with host, port and then use `connect()` to establish a connection. +Create a `BanyanDBClient` with the server's several addresses and then use `connect()` to establish a connection. ```java // use `default` group -client = new BanyanDBClient("127.0.0.1", 17912); +BanyanDBClient client = new BanyanDBClient("banyandb.svc:17912", "10.0.12.9:17912"); // to send any request, a connection to the server must be estabilished client.connect(); ``` +These addresses are either IP addresses or DNS names. If DNS names are used, the client will resolve the DNS name to +IP addresses and use them to connect to the server. The client will periodically refresh the IP addresses of the DNS +name. The refresh interval can be configured by `resolveDNSInterval` option. + +The client will try to connect to the server in a round-robin manner. The client will periodically refresh the server +addresses. The refresh interval can be configured by `refreshInterval` option. + Besides, you may pass a customized options while building a `BanyanDBClient`. Supported options are listed below, @@ -32,6 +39,7 @@ options are listed below, | maxInboundMessageSize | Max inbound message size | 1024 * 1024 * 50 (~50MB) | | deadline | Threshold of gRPC blocking query, unit is second | 30 (seconds) | | refreshInterval | Refresh interval for the gRPC channel, unit is second | 30 (seconds) | +| resolveDNSInterval | DNS resolve interval, unit is second | 30 (minutes) | | forceReconnectionThreshold | Threshold of force gRPC reconnection if network issue is encountered | 1 | | forceTLS | Force use TLS for gRPC | false | | sslTrustCAPath | SSL: Trusted CA Path | | diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java index 69a04f9..c1d4e3f 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java @@ -28,7 +28,6 @@ import io.grpc.stub.StreamObserver; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; - import org.apache.skywalking.banyandb.common.v1.BanyandbCommon; import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure; import org.apache.skywalking.banyandb.measure.v1.MeasureServiceGrpc; @@ -59,7 +58,9 @@ import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregationMetadata import java.io.Closeable; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -83,14 +84,8 @@ import static com.google.common.base.Preconditions.checkState; */ @Slf4j public class BanyanDBClient implements Closeable { - /** - * The hostname of BanyanDB server. - */ - private final String host; - /** - * The port of BanyanDB server. - */ - private final int port; + + private final String[] targets; /** * Options for server connection. */ @@ -138,23 +133,24 @@ public class BanyanDBClient implements Closeable { /** * Create a BanyanDB client instance with a default options. * - * @param host IP or domain name - * @param port Server port + * @param targets server targets */ - public BanyanDBClient(String host, int port) { - this(host, port, new Options()); + public BanyanDBClient(String... targets) { + this(targets, new Options()); } /** * Create a BanyanDB client instance with a customized options. * - * @param host IP or domain name - * @param port Server port + * @param targets server targets * @param options customized options */ - public BanyanDBClient(String host, int port, Options options) { - this.host = host; - this.port = port; + public BanyanDBClient(String[] targets, Options options) { + String[] tt = Preconditions.checkNotNull(targets, "targets"); + checkState(tt.length > 0, "targets' size must be more than 1"); + tt = Arrays.stream(tt).filter(t -> !Strings.isNullOrEmpty(t)).toArray(size -> new String[size]); + checkState(tt.length > 0, "valid targets' size must be more than 1"); + this.targets = tt; this.options = options; this.connectionEstablishLock = new ReentrantLock(); this.metadataCache = new MetadataCache(); @@ -169,8 +165,12 @@ public class BanyanDBClient implements Closeable { connectionEstablishLock.lock(); try { if (!isConnected) { + URI[] addresses = new URI[this.targets.length]; + for (int i = 0; i < this.targets.length; i++) { + addresses[i] = URI.create("//" + this.targets[i]); + } this.channel = ChannelManager.create(this.options.buildChannelManagerSettings(), - new DefaultChannelFactory(this.host, this.port, this.options)); + new DefaultChannelFactory(addresses, this.options)); streamServiceBlockingStub = StreamServiceGrpc.newBlockingStub(this.channel); measureServiceBlockingStub = MeasureServiceGrpc.newBlockingStub(this.channel); streamServiceStub = StreamServiceGrpc.newStub(this.channel); diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java index 109fd19..512ef2f 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java @@ -45,6 +45,10 @@ public class Options { * Threshold of force gRPC reconnection if network issue is encountered */ private long forceReconnectionThreshold = 1; + /** + * Threshold of resolving the DNS + */ + private long resolveDNSInterval = 30 * 60; /** * Force use TLS for gRPC * Default is false diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/ChannelManager.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/ChannelManager.java index 5a01994..dd780a1 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/ChannelManager.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/ChannelManager.java @@ -29,9 +29,7 @@ import io.grpc.ForwardingClientCallListener; import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.MethodDescriptor; -import io.grpc.NameResolverRegistry; import io.grpc.Status; -import io.grpc.internal.DnsNameResolverProvider; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -57,22 +55,18 @@ public class ChannelManager extends ManagedChannel { private final ScheduledExecutorService executor; @VisibleForTesting final AtomicReference<Entry> entryRef = new AtomicReference<>(); - private final String authority; public static ChannelManager create(ChannelManagerSettings settings, ChannelFactory channelFactory) throws IOException { return new ChannelManager(settings, channelFactory, Executors.newSingleThreadScheduledExecutor()); } - public ChannelManager(ChannelManagerSettings settings, ChannelFactory channelFactory, ScheduledExecutorService executor) throws IOException { + ChannelManager(ChannelManagerSettings settings, ChannelFactory channelFactory, ScheduledExecutorService executor) throws IOException { this.settings = settings; this.channelFactory = channelFactory; this.executor = executor; - NameResolverRegistry.getDefaultRegistry().register(new DnsNameResolverProvider()); - entryRef.set(new Entry(channelFactory.create())); - authority = entryRef.get().channel.authority(); this.executor.scheduleAtFixedRate( this::refreshSafely, @@ -92,16 +86,17 @@ public class ChannelManager extends ManagedChannel { void refresh() throws IOException { Entry entry = entryRef.get(); - if (entry.needReconnect) { - if (entry.isConnected(entry.reconnectCount.incrementAndGet() > this.settings.forceReconnectionThreshold())) { - // Reconnect to the same server is automatically done by GRPC - // clear the flags - entry.reset(); - } else { - Entry replacedEntry = entryRef.getAndSet(new Entry(this.channelFactory.create())); - replacedEntry.shutdown(); - } + if (!entry.needReconnect) { + return; + } + if (entry.isConnected(entry.reconnectCount.incrementAndGet() > this.settings.forceReconnectionThreshold())) { + // Reconnect to the same server is automatically done by GRPC + // clear the flags + entry.reset(); + return; } + Entry replacedEntry = entryRef.getAndSet(new Entry(this.channelFactory.create())); + replacedEntry.shutdown(); } @Override @@ -157,7 +152,7 @@ public class ChannelManager extends ManagedChannel { @Override public String authority() { - return this.authority; + return this.entryRef.get().channel.authority(); } @RequiredArgsConstructor @@ -192,7 +187,7 @@ public class ChannelManager extends ManagedChannel { @Override public String authority() { - return authority; + return ChannelManager.this.authority(); } } diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/DefaultChannelFactory.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/DefaultChannelFactory.java index d60392c..d246dda 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/DefaultChannelFactory.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/DefaultChannelFactory.java @@ -24,6 +24,8 @@ import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; import io.netty.handler.ssl.SslContextBuilder; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.SocketUtils; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.banyandb.v1.client.Options; @@ -33,17 +35,30 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.URI; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.stream.Stream; @Slf4j @RequiredArgsConstructor public class DefaultChannelFactory implements ChannelFactory { - private final String host; - private final int port; + private final URI[] targets; private final Options options; + private SocketAddress[] addresses; + private long lastTargetsResolvedTime; @Override public ManagedChannel create() throws IOException { - NettyChannelBuilder managedChannelBuilder = NettyChannelBuilder.forAddress(this.host, this.port) + if (this.addresses == null || + System.currentTimeMillis() - this.lastTargetsResolvedTime > this.options.getResolveDNSInterval()) { + resolveTargets(); + } + NettyChannelBuilder managedChannelBuilder = NettyChannelBuilder.forAddress(resolveAddress()) .maxInboundMessageSize(options.getMaxInboundMessageSize()) .usePlaintext(); @@ -75,4 +90,31 @@ public class DefaultChannelFactory implements ChannelFactory { } return managedChannelBuilder.build(); } + + private void resolveTargets() { + this.addresses = Arrays.stream(this.targets) + .flatMap(target -> { + try { + return Arrays.stream(SocketUtils.allAddressesByName(target.getHost())) + .map(InetAddress::getHostAddress) + .map(ip -> new InetSocketAddress(ip, target.getPort())); + } catch (Throwable t) { + log.error("Failed to resolve the BanyanDB server's address ", t); + } + return Stream.empty(); + }) + .sorted(Comparator.comparing(InetSocketAddress::toString)) + .distinct() + .toArray(InetSocketAddress[]::new); + this.lastTargetsResolvedTime = System.currentTimeMillis(); + } + + private SocketAddress resolveAddress() throws UnknownHostException { + int numAddresses = this.addresses.length; + if (numAddresses < 1) { + throw new UnknownHostException(); + } + int offset = numAddresses == 1 ? 0 : PlatformDependent.threadLocalRandom().nextInt(numAddresses); + return this.addresses[offset]; + } } diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java index 328c0dc..0d9d2e4 100644 --- a/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java +++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java @@ -354,7 +354,7 @@ public class AbstractBanyanDBClientTest { this.channel = grpcCleanup.register( InProcessChannelBuilder.forName(serverName).directExecutor().build()); - client = new BanyanDBClient("127.0.0.1", s.getPort()); + client = new BanyanDBClient(String.format("127.0.0.1:%d", s.getPort())); client.connect(this.channel); } diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientTestCI.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientTestCI.java index e34c6c1..a6359f1 100644 --- a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientTestCI.java +++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientTestCI.java @@ -49,7 +49,7 @@ public class BanyanDBClientTestCI { protected void setUpConnection() throws IOException { log.info("create BanyanDB client and try to connect"); - client = new BanyanDBClient(banyanDB.getHost(), banyanDB.getMappedPort(GRPC_PORT)); + client = new BanyanDBClient(String.format("%s:%d", banyanDB.getHost(), banyanDB.getMappedPort(GRPC_PORT))); client.connect(); }