This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch IGNITE-23856__thin_cln_channels_duplication in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/IGNITE-23856__thin_cln_channels_duplication by this push:
     new 26f99f2d2d4 IGNITE-23856 Ignite Thin Client channels unwanted duplication (#11855)
26f99f2d2d4 is described below
commit 26f99f2d2d415e8aebd6028218b045914fbdd185
Author: Popov Aleksandr <[email protected]>
AuthorDate: Thu Mar 20 11:09:05 2025 -0700
IGNITE-23856 Ignite Thin Client channels unwanted duplication (#11855)
Co-authored-by: Maksim Timonin <[email protected]>
---
.../client/thin/ClientDiscoveryContext.java | 20 +--
.../internal/client/thin/ReliableChannel.java | 34 ++---
.../org/apache/ignite/client/ReliabilityTest.java | 33 +++--
.../thin/ReliableChannelDuplicationTest.java | 160 +++++++++++++++++++++
.../internal/client/thin/ReliableChannelTest.java | 46 ++++--
.../org/apache/ignite/client/ClientTestSuite.java | 4 +-
6 files changed, 241 insertions(+), 56 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientDiscoveryContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientDiscoveryContext.java
index c1262d76dc9..f255231383d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientDiscoveryContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientDiscoveryContext.java
@@ -192,11 +192,11 @@ public class ClientDiscoveryContext {
/**
* Gets list of endpoins for each node.
*
- * @return Collection of nodes with list of endpoints for each node, or
{@code null} if endpoints are not changed
+ * @return Set of nodes with list of endpoints for each node, or {@code
null} if endpoints are not changed
* since last request.
*/
- @Nullable Collection<List<InetSocketAddress>> getEndpoints() {
- Collection<List<InetSocketAddress>> endpoints = null;
+ @Nullable Set<List<InetSocketAddress>> getEndpoints() {
+ Set<List<InetSocketAddress>> endpoints = null;
TopologyInfo topInfo = this.topInfo;
if (addrFinder != null || topInfo.topVer == UNKNOWN_TOP_VER) {
@@ -219,9 +219,9 @@ public class ClientDiscoveryContext {
}
/**
- * @return List of host:port_range address lines parsed as {@link
InetSocketAddress}.
+ * @return Set of host:port_range address lines parsed as {@link
InetSocketAddress}.
*/
- private static Collection<List<InetSocketAddress>>
parsedAddresses(String[] addrs) throws ClientException {
+ private static Set<List<InetSocketAddress>> parsedAddresses(String[]
addrs) throws ClientException {
if (F.isEmpty(addrs))
throw new ClientException("Empty addresses");
@@ -245,7 +245,7 @@ public class ClientDiscoveryContext {
.flatMap(r -> IntStream
.rangeClosed(r.portFrom(), r.portTo()).boxed()
.map(p ->
Collections.singletonList(InetSocketAddress.createUnresolved(r.host(), p)))
- ).collect(Collectors.toList());
+ ).collect(Collectors.toSet());
}
/** */
@@ -257,7 +257,7 @@ public class ClientDiscoveryContext {
private final Map<UUID, NodeInfo> nodes;
/** Normalized nodes endpoints. */
- private final Collection<List<InetSocketAddress>> endpoints;
+ private final Set<List<InetSocketAddress>> endpoints;
/** */
private TopologyInfo(long ver, Map<UUID, NodeInfo> nodes) {
@@ -267,8 +267,8 @@ public class ClientDiscoveryContext {
}
/** Remove duplicates from nodes endpoints. */
- private static Collection<List<InetSocketAddress>>
normalizeEndpoints(Collection<NodeInfo> nodes) {
- Collection<List<InetSocketAddress>> endpoints = new
ArrayList<>(nodes.size());
+ private static Set<List<InetSocketAddress>>
normalizeEndpoints(Collection<NodeInfo> nodes) {
+ Set<List<InetSocketAddress>> endpoints = new HashSet<>();
Set<InetSocketAddress> used = new HashSet<>();
for (NodeInfo nodeInfo : nodes) {
@@ -286,7 +286,7 @@ public class ClientDiscoveryContext {
endpoints.add(addrs);
}
- return Collections.unmodifiableCollection(endpoints);
+ return endpoints;
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index a4f1d5c42c5..c792feaa61e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -631,21 +631,10 @@ final class ReliableChannel implements AutoCloseable {
return;
}
- // Add connected channels to the list to avoid unnecessary reconnects,
unless address finder is used.
- if (holders != null && clientCfg.getAddressesFinder() == null) {
- // Do not modify the original list.
- newAddrs = new ArrayList<>(newAddrs);
-
- for (ClientChannelHolder h : holders) {
- ClientChannel ch = h.ch;
-
- if (ch != null && !ch.closed())
- newAddrs.add(h.getAddresses());
- }
- }
-
Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
+ List<ClientChannelHolder> reinitHolders = new ArrayList<>();
+
Set<InetSocketAddress> newAddrsSet =
newAddrs.stream().flatMap(Collection::stream).collect(Collectors.toSet());
// Close obsolete holders or map old but valid addresses to holders
@@ -656,20 +645,25 @@ final class ReliableChannel implements AutoCloseable {
for (InetSocketAddress addr : h.getAddresses()) {
// If new endpoints contain at least one of channel
addresses, don't close this channel.
if (newAddrsSet.contains(addr)) {
- ClientChannelHolder oldHld =
curAddrs.putIfAbsent(addr, h);
+ curAddrs.putIfAbsent(addr, h);
- if (oldHld == null || oldHld == h) // If not duplicate.
- found = true;
+ found = true;
+
+ break;
}
}
+ // Add connected channels to the list to avoid unnecessary
reconnects, unless address finder is used.
+ if (clientCfg.getAddressesFinder() == null && h.ch != null &&
!h.ch.closed())
+ found = true;
+
if (!found)
h.close();
+ else
+ reinitHolders.add(h);
}
}
- List<ClientChannelHolder> reinitHolders = new ArrayList<>();
-
// The variable holds a new index of default channel after topology
change.
// Suppose that reuse of the channel is better than open new
connection.
int dfltChannelIdx = -1;
@@ -699,12 +693,12 @@ final class ReliableChannel implements AutoCloseable {
if (hld == null) { // If not found, create the new one.
hld = new ClientChannelHolder(new
ClientChannelConfiguration(clientCfg, addrs));
+ reinitHolders.add(hld);
+
for (InetSocketAddress addr : addrs)
curAddrs.putIfAbsent(addr, hld);
}
- reinitHolders.add(hld);
-
if (hld == currDfltHolder)
dfltChannelIdx = reinitHolders.size() - 1;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 506ebd33f64..e7a2ff7afba 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.client.thin.ClientOperation;
import org.apache.ignite.internal.client.thin.ClientServerError;
import org.apache.ignite.internal.client.thin.ServicesTest;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
@@ -223,7 +224,7 @@ public class ReliabilityTest extends AbstractThinClientTest
{
public void testSingleServerDuplicatedFailover() throws Exception {
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
IgniteClient client =
Ignition.startClient(getClientConfiguration()
- .setAddresses(F.first(cluster.clientAddresses()),
F.first(cluster.clientAddresses()))
+ .setAddresses(F.first(cluster.clientAddresses()))
.setClusterDiscoveryEnabled(false))
) {
ClientCache<Integer, Integer> cache = client.createCache("cache");
@@ -234,7 +235,7 @@ public class ReliabilityTest extends AbstractThinClientTest
{
// Fail.
dropAllThinClientConnections(Ignition.allGrids().get(0));
- // Reuse second address without fail.
+ // Reuse the address after retry without fail.
cachePut(cache, 0, 0);
}
}
@@ -247,7 +248,7 @@ public class ReliabilityTest extends AbstractThinClientTest
{
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
IgniteClient client =
Ignition.startClient(getClientConfiguration()
.setRetryPolicy(new ClientRetryReadPolicy())
- .setAddresses(F.first(cluster.clientAddresses()),
F.first(cluster.clientAddresses()))
+ .setAddresses(F.first(cluster.clientAddresses()))
.setClusterDiscoveryEnabled(false))
) {
ClientCache<Integer, Integer> cache = client.createCache("cache");
@@ -273,27 +274,25 @@ public class ReliabilityTest extends
AbstractThinClientTest {
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
IgniteClient client =
Ignition.startClient(getClientConfiguration()
.setRetryPolicy(new ExceptionRetryPolicy())
- .setAddresses(F.first(cluster.clientAddresses()),
F.first(cluster.clientAddresses()))
+ .setAddresses(F.first(cluster.clientAddresses()))
.setClusterDiscoveryEnabled(false))
) {
ClientCache<Integer, Integer> cache = client.createCache("cache");
- dropAllThinClientConnections(Ignition.allGrids().get(0));
-
- Throwable asyncEx = GridTestUtils.assertThrows(null, () ->
cache.getAsync(0).get(),
- ExecutionException.class, "Channel is closed");
-
- GridTestUtils.assertContains(null, asyncEx.getMessage(),
F.first(cluster.clientAddresses()));
dropAllThinClientConnections(Ignition.allGrids().get(0));
- Throwable syncEx = GridTestUtils.assertThrows(null, () ->
cache.get(0),
- ClientConnectionException.class, "Channel is closed");
-
- GridTestUtils.assertContains(null, syncEx.getMessage(),
F.first(cluster.clientAddresses()));
-
- for (Throwable t : new Throwable[] {asyncEx.getCause(), syncEx}) {
- assertEquals("Error in policy.",
t.getSuppressed()[0].getMessage());
+ Throwable ex;
+ if (async) {
+ ex = GridTestUtils.assertThrows(null, () ->
cache.getAsync(0).get(),
+ ExecutionException.class, "Channel is closed");
+ }
+ else {
+ ex = GridTestUtils.assertThrows(null, () -> cache.get(0),
+ ClientConnectionException.class, "Channel is closed");
}
+
+ X.hasCause(ex, "Error in policy");
+ GridTestUtils.assertContains(null, ex.getMessage(),
F.first(cluster.clientAddresses()));
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelDuplicationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelDuplicationTest.java
new file mode 100644
index 00000000000..819846e00aa
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelDuplicationTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.util.stream.IntStream.range;
+
+/**
+ * Tests for duplication in channels' list.
+ */
+@RunWith(Parameterized.class)
+public class ReliableChannelDuplicationTest extends
ThinClientAbstractPartitionAwarenessTest {
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** Grid count. */
+ @Parameterized.Parameter(0)
+ public int gridCnt;
+
+ /** */
+ @Parameterized.Parameters(name = "gridCount = {0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ { 1 },
+ { 3 }
+ });
+ }
+
+ /**
+ * Test after cluster restart the number of channels remains equal to the
number of nodes.
+ */
+ @Test
+ public void testDuplicationOnClusterRestart() throws Exception {
+ startGrids(gridCnt);
+
+ initClient(getClientConfiguration(range(0, gridCnt).toArray()),
range(0, gridCnt).toArray());
+
+
assertNoDuplicates(((TcpIgniteClient)client).reliableChannel().getChannelHolders());
+
+ stopAllGrids();
+
+ startGrids(gridCnt);
+
+
assertNoDuplicates(((TcpIgniteClient)client).reliableChannel().getChannelHolders());
+ }
+
+ /**
+ * Test behavior after stopping a single node in the cluster.
+ */
+ @Test
+ public void testStopSingleNodeDuringOperation() throws Exception {
+ Assume.assumeFalse(gridCnt == 1);
+
+ testChannelDuplication(1, 0);
+ }
+
+ /**
+ * Test behavior after stopping and restarting a node.
+ */
+ @Test
+ public void testStopAndRestartNode() throws Exception {
+ Assume.assumeFalse(gridCnt == 1);
+
+ testChannelDuplication(1, 1);
+ }
+
+ /**
+ * Test behavior after stopping multiple nodes in the cluster.
+ */
+ @Test
+ public void testStopMultipleNodesDuringOperation() throws Exception {
+ Assume.assumeFalse(gridCnt < 3);
+
+ testChannelDuplication(2, 2);
+ }
+
+ /**
+ * Asserts that there are no duplicate channels in the list of holders
based on their remote addresses.
+ *
+ * @param holders List of channel holders.
+ */
+ private void assertNoDuplicates(List<ReliableChannel.ClientChannelHolder>
holders) {
+ Set<InetSocketAddress> addrs = new HashSet<>();
+
+ for (ReliableChannel.ClientChannelHolder holder : holders) {
+ holder.getAddresses().forEach(addr -> {
+ if (!addrs.add(addr))
+ throw new AssertionError("Duplicate remote address found:
" + addr);
+ });
+ }
+ }
+
+ /**
+ * Stop a Node and provide an operation to notify the client about new
topology.
+ */
+ private void stopNodeAndMakeTopologyChangeDetection(int idx) {
+ stopGrid(idx);
+
+ detectTopologyChange();
+ }
+
+ /**
+ * Tests that no duplicate channel holders are created during node
restarts and topology changes.
+ *
+ * @param gridsStop int Grids to stop.
+ * @param gridsRestart int Grids to restart after stop.
+ */
+ private void testChannelDuplication(int gridsStop, int gridsRestart)
throws Exception {
+ startGrids(gridCnt);
+
+ initClient(getClientConfiguration(range(0, gridCnt).toArray()),
range(0, gridCnt).toArray());
+
+
assertNoDuplicates(((TcpIgniteClient)client).reliableChannel().getChannelHolders());
+
+ for (int i = 0; i < gridsStop; i++) {
+ stopNodeAndMakeTopologyChangeDetection(i);
+
+
assertNoDuplicates(((TcpIgniteClient)client).reliableChannel().getChannelHolders());
+ }
+
+ for (int i = 0; i < gridsRestart; i++) {
+ startGrid(i);
+
+ detectTopologyChange();
+
+ awaitChannelsInit(i);
+
+
assertNoDuplicates(((TcpIgniteClient)client).reliableChannel().getChannelHolders());
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
index 7738fa0f91a..20ed25da5af 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
@@ -75,7 +75,7 @@ public class ReliableChannelTest {
rc.channelsInit();
- assertEquals(3, rc.getChannelHolders().size());
+ assertEquals(2, rc.getChannelHolders().size());
}
/**
@@ -140,8 +140,11 @@ public class ReliableChannelTest {
.nextAddresesResponse("127.0.0.1:10803", "127.0.0.1:10803",
"127.0.0.1:10806")
.nextAddresesResponse("127.0.0.1:10803", "127.0.0.1:10803",
"127.0.0.1:10803")
.nextAddresesResponse("127.0.0.1:10803", "127.0.0.1:10803",
"127.0.0.1:10804")
- .nextAddresesResponse("127.0.0.1:10803", "127.0.0.1:10804",
"127.0.0.1:10804")
- .nextAddresesResponse("127.0.0.1:10800", "127.0.0.1:10801",
"127.0.0.1:10802");
+ .nextAddresesResponse("127.0.0.1:10800", "127.0.0.1:10801",
"127.0.0.1:10802")
+ .nextAddresesResponse("127.0.0.1:10807")
+ .nextAddresesResponse("127.0.0.1:10808", "127.0.0.1:10809",
"127.0.0.1:10808")
+ .nextAddresesResponse("127.0.0.1:10810", "127.0.0.1:10808",
"127.0.0.1:10809")
+ .nextAddresesResponse("127.0.0.1:10811", "127.0.0.1:10811",
"127.0.0.1:10812", "127.0.0.1:10813");
ClientConfiguration ccfg = new
ClientConfiguration().setAddressesFinder(finder);
ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
@@ -163,15 +166,21 @@ public class ReliableChannelTest {
assertAddrReInitAndEqualsTo.accept(Arrays.asList("127.0.0.1:10803",
"127.0.0.1:10804", "127.0.0.1:10806"));
- assertAddrReInitAndEqualsTo.accept(Arrays.asList("127.0.0.1:10803",
"127.0.0.1:10803", "127.0.0.1:10806"));
-
- assertAddrReInitAndEqualsTo.accept(Arrays.asList("127.0.0.1:10803",
"127.0.0.1:10803", "127.0.0.1:10803"));
+ assertAddrReInitAndEqualsTo.accept(Arrays.asList("127.0.0.1:10803",
"127.0.0.1:10806"));
- assertAddrReInitAndEqualsTo.accept(Arrays.asList("127.0.0.1:10803",
"127.0.0.1:10803", "127.0.0.1:10804"));
+ assertAddrReInitAndEqualsTo.accept(List.of("127.0.0.1:10803"));
- assertAddrReInitAndEqualsTo.accept(Arrays.asList("127.0.0.1:10803",
"127.0.0.1:10804", "127.0.0.1:10804"));
+ assertAddrReInitAndEqualsTo.accept(Arrays.asList("127.0.0.1:10803",
"127.0.0.1:10804"));
assertAddrReInitAndEqualsTo.accept(Arrays.asList("127.0.0.1:10800",
"127.0.0.1:10801", "127.0.0.1:10802"));
+
+ assertAddrReInitAndEqualsTo.accept(List.of("127.0.0.1:10807"));
+
+ assertAddrReInitAndEqualsTo.accept(Arrays.asList("127.0.0.1:10808",
"127.0.0.1:10809"));
+
+ assertAddrReInitAndEqualsTo.accept(Arrays.asList("127.0.0.1:10808",
"127.0.0.1:10809", "127.0.0.1:10810"));
+
+ assertAddrReInitAndEqualsTo.accept(Arrays.asList("127.0.0.1:10811",
"127.0.0.1:10812", "127.0.0.1:10813"));
}
/**
@@ -341,6 +350,27 @@ public class ReliableChannelTest {
}, false);
}
+ /**
+ * Checks that channels' count remains the same in static configuration
after reinitialization.
+ */
+ @Test
+ public void testChannelsCountRemainsAfterReinit() {
+ String[] addrs = {"127.0.0.1:10800", "127.0.0.1:10801"};
+ TestAddressFinder finder = new TestAddressFinder()
+ .nextAddresesResponse(addrs)
+ .nextAddresesResponse(addrs);
+
+ ClientConfiguration ccfg = new
ClientConfiguration().setAddressesFinder(finder);
+ ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+
+ rc.channelsInit();
+ int initCnt = rc.getChannelHolders().size();
+
+ rc.initChannelHolders();
+
+ assertEquals(initCnt, rc.getChannelHolders().size());
+ }
+
/**
* Async operation should fail if cluster is down after send operation and
handle topology change.
*/
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 2dad1730a6a..d9dc60c8931 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.client.thin.InvokeTest;
import org.apache.ignite.internal.client.thin.MetadataRegistrationTest;
import
org.apache.ignite.internal.client.thin.OptimizedMarshallerClassesCachedTest;
import org.apache.ignite.internal.client.thin.RecoveryModeTest;
+import org.apache.ignite.internal.client.thin.ReliableChannelDuplicationTest;
import org.apache.ignite.internal.client.thin.ReliableChannelTest;
import org.apache.ignite.internal.client.thin.ServiceAwarenessTest;
import org.apache.ignite.internal.client.thin.ServicesBinaryArraysTests;
@@ -103,7 +104,8 @@ import org.junit.runners.Suite;
BlockingTxOpsTest.class,
InvokeTest.class,
ExtraColumnInH2RowsTest.class,
- RecoveryModeTest.class
+ RecoveryModeTest.class,
+ ReliableChannelDuplicationTest.class
})
public class ClientTestSuite {
// No-op.