This is an automated email from the ASF dual-hosted git repository.
anovikov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3225d39cb3c IGNITE-26893 Java thin: improve DNS resolution logic
(#7031)
3225d39cb3c is described below
commit 3225d39cb3c30e21c2872398488af1401cf9e314
Author: Andrey Novikov <[email protected]>
AuthorDate: Tue Dec 9 15:36:54 2025 +0700
IGNITE-26893 Java thin: improve DNS resolution logic (#7031)
Co-authored-by: Pavel Tupitsyn <[email protected]>
---
.idea/checkstyle-idea.xml | 2 +-
.../org/apache/ignite/client/IgniteClient.java | 28 ++-
.../ignite/client/IgniteClientConfiguration.java | 15 ++
.../client/IgniteClientConfigurationImpl.java | 86 ++++++-
.../internal/client/InetAddressResolver.java | 55 +++++
.../ignite/internal/client/ReliableChannel.java | 144 +++++++++--
.../ignite/internal/client/TcpIgniteClient.java | 13 +-
.../ignite/client/ClientDnsDiscoveryTest.java | 272 +++++++++++++++++++++
.../org/apache/ignite/client/ConnectionTest.java | 16 +-
.../ignite/client/FeatureCompatibilityTest.java | 37 ++-
.../ObservableTimestampComputePropagationTest.java | 7 +-
.../client/ObservableTimestampPropagationTest.java | 6 +-
.../org/apache/ignite/client/ReconnectTest.java | 6 +-
.../org/apache/ignite/client/RetryPolicyTest.java | 3 +-
.../java/org/apache/ignite/client/TestServer.java | 102 +++++++-
.../org/apache/ignite/jdbc/IgniteJdbcDriver.java | 3 +-
.../client/ItThinClientChannelValidatorTest.java | 3 +-
.../app/client/ItThinClientConnectionTest.java | 18 +-
18 files changed, 750 insertions(+), 66 deletions(-)
diff --git a/.idea/checkstyle-idea.xml b/.idea/checkstyle-idea.xml
index 3cc177b4de9..3d0cf2aa55c 100644
--- a/.idea/checkstyle-idea.xml
+++ b/.idea/checkstyle-idea.xml
@@ -24,4 +24,4 @@
</list>
</option>
</component>
-</project>
+</project>
\ No newline at end of file
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
index 099833a59f1..3454b9ded25 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
@@ -18,6 +18,7 @@
package org.apache.ignite.client;
import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_BACKGROUND_RECONNECT_INTERVAL;
+import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_BACKGROUND_RE_RESOLVE_ADDRESSES_INTERVAL;
import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_CONNECT_TIMEOUT;
import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_INTERVAL;
import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_TIMEOUT;
@@ -115,6 +116,8 @@ public interface IgniteClient extends Ignite, AutoCloseable
{
private @Nullable String name;
+ long backgroundReResolveAddressesInterval =
DFLT_BACKGROUND_RE_RESOLVE_ADDRESSES_INTERVAL;
+
/**
* Sets the addresses of Ignite server nodes within a cluster. An
address can be an IP address or a hostname, with or without port.
* If port is not set then Ignite will use the default one - see
{@link IgniteClientConfiguration#DFLT_PORT}.
@@ -406,6 +409,28 @@ public interface IgniteClient extends Ignite,
AutoCloseable {
return this;
}
+ /**
+ * Sets how long the resolved addresses will be considered valid, in
milliseconds. Set to {@code 0} for infinite validity.
+ *
+ * <p>Ignite client resolve the provided hostnames into multiple IP
addresses, each corresponds to an active cluster node.
+ * However, additional IP addresses can be collected after updating
the DNS records. This property controls how often Ignite
+ * client will try to re-resolve provided hostnames and connect to
newly discovered addresses.
+ *
+ * @param backgroundReResolveAddressesInterval Background re-resolve
interval, in milliseconds.
+ * @return This instance.
+ * @throws IllegalArgumentException When value is less than zero.
+ */
+ public Builder backgroundReResolveAddressesInterval(long
backgroundReResolveAddressesInterval) {
+ if (backgroundReResolveAddressesInterval < 0) {
+ throw new
IllegalArgumentException("backgroundReResolveAddressesInterval ["
+ + backgroundReResolveAddressesInterval + "] must be a
non-negative integer value.");
+ }
+
+ this.backgroundReResolveAddressesInterval =
backgroundReResolveAddressesInterval;
+
+ return this;
+ }
+
/**
* Builds the client.
*
@@ -436,7 +461,8 @@ public interface IgniteClient extends Ignite, AutoCloseable
{
authenticator,
operationTimeout,
sqlPartitionAwarenessMetadataCacheSize,
- name
+ name,
+ backgroundReResolveAddressesInterval
);
return TcpIgniteClient.startAsync(cfg);
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
index ef708867caa..e11f0ddd0f4 100644
---
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
+++
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
@@ -43,6 +43,9 @@ public interface IgniteClientConfiguration {
/** Default background reconnect interval, in milliseconds. */
long DFLT_BACKGROUND_RECONNECT_INTERVAL = 30_000L;
+ /** Default interval sets how long the resolved addresses will be
considered valid, in milliseconds. */
+ long DFLT_BACKGROUND_RE_RESOLVE_ADDRESSES_INTERVAL = 30_000L;
+
/** Default operation timeout, in milliseconds. */
int DFLT_OPERATION_TIMEOUT = 0;
@@ -222,4 +225,16 @@ public interface IgniteClientConfiguration {
* @return Client name.
*/
@Nullable String name();
+
+ /**
+ * Gets how long the resolved addresses will be considered valid, in
milliseconds. Set to {@code 0} for infinite validity.
+ * Default is {@link #DFLT_BACKGROUND_RE_RESOLVE_ADDRESSES_INTERVAL}.
+ *
+ * <p>Ignite client resolve the provided hostnames into multiple IP
addresses, each corresponds to an active cluster node.
+ * However, additional IP addresses can be collected after updating the
DNS records. This property controls how often Ignite
+ * client will try to re-resolve provided hostnames and connect to newly
discovered addresses.
+ *
+ * @return Background re-resolve interval, in milliseconds.
+ */
+ long backgroundReResolveAddressesInterval();
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
index 2be085ee1ea..55df2ee726a 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
@@ -25,6 +25,7 @@ import org.apache.ignite.client.RetryPolicy;
import org.apache.ignite.client.SslConfiguration;
import org.apache.ignite.lang.LoggerFactory;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
/**
* Immutable client configuration.
@@ -68,6 +69,10 @@ public final class IgniteClientConfigurationImpl implements
IgniteClientConfigur
private final @Nullable String name;
+ private final InetAddressResolver addressResolver;
+
+ private final long backgroundReResolveAddressesInterval;
+
/**
* Constructor.
*
@@ -86,7 +91,10 @@ public final class IgniteClientConfigurationImpl implements
IgniteClientConfigur
* @param operationTimeout Operation timeout.
* @param sqlPartitionAwarenessMetadataCacheSize Size of the cache to
store partition awareness metadata.
* @param name Client name.
+ * @param backgroundReResolveAddressesInterval Background re-resolve
addresses interval.
+ * @param addressResolver Address resolver.
*/
+ @VisibleForTesting
public IgniteClientConfigurationImpl(
@Nullable IgniteClientAddressFinder addressFinder,
String[] addresses,
@@ -102,7 +110,9 @@ public final class IgniteClientConfigurationImpl implements
IgniteClientConfigur
@Nullable IgniteClientAuthenticator authenticator,
long operationTimeout,
int sqlPartitionAwarenessMetadataCacheSize,
- @Nullable String name
+ @Nullable String name,
+ long backgroundReResolveAddressesInterval,
+ @Nullable InetAddressResolver addressResolver
) {
this.addressFinder = addressFinder;
@@ -122,6 +132,66 @@ public final class IgniteClientConfigurationImpl
implements IgniteClientConfigur
this.operationTimeout = operationTimeout;
this.sqlPartitionAwarenessMetadataCacheSize =
sqlPartitionAwarenessMetadataCacheSize;
this.name = name;
+ this.backgroundReResolveAddressesInterval =
backgroundReResolveAddressesInterval;
+ this.addressResolver = addressResolver;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param addressFinder Address finder.
+ * @param addresses Addresses.
+ * @param connectTimeout Socket connect timeout.
+ * @param backgroundReconnectInterval Background reconnect interval.
+ * @param asyncContinuationExecutor Async continuation executor.
+ * @param heartbeatInterval Heartbeat message interval.
+ * @param heartbeatTimeout Heartbeat message timeout.
+ * @param retryPolicy Retry policy.
+ * @param loggerFactory Logger factory which will be used to create a
logger instance for this this particular client when
+ * needed.
+ * @param metricsEnabled Whether metrics are enabled.
+ * @param authenticator Authenticator.
+ * @param operationTimeout Operation timeout.
+ * @param sqlPartitionAwarenessMetadataCacheSize Size of the cache to
store partition awareness metadata.
+ * @param name Client name.
+ */
+ public IgniteClientConfigurationImpl(
+ @Nullable IgniteClientAddressFinder addressFinder,
+ String[] addresses,
+ long connectTimeout,
+ long backgroundReconnectInterval,
+ @Nullable Executor asyncContinuationExecutor,
+ long heartbeatInterval,
+ long heartbeatTimeout,
+ @Nullable RetryPolicy retryPolicy,
+ @Nullable LoggerFactory loggerFactory,
+ @Nullable SslConfiguration sslConfiguration,
+ boolean metricsEnabled,
+ @Nullable IgniteClientAuthenticator authenticator,
+ long operationTimeout,
+ int sqlPartitionAwarenessMetadataCacheSize,
+ @Nullable String name,
+ long backgroundReResolveAddressesInterval
+ ) {
+ this(
+ addressFinder,
+ addresses,
+ connectTimeout,
+ backgroundReconnectInterval,
+ asyncContinuationExecutor,
+ heartbeatInterval,
+ heartbeatTimeout,
+ retryPolicy,
+ loggerFactory,
+ sslConfiguration,
+ metricsEnabled,
+ authenticator,
+ operationTimeout,
+ sqlPartitionAwarenessMetadataCacheSize,
+ name,
+ backgroundReResolveAddressesInterval,
+ null
+ );
}
/** {@inheritDoc} */
@@ -210,4 +280,18 @@ public final class IgniteClientConfigurationImpl
implements IgniteClientConfigur
public @Nullable String name() {
return name;
}
+
+ @Override
+ public long backgroundReResolveAddressesInterval() {
+ return backgroundReResolveAddressesInterval;
+ }
+
+ /**
+ * Gets custom address resolver.
+ *
+ * @return Custom address resolver.
+ */
+ @Nullable InetAddressResolver addressResolver() {
+ return addressResolver;
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/InetAddressResolver.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/InetAddressResolver.java
new file mode 100644
index 00000000000..5bd66e4504a
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/InetAddressResolver.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+
+/**
+ * DNS resolver.
+ */
+@FunctionalInterface
+public interface InetAddressResolver {
+ InetAddressResolver DEFAULT = (host, port) -> {
+ var res = new HashSet<InetSocketAddress>();
+
+ for (InetAddress inetAddr : InetAddress.getAllByName(host)) {
+ // Preserves unresolved address for loopback, since it can be
multiple interfaces
+ if (inetAddr.isLoopbackAddress()) {
+ res.add(InetSocketAddress.createUnresolved(host, port));
+ } else {
+ res.add(new InetSocketAddress(inetAddr, port));
+ }
+ }
+
+ return res;
+ };
+
+ /**
+ * Resolves the given host name to its IP addresses.
+ *
+ * @param host the host name to be resolved
+ * @param port the port to be resolved
+ * @return an collection of {@code InetSocketAddress} objects representing
the IP addresses of the host
+ * @throws UnknownHostException if the host name could not be resolved
+ */
+ Collection<InetSocketAddress> getAllByName(String host, int port) throws
UnknownHostException;
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index c4e8f8ebba6..2652778896c 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.client;
+import static java.util.Objects.requireNonNullElse;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.delayedExecutor;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.ExceptionUtils.hasCauseOrSuppressed;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
@@ -30,6 +32,7 @@ import static
org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
import java.net.ConnectException;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -42,6 +45,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
@@ -93,7 +97,7 @@ public final class ReliableChannel implements AutoCloseable {
private final AtomicInteger curChIdx = new AtomicInteger();
/** Client configuration. */
- private final IgniteClientConfiguration clientCfg;
+ private final IgniteClientConfigurationImpl clientCfg;
/** Node channels by name (consistent id). */
private final Map<String, ClientChannelHolder> nodeChannelsByName = new
ConcurrentHashMap<>();
@@ -131,9 +135,18 @@ public final class ReliableChannel implements
AutoCloseable {
@Nullable
private ScheduledExecutorService streamerFlushExecutor;
+ /** Executor for async re-resolving addresses. */
+ private final Executor asyncContinuationExecutor;
+
/** Inflights. */
private final ClientTransactionInflights inflights;
+ /** Address resolver. */
+ private final InetAddressResolver addressResolver;
+
+ /** Future for scheduled re-resolving addresses. */
+ private volatile CompletableFuture<Void> scheduledReResolveAddressesFuture;
+
/**
* A validator that is called when a connection to a node is established,
* if it throws an exception, the network channel to that node will be
closed.
@@ -152,7 +165,7 @@ public final class ReliableChannel implements AutoCloseable
{
*/
ReliableChannel(
ClientChannelFactory chFactory,
- IgniteClientConfiguration clientCfg,
+ IgniteClientConfigurationImpl clientCfg,
ClientMetricSource metrics,
HybridTimestampTracker observableTimeTracker,
@Nullable ChannelValidator channelValidator
@@ -168,6 +181,12 @@ public final class ReliableChannel implements
AutoCloseable {
connMgr.start(clientCfg);
inflights = new ClientTransactionInflights();
+
+ addressResolver = requireNonNullElse(clientCfg.addressResolver(),
InetAddressResolver.DEFAULT);
+
+ asyncContinuationExecutor = clientCfg.asyncContinuationExecutor() ==
null
+ ? ForkJoinPool.commonPool()
+ : clientCfg.asyncContinuationExecutor();
}
/** {@inheritDoc} */
@@ -175,6 +194,12 @@ public final class ReliableChannel implements
AutoCloseable {
public synchronized void close() throws Exception {
closed = true;
+ @Nullable CompletableFuture<Void> fut =
scheduledReResolveAddressesFuture;
+
+ if (fut != null && !fut.isDone()) {
+ fut.cancel(true);
+ }
+
List<ClientChannelHolder> holders = channels;
List<ManuallyCloseable> closeables = new ArrayList<>();
@@ -414,20 +439,31 @@ public final class ReliableChannel implements
AutoCloseable {
* @return host:port_range address lines parsed as {@link
InetSocketAddress} as a key. Value is the amount of appearences of an address
* in {@code addrs} parameter.
*/
- private static Map<InetSocketAddress, Integer> parsedAddresses(String[]
addrs) {
+ private static Map<InetSocketAddress, Integer>
parsedAddresses(InetAddressResolver addressResolver, String[] addrs) {
if (addrs == null || addrs.length == 0) {
throw new IgniteException(CONFIGURATION_ERR, "Empty addresses");
}
- Collection<HostAndPort> ranges = new ArrayList<>(addrs.length);
+ Collection<HostAndPort> parsedAddrs = new ArrayList<>(addrs.length);
for (String a : addrs) {
- ranges.add(HostAndPort.parse(a,
IgniteClientConfiguration.DFLT_PORT, "Failed to parse Ignite server address"));
+ parsedAddrs.add(HostAndPort.parse(a,
IgniteClientConfiguration.DFLT_PORT, "Failed to parse Ignite server address"));
}
- return ranges.stream()
- .map(p -> InetSocketAddress.createUnresolved(p.host(),
p.port()))
- .collect(Collectors.toMap(a -> a, a -> 1, Integer::sum));
+ Map<InetSocketAddress, Integer> map =
IgniteUtils.newHashMap(parsedAddrs.size());
+
+ for (HostAndPort addr : parsedAddrs) {
+ try {
+ for (InetSocketAddress sockAddr :
addressResolver.getAllByName(addr.host(), addr.port())) {
+ map.merge(sockAddr, 1, Integer::sum);
+ }
+ } catch (UnknownHostException e) {
+ var sockAddr = InetSocketAddress.createUnresolved(addr.host(),
addr.port());
+ map.merge(sockAddr, 1, Integer::sum);
+ }
+ }
+
+ return map;
}
/**
@@ -475,9 +511,7 @@ public final class ReliableChannel implements AutoCloseable
{
// Roll current channel even if a topology changes. To help find
working channel faster.
rollCurrentChannel(hld);
- if (scheduledChannelsReinit.get()) {
- channelsInitAsync();
- }
+ asyncContinuationExecutor.execute(this::reResolveAddresses);
}
/**
@@ -508,7 +542,7 @@ public final class ReliableChannel implements AutoCloseable
{
/**
* Init channel holders to all nodes.
*
- * @return boolean wheter channels was reinited.
+ * @return boolean whether channels were reinitialized.
*/
private synchronized boolean initChannelHolders() {
List<ClientChannelHolder> holders = channels;
@@ -526,11 +560,12 @@ public final class ReliableChannel implements
AutoCloseable {
}
if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
- newAddrs = parsedAddresses(hostAddrs);
+ newAddrs = parsedAddresses(addressResolver, hostAddrs);
prevHostAddrs = hostAddrs;
}
- } else if (holders == null) {
- newAddrs = parsedAddresses(clientCfg.addresses());
+ } else {
+ // Re-resolve DNS.
+ newAddrs = parsedAddresses(addressResolver, clientCfg.addresses());
}
if (newAddrs == null) {
@@ -541,9 +576,7 @@ public final class ReliableChannel implements AutoCloseable
{
Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs.keySet());
if (holders != null) {
- for (int i = 0; i < holders.size(); i++) {
- ClientChannelHolder h = holders.get(i);
-
+ for (ClientChannelHolder h : holders) {
curAddrs.put(h.chCfg.getAddress(), h);
allAddrs.add(h.chCfg.getAddress());
}
@@ -627,7 +660,11 @@ public final class ReliableChannel implements
AutoCloseable {
var fut = getDefaultChannelAsync();
// Establish secondary connections in the background.
- fut.thenAccept(unused ->
ForkJoinPool.commonPool().submit(this::initAllChannelsAsync));
+ fut.thenAccept(unused -> {
+ ForkJoinPool.commonPool().submit(this::initAllChannelsAsync);
+
+ scheduleNextReResolveAddresses();
+ });
return fut;
}
@@ -773,7 +810,11 @@ public final class ReliableChannel implements
AutoCloseable {
}
private void onPartitionAssignmentChanged(long timestamp) {
- partitionAssignmentTimestamp.updateAndGet(curTs -> Math.max(curTs,
timestamp));
+ var old = partitionAssignmentTimestamp.getAndUpdate(curTs ->
Math.max(curTs, timestamp));
+
+ if (timestamp > old) {
+ asyncContinuationExecutor.execute(this::reResolveAddresses);
+ }
}
/**
@@ -972,7 +1013,7 @@ public final class ReliableChannel implements
AutoCloseable {
var oldServerNode = serverNode;
if (oldServerNode != null) {
- nodeChannelsByName.remove(oldServerNode.name(), this);
+ rollNodeChannelsByName();
}
chFut = null;
@@ -988,7 +1029,7 @@ public final class ReliableChannel implements
AutoCloseable {
var oldServerNode = serverNode;
if (oldServerNode != null) {
- nodeChannelsByName.remove(oldServerNode.name(), this);
+ rollNodeChannelsByName();
}
closeChannel();
@@ -1007,6 +1048,20 @@ public final class ReliableChannel implements
AutoCloseable {
return ch != null && !ch.closed();
}
+
+ private void rollNodeChannelsByName() {
+ List<ClientChannelHolder> holders = channels;
+
+ for (ClientChannelHolder h : holders) {
+ if (h != this && h.serverNode != null &&
Objects.equals(serverNode.id(), h.serverNode.id())) {
+ nodeChannelsByName.put(h.serverNode.name(), h);
+
+ return;
+ }
+ }
+
+ nodeChannelsByName.remove(serverNode.name(), this);
+ }
}
private void logFailedEstablishConnection(ClientChannelHolder ch,
Throwable err) {
@@ -1028,4 +1083,49 @@ public final class ReliableChannel implements
AutoCloseable {
// May occur when nodes are restarted, which is expected.
return !hasCauseOrSuppressed(err, "Connection refused",
ConnectException.class);
}
+
+ /** Schedule the background re-resolve of addresses. */
+ private void scheduleNextReResolveAddresses() {
+ if (closed) {
+ if (log.isDebugEnabled()) {
+ log.debug("Skipping scheduling re-resolve of addresses since
channel is closed");
+ }
+
+ return;
+ }
+
+ long interval = clientCfg.backgroundReResolveAddressesInterval();
+ if (interval > 0L) {
+ if (log.isDebugEnabled()) {
+ log.debug("Scheduling next re-resolve of addresses in {} ms",
interval);
+ }
+
+ scheduledReResolveAddressesFuture =
runAsync(this::reResolveAddresses,
+ delayedExecutor(interval, TimeUnit.MILLISECONDS,
asyncContinuationExecutor));
+ }
+ }
+
+ /** Resolve addresses in background. */
+ private void reResolveAddresses() {
+ CompletableFuture<Void> fut = scheduledReResolveAddressesFuture;
+
+ // Skip if another re-resolve is already running or closed.
+ if (closed || (fut != null && !fut.cancel(false))) {
+ if (log.isDebugEnabled()) {
+ log.debug("Skipping re-resolve of addresses since another
re-resolve is already running or channel is closed");
+ }
+
+ return;
+ }
+
+ if (scheduledChannelsReinit.compareAndSet(false, true)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Re-resolving addresses and re-initializing channel
holders");
+ }
+
+ initChannelHolders();
+ }
+
+ scheduleNextReResolveAddresses();
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 939ced440fa..febb5513361 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -103,7 +103,7 @@ public class TcpIgniteClient implements IgniteClient {
* @param channelValidator A validator that is called when a connection to
a node is established,
* if it throws an exception, the network channel
to that node will be closed.
*/
- private TcpIgniteClient(IgniteClientConfiguration cfg,
HybridTimestampTracker observableTimeTracker,
+ private TcpIgniteClient(IgniteClientConfigurationImpl cfg,
HybridTimestampTracker observableTimeTracker,
@Nullable ChannelValidator channelValidator) {
this(TcpClientChannel::createAsync, cfg, observableTimeTracker,
channelValidator);
}
@@ -117,7 +117,10 @@ public class TcpIgniteClient implements IgniteClient {
* @param channelValidator A validator that is called when a connection to
a node is established,
* if it throws an exception, the network channel
to that node will be closed.
*/
- private TcpIgniteClient(ClientChannelFactory chFactory,
IgniteClientConfiguration cfg, HybridTimestampTracker observableTimeTracker,
+ private TcpIgniteClient(
+ ClientChannelFactory chFactory,
+ IgniteClientConfigurationImpl cfg,
+ HybridTimestampTracker observableTimeTracker,
@Nullable ChannelValidator channelValidator) {
assert chFactory != null;
assert cfg != null;
@@ -173,7 +176,7 @@ public class TcpIgniteClient implements IgniteClient {
* @param cfg Thin client configuration.
* @return Future representing pending completion of the operation.
*/
- public static CompletableFuture<IgniteClient>
startAsync(IgniteClientConfiguration cfg) {
+ public static CompletableFuture<IgniteClient>
startAsync(IgniteClientConfigurationImpl cfg) {
return startAsync(cfg, HybridTimestampTracker.atomicTracker(null),
null);
}
@@ -186,7 +189,9 @@ public class TcpIgniteClient implements IgniteClient {
* if it throws an exception, the network channel
to that node will be closed.
* @return Future representing pending completion of the operation.
*/
- public static CompletableFuture<IgniteClient>
startAsync(IgniteClientConfiguration cfg, HybridTimestampTracker
observableTimeTracker,
+ public static CompletableFuture<IgniteClient> startAsync(
+ IgniteClientConfigurationImpl cfg,
+ HybridTimestampTracker observableTimeTracker,
@Nullable ChannelValidator channelValidator) {
ErrorGroups.initialize();
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientDnsDiscoveryTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientDnsDiscoveryTest.java
new file mode 100644
index 00000000000..07101010a2c
--- /dev/null
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientDnsDiscoveryTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.client.IgniteClientConfigurationImpl;
+import org.apache.ignite.internal.client.InetAddressResolver;
+import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.TcpIgniteClient;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests client DNS resolution.
+ */
+class ClientDnsDiscoveryTest extends BaseIgniteAbstractTest {
+ protected static final String DEFAULT_TABLE = "DEFAULT_TEST_TABLE";
+
+ private static TestServer server1;
+
+ private static TestServer server2;
+
+ private static TestServer server3;
+
+ private static String loopbackAddress;
+
+ private static String hostAddress;
+
+ private static int port;
+
+ @BeforeAll
+ static void setUp() throws UnknownHostException {
+ loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress();
+ hostAddress = InetAddress.getLocalHost().getHostAddress();
+
+ server1 = TestServer.builder()
+ .listenAddresses(loopbackAddress)
+ .nodeName("server1")
+ .clusterId(AbstractClientTest.clusterId)
+ .build();
+
+ port = server1.port();
+
+ server2 = TestServer.builder()
+ .listenAddresses(hostAddress)
+ .nodeName("server2")
+ .clusterId(AbstractClientTest.clusterId)
+ .port(port)
+ .build();
+
+ server3 = TestServer.builder()
+ .nodeName("server3")
+ .clusterId(AbstractClientTest.clusterId)
+ .port(port + 1)
+ .build();
+ }
+
+ @AfterAll
+ static void tearDown() throws Exception {
+ closeAll(server1, server2, server3);
+ }
+
+ @Test
+ void testClientResolvesAllHostNameAddresses() {
+ String[] addresses = {"my-cluster:" + port};
+
+ // One invalid and one valid address.
+ AtomicReference<String[]> resolvedAddressesRef = new
AtomicReference<>(new String[]{loopbackAddress, "1.1.1.1"});
+
+ try (var client =
TcpIgniteClient.startAsync(getClientConfiguration(addresses, 0L,
resolvedAddressesRef)).join()) {
+ assertDoesNotThrow(() -> client.tables().tables());
+ assertEquals("server1", client.connections().get(0).name());
+ }
+ }
+
+ @Test
+ void testClientConnectToAllNodes() throws InterruptedException {
+ String[] addresses = {"my-cluster:" + port, "my-cluster:" +
server3.port()};
+
+ // All nodes addresses.
+ AtomicReference<String[]> resolvedAddressesRef = new
AtomicReference<>(new String[]{loopbackAddress, hostAddress});
+ Set<String> allNodeNames = Set.of("server1", "server2", "server3");
+
+ try (var client =
TcpIgniteClient.startAsync(getClientConfiguration(addresses, 0L,
resolvedAddressesRef)).join()) {
+ assertDoesNotThrow(() -> client.tables().tables());
+
+ // Wait until client connects to all nodes.
+ assertTrue(IgniteTestUtils.waitForCondition(
+ () ->
client.connections().stream().map(ClusterNode::name).allMatch(allNodeNames::contains),
1000),
+ () -> "Client should have three connections: " +
client.connections().size());
+ }
+ }
+
+ @Test
+ void testClientRefreshesDnsOnNodeFailure() throws Exception {
+ // One valid address.
+ AtomicReference<String[]> resolvedAddressesRef = new
AtomicReference<>(new String[]{loopbackAddress});
+
+ try (var server4 =
TestServer.builder().listenAddresses(loopbackAddress).nodeName("server4").clusterId(AbstractClientTest.clusterId)
+ .build();
+ var ignored =
TestServer.builder().listenAddresses(hostAddress).nodeName("server5").clusterId(AbstractClientTest.clusterId)
+ .port(server4.port()).build();
+ var client =
TcpIgniteClient.startAsync(getClientConfiguration(new String[]{"my-cluster:" +
server4.port()},
+ 0L, resolvedAddressesRef)).join()) {
+ assertDoesNotThrow(() -> client.tables().tables());
+ assertEquals("server4", client.connections().get(0).name());
+
+ // Both nodes.
+ resolvedAddressesRef.set(new String[]{hostAddress});
+
+ // Stop first node.
+ server4.close();
+
+ Thread.sleep(100L); // Wait for channels were reinitialized.
+
+ // Client should reconnect to the second node.
+ assertDoesNotThrow(() -> client.tables().tables());
+ assertEquals("server5", client.connections().get(0).name());
+ }
+ }
+
+ @Test
+ void testClientRefreshesDnsOnPrimaryReplicaChange() throws Exception {
+ String[] addresses = {"my-cluster:" + port};
+
+ // One valid address points to first node.
+ AtomicReference<String[]> resolvedAddressesRef = new
AtomicReference<>(new String[]{loopbackAddress});
+
+ try (var client =
TcpIgniteClient.startAsync(getClientConfiguration(addresses, 0L,
resolvedAddressesRef)).join()) {
+ assertDoesNotThrow(() -> client.tables().tables());
+ assertEquals("server1", client.connections().get(0).name());
+
+ // Change address to second node.
+ resolvedAddressesRef.set(new String[]{hostAddress});
+
+ ReliableChannel ch = ((TcpIgniteClient) client).channel();
+ server1.placementDriver().setReplicas(List.of("server3",
"server2", "server1", "server2"),
+ 1,
+ 1,
+ ch.partitionAssignmentTimestamp() + 1);
+
+ Thread.sleep(100L); // Wait for channels were reinitialized.
+
+ // Client should reconnect to the second node.
+ assertDoesNotThrow(() -> client.tables().tables());
+ assertEquals("server2", client.connections().get(0).name());
+ }
+ }
+
+ @Test
+ void testClientRefreshesDnsByTimeout() throws Exception {
+ String[] addresses = {"my-cluster:" + port};
+
+ // One valid address points to first node.
+ AtomicReference<String[]> resolvedAddressesRef = new
AtomicReference<>(new String[]{loopbackAddress});
+
+ try (var client =
TcpIgniteClient.startAsync(getClientConfiguration(addresses, 500L,
resolvedAddressesRef)).join()) {
+ assertDoesNotThrow(() -> client.tables().tables());
+ assertEquals("server1", client.connections().get(0).name());
+
+ // Change address to second node.
+ resolvedAddressesRef.set(new String[]{hostAddress});
+
+ Thread.sleep(500L); // Wait for background dns refresh.
+
+ // Client should reconnect to the second node.
+ assertDoesNotThrow(() -> client.tables().tables());
+ assertEquals("server2", client.connections().get(0).name());
+ }
+ }
+
+ @Test
+ void testMultipleIpsSameNode() throws InterruptedException {
+ String[] addresses = {"my-cluster:" + server3.port()};
+
+ // One node.
+ AtomicReference<String[]> resolvedAddressesRef = new
AtomicReference<>(new String[]{loopbackAddress, hostAddress});
+
+ try (var client =
TcpIgniteClient.startAsync(getClientConfiguration(addresses, 0L,
resolvedAddressesRef)).join()) {
+ assertDoesNotThrow(() -> client.tables().tables());
+ assertEquals("server3", client.connections().get(0).name());
+
+ ReliableChannel ch = ((TcpIgniteClient) client).channel();
+
+ List<?> channels = IgniteTestUtils.getFieldValue(ch, "channels");
+ assertEquals(2, channels.size());
+
+ // Update to another IPs for the same node.
+ resolvedAddressesRef.set(new String[]{hostAddress});
+
+ server3.placementDriver().setReplicas(List.of("server3",
"server2", "server1", "server2"),
+ 1,
+ 1,
+ ch.partitionAssignmentTimestamp() + 1);
+
+ // Wait until client connects to all nodes.
+ assertTrue(IgniteTestUtils.waitForCondition(() ->
IgniteTestUtils.<List>getFieldValue(ch, "channels").size() == 1, 1000),
+ () -> "Client should have three connections: " +
client.connections().size());
+
+ // Client should reconnect to the second ips.
+ assertDoesNotThrow(() -> client.tables().tables());
+ }
+ }
+
+ private static IgniteClientConfigurationImpl getClientConfiguration(
+ String[] addresses,
+ long backgroundReResolveAddressesInterval,
+ AtomicReference<String[]> resolvedAddressesRef
+ ) {
+ InetAddressResolver addressResolver = (host, port) -> {
+ if ("my-cluster".equals(host)) {
+ return Arrays.stream(resolvedAddressesRef.get())
+ .map(s -> InetSocketAddress.createUnresolved(s, port))
+ .collect(toList());
+ }
+
+ return singletonList(InetSocketAddress.createUnresolved(host,
port));
+ };
+
+ return new IgniteClientConfigurationImpl(
+ null,
+ addresses,
+ 500,
+ 0,
+ null,
+ 50,
+ 50,
+ new RetryLimitPolicy(),
+ null,
+ null,
+ false,
+ null,
+ 1000,
+ 1000,
+ "my-client",
+ backgroundReResolveAddressesInterval,
+ addressResolver
+ );
+ }
+}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
index 7a72dc401e6..24d655656a9 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
@@ -137,9 +137,11 @@ public class ConnectionTest extends AbstractClientTest {
String nodeName = "server-2";
FakeIgnite ignite = new FakeIgnite(nodeName);
- try (TestServer testServer =
- new TestServer(0, ignite, null, null, nodeName,
UUID.randomUUID(), null, null, false, null)) {
-
+ try (TestServer testServer = TestServer.builder()
+ .nodeName(nodeName)
+ .ignite(ignite)
+ .enableRequestHandling(false)
+ .build()) {
Builder clientBuilder = IgniteClient.builder()
.addresses("127.0.0.1:" + testServer.port())
.retryPolicy(new RetryLimitPolicy().retryLimit(0))
@@ -165,9 +167,11 @@ public class ConnectionTest extends AbstractClientTest {
String nodeName = "server-2";
FakeIgnite ignite = new FakeIgnite(nodeName);
- try (TestServer testServer =
- new TestServer(0, ignite, null, null, nodeName,
UUID.randomUUID(), null, null, false, null)) {
-
+ try (TestServer testServer = TestServer.builder()
+ .nodeName(nodeName)
+ .ignite(ignite)
+ .enableRequestHandling(false)
+ .build()) {
Builder clientBuilder = IgniteClient.builder()
.addresses("127.0.0.1:" + testServer.port())
.retryPolicy(new RetryLimitPolicy().retryLimit(0))
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/FeatureCompatibilityTest.java
b/modules/client/src/test/java/org/apache/ignite/client/FeatureCompatibilityTest.java
index 01f85d2ea23..7c8344d4380 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/FeatureCompatibilityTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/FeatureCompatibilityTest.java
@@ -25,7 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.BitSet;
-import java.util.UUID;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.client.ClientChannel;
@@ -34,6 +33,7 @@ import org.apache.ignite.internal.client.TcpIgniteClient;
import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
/**
@@ -48,12 +48,19 @@ public class FeatureCompatibilityTest extends
BaseIgniteAbstractTest {
private void startServer(@Nullable BitSet features) {
ignite = new FakeIgnite("server-1", new
TestHybridClock(System::currentTimeMillis));
- testServer = new TestServer(0, ignite, reqId -> false, null,
"server-1", UUID.randomUUID(), null, null, true, features);
+
+ testServer = TestServer.builder()
+ .ignite(ignite)
+ .nodeName("server-1")
+ .features(features)
+ .shouldDropConnection(reqId -> false)
+ .build();
client = IgniteClient.builder().addresses("127.0.0.1:" +
testServer.port()).build();
}
- private void stopServer() throws Exception {
+ @AfterEach
+ public void stopServer() throws Exception {
closeAll(client, testServer);
}
@@ -61,15 +68,11 @@ public class FeatureCompatibilityTest extends
BaseIgniteAbstractTest {
public void testDirectMappingEnabled() throws Exception {
startServer(null);
- try {
- ReliableChannel ch = ((TcpIgniteClient) client).channel();
+ ReliableChannel ch = ((TcpIgniteClient) client).channel();
- ClientChannel ch0 = ch.getChannelAsync(null).join();
+ ClientChannel ch0 = ch.getChannelAsync(null).join();
-
assertTrue(ch0.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING,
TX_DELAYED_ACKS, TX_PIGGYBACK));
- } finally {
- stopServer();
- }
+
assertTrue(ch0.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING,
TX_DELAYED_ACKS, TX_PIGGYBACK));
}
@Test
@@ -78,16 +81,12 @@ public class FeatureCompatibilityTest extends
BaseIgniteAbstractTest {
features.set(TX_DIRECT_MAPPING.featureId());
startServer(features);
- try {
- ReliableChannel ch = ((TcpIgniteClient) client).channel();
+ ReliableChannel ch = ((TcpIgniteClient) client).channel();
- ClientChannel ch0 = ch.getChannelAsync(null).join();
+ ClientChannel ch0 = ch.getChannelAsync(null).join();
-
assertFalse(ch0.protocolContext().isFeatureSupported(TX_DIRECT_MAPPING));
-
assertFalse(ch0.protocolContext().isFeatureSupported(TX_DELAYED_ACKS));
-
assertFalse(ch0.protocolContext().isFeatureSupported(TX_PIGGYBACK));
- } finally {
- stopServer();
- }
+
assertFalse(ch0.protocolContext().isFeatureSupported(TX_DIRECT_MAPPING));
+ assertFalse(ch0.protocolContext().isFeatureSupported(TX_DELAYED_ACKS));
+ assertFalse(ch0.protocolContext().isFeatureSupported(TX_PIGGYBACK));
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampComputePropagationTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampComputePropagationTest.java
index 83f626b2385..2604dfc9283 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampComputePropagationTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampComputePropagationTest.java
@@ -23,7 +23,6 @@ import static
org.apache.ignite.client.AbstractClientTest.getClusterNodes;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
@@ -61,7 +60,11 @@ public class ObservableTimestampComputePropagationTest
extends BaseIgniteAbstrac
@BeforeAll
public static void startServers() {
var ignite1 = new FakeIgnite("server-1", new
TestHybridClock(serverTimestamp::get));
- testServer = new TestServer(0, ignite1, null, null, "server-1",
UUID.randomUUID(), null, null, true, null);
+
+ testServer = TestServer.builder()
+ .ignite(ignite1)
+ .nodeName("server-1")
+ .build();
}
@AfterAll
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
index fcfba93f634..ef7f4c6b3ec 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
@@ -21,7 +21,6 @@ import static
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_S
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.internal.TestHybridClock;
@@ -49,7 +48,10 @@ public class ObservableTimestampPropagationTest extends
BaseIgniteAbstractTest {
@BeforeAll
public static void startServer2() {
ignite = new FakeIgnite("server-2", new
TestHybridClock(currentServerTimestamp::get));
- testServer = new TestServer(0, ignite, null, null, "server-2",
UUID.randomUUID(), null, null, true, null);
+ testServer = TestServer.builder()
+ .ignite(ignite)
+ .nodeName("server-2")
+ .build();
client = IgniteClient.builder().addresses("127.0.0.1:" +
testServer.port()).build();
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
index 559f67556f6..c1f551fdf4e 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
@@ -100,11 +100,15 @@ public class ReconnectTest extends BaseIgniteAbstractTest
{
Builder builder = IgniteClient.builder()
.addresses("127.0.0.1:10901", "127.0.0.1:10902",
"127.0.0.1:10903")
.backgroundReconnectInterval(reconnectEnabled ? 50 : 0)
- .heartbeatInterval(50);
+ .heartbeatInterval(100)
+ .backgroundReResolveAddressesInterval(0L);
try (var client = builder.build()) {
waitForConnections(client, 2);
+ // Skip channels refresh on partition assignment during connection.
+ Thread.sleep(100L);
+
server2.close();
waitForConnections(client, 1);
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index 98e284e3d0f..be5ca44560b 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -247,7 +247,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest
{
@Test
public void testRetryReadPolicyAllOperationsSupported() {
var plc = new RetryReadPolicy();
- var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, null, 0,
0, null, null, null, false, null, 0, 1024, null);
+ var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, null, 0,
0, null, null, null, false, null, 0, 1024, null, 0);
for (var op : ClientOperationType.values()) {
var ctx = new RetryPolicyContextImpl(cfg, op, 0, null);
@@ -336,6 +336,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest
{
.addresses("127.0.0.1:" + server.port())
.retryPolicy(retryPolicy)
.loggerFactory(loggerFactory)
+ .backgroundReResolveAddressesInterval(0)
.build();
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index e7e30665e6b..4121420d3ea 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -106,6 +106,10 @@ public class TestServer implements AutoCloseable {
private final FakeIgnite ignite;
+ public static Builder builder() {
+ return new Builder();
+ }
+
/**
* Constructor.
*
@@ -151,6 +155,7 @@ public class TestServer implements AutoCloseable {
securityConfiguration,
port,
true,
+ null,
null
);
}
@@ -171,7 +176,8 @@ public class TestServer implements AutoCloseable {
@Nullable SecurityConfiguration securityConfiguration,
@Nullable Integer port,
boolean enableRequestHandling,
- @Nullable BitSet features
+ @Nullable BitSet features,
+ String @Nullable [] listenAddresses
) {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
@@ -199,6 +205,7 @@ public class TestServer implements AutoCloseable {
.changePort(port != null ? port : getFreePort())
.changeIdleTimeoutMillis(idleTimeout)
.changeSendServerExceptionStackTraceToClient(true)
+ .changeListenAddresses(listenAddresses == null ? new
String[0] : listenAddresses)
).join();
bootstrapFactory = new
NettyBootstrapFactory(cfg.getConfiguration(NetworkExtensionConfiguration.KEY).network(),
"TestServer-");
@@ -390,4 +397,97 @@ public class TestServer implements AutoCloseable {
throw new IOError(e);
}
}
+
+ /**
+ * Builder.
+ */
+ public static class Builder {
+ private long idleTimeout = 1000;
+ private @Nullable FakeIgnite ignite;
+ private @Nullable Function<Integer, Boolean> shouldDropConnection;
+ private @Nullable Function<Integer, Integer> responseDelay;
+ private @Nullable String nodeName;
+ private UUID clusterId = UUID.randomUUID();
+ private @Nullable SecurityConfiguration securityConfiguration;
+ private @Nullable Integer port;
+ private boolean enableRequestHandling = true;
+ private @Nullable BitSet features;
+ private @Nullable String[] listenAddresses;
+
+ public Builder idleTimeout(long idleTimeout) {
+ this.idleTimeout = idleTimeout;
+ return this;
+ }
+
+ public Builder ignite(FakeIgnite ignite) {
+ this.ignite = ignite;
+ return this;
+ }
+
+ public Builder shouldDropConnection(@Nullable Function<Integer,
Boolean> shouldDropConnection) {
+ this.shouldDropConnection = shouldDropConnection;
+ return this;
+ }
+
+ public Builder responseDelay(@Nullable Function<Integer, Integer>
responseDelay) {
+ this.responseDelay = responseDelay;
+ return this;
+ }
+
+ public Builder nodeName(@Nullable String nodeName) {
+ this.nodeName = nodeName;
+ return this;
+ }
+
+ public Builder clusterId(UUID clusterId) {
+ this.clusterId = clusterId;
+ return this;
+ }
+
+ public Builder securityConfiguration(@Nullable SecurityConfiguration
securityConfiguration) {
+ this.securityConfiguration = securityConfiguration;
+ return this;
+ }
+
+ public Builder port(@Nullable Integer port) {
+ this.port = port;
+ return this;
+ }
+
+ public Builder enableRequestHandling(boolean enableRequestHandling) {
+ this.enableRequestHandling = enableRequestHandling;
+ return this;
+ }
+
+ public Builder features(@Nullable BitSet features) {
+ this.features = features;
+ return this;
+ }
+
+ public Builder listenAddresses(@Nullable String... listenAddresses) {
+ this.listenAddresses = listenAddresses;
+ return this;
+ }
+
+ /**
+ * Builds the test server.
+ *
+ * @return Test server.
+ */
+ public TestServer build() {
+ return new TestServer(
+ idleTimeout,
+ ignite == null ? new FakeIgnite() : ignite,
+ shouldDropConnection,
+ responseDelay,
+ nodeName,
+ clusterId != null ? clusterId : UUID.randomUUID(),
+ securityConfiguration,
+ port,
+ enableRequestHandling,
+ features,
+ listenAddresses
+ );
+ }
+ }
}
diff --git
a/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java
b/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java
index 23565243300..cf1048df039 100644
--- a/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java
+++ b/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java
@@ -320,7 +320,8 @@ public class IgniteJdbcDriver implements Driver {
extractAuthenticationConfiguration(connectionProperties),
IgniteClientConfiguration.DFLT_OPERATION_TIMEOUT,
connectionProperties.getPartitionAwarenessMetadataCacheSize(),
- JdbcDatabaseMetadata.DRIVER_NAME
+ JdbcDatabaseMetadata.DRIVER_NAME,
+
IgniteClientConfigurationImpl.DFLT_BACKGROUND_RE_RESOLVE_ADDRESSES_INTERVAL
);
ChannelValidator channelValidator = ctx -> {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientChannelValidatorTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientChannelValidatorTest.java
index 6d0ce4d5478..df9e5aeb0c9 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientChannelValidatorTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientChannelValidatorTest.java
@@ -215,7 +215,8 @@ public class ItThinClientChannelValidatorTest extends
BaseIgniteAbstractTest {
null,
IgniteClientConfiguration.DFLT_OPERATION_TIMEOUT,
IgniteClientConfiguration.DFLT_SQL_PARTITION_AWARENESS_METADATA_CACHE_SIZE,
- null
+ null,
+
IgniteClientConfiguration.DFLT_BACKGROUND_RE_RESOLVE_ADDRESSES_INTERVAL
);
return await(TcpIgniteClient.startAsync(
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java
index 45a4b8cdb34..20336aa3695 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.runner.app.client;
import static org.apache.ignite.lang.ErrorGroups.Table.TABLE_NOT_FOUND_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -27,6 +29,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.TcpIgniteClient;
@@ -119,9 +123,17 @@ public class ItThinClientConnectionTest extends
ItAbstractThinClientTest {
@Test
void testExceptionHasHint() {
- IgniteException ex = assertThrows(IgniteException.class, () ->
client().sql().execute(null, "select x from bad"));
- assertEquals("To see the full stack trace set
clientConnector.sendServerExceptionStackTraceToClient:true",
- ex.getCause().getCause().getCause().getCause().getMessage());
+ // Execute on all nodes to collect all types of exception.
+ List<String> causes = IntStream.range(0,
client().configuration().addresses().length)
+ .mapToObj(i -> {
+ IgniteException ex = assertThrows(IgniteException.class,
() -> client().sql().execute(null, "select x from bad"));
+
+ return
ex.getCause().getCause().getCause().getCause().getMessage();
+ })
+ .collect(Collectors.toList());
+
+ assertThat(causes,
+ hasItem(containsString("To see the full stack trace set
clientConnector.sendServerExceptionStackTraceToClient:true")));
}
@Test