This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 17804b5b74 IGNITE-17690 Java thin: Get and verify cluster id on
connection (#1297)
17804b5b74 is described below
commit 17804b5b741f037842fb5a9dc71e927ef2fbf02a
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Nov 1 23:20:19 2022 +0300
IGNITE-17690 Java thin: Get and verify cluster id on connection (#1297)
* Get and cache cluster id when starting `ClientHandlerModule`.
* Include cluster id in handshake response.
* Store cluster id from the first handshake in `ReliableChannel`; drop any
later connection whose cluster id does not match.
---
.../ignite/client/handler/ItClientHandlerTest.java | 8 +-
.../ignite/client/handler/ClientHandlerModule.java | 18 +++-
.../handler/ClientInboundMessageHandler.java | 11 +-
.../ignite/internal/client/ProtocolContext.java | 17 ++-
.../ignite/internal/client/ReliableChannel.java | 31 +++++-
.../ignite/internal/client/TcpClientChannel.java | 4 +-
.../apache/ignite/client/AbstractClientTest.java | 5 +-
.../apache/ignite/client/ClientComputeTest.java | 9 +-
.../apache/ignite/client/ClientLoggingTest.java | 97 +----------------
.../org/apache/ignite/client/MultiClusterTest.java | 115 +++++++++++++++++++++
.../org/apache/ignite/client/RetryPolicyTest.java | 3 +-
.../ignite/client/TestClientHandlerModule.java | 22 ++--
.../apache/ignite/client/TestLoggerFactory.java | 110 ++++++++++++++++++++
.../java/org/apache/ignite/client/TestServer.java | 14 ++-
.../java/org/apache/ignite/lang/ErrorGroups.java | 3 +
.../internal/testframework/IgniteTestUtils.java | 12 +--
.../cpp/ignite/client/detail/node_connection.cpp | 1 +
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 3 +
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 1 +
.../dotnet/Apache.Ignite/Internal/Table/Tables.cs | 3 +-
.../platforms/dotnet/Apache.Ignite/Table/ITable.cs | 2 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
22 files changed, 361 insertions(+), 131 deletions(-)
diff --git
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index 2c722d4508..7acbaf5246 100644
---
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -35,6 +35,8 @@ import java.io.OutputStream;
import java.net.Socket;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import
org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.configuration.ConfigurationManager;
@@ -133,6 +135,7 @@ public class ItClientHandlerTest {
final var idleTimeout = unpacker.unpackLong();
final var nodeId = unpacker.unpackString();
final var nodeName = unpacker.unpackString();
+ unpacker.skipValue(); // Cluster id.
var featuresLen = unpacker.unpackBinaryHeader();
unpacker.skipValue(featuresLen);
@@ -141,7 +144,7 @@ public class ItClientHandlerTest {
unpacker.skipValue(extensionsLen);
assertArrayEquals(MAGIC, magic);
- assertEquals(26, len);
+ assertEquals(44, len);
assertEquals(3, major);
assertEquals(0, minor);
assertEquals(0, patch);
@@ -230,7 +233,8 @@ public class ItClientHandlerTest {
Mockito.when(clusterService.topologyService().localMember().name()).thenReturn("consistent-id");
var module = new ClientHandlerModule(mock(QueryProcessor.class),
mock(IgniteTablesInternal.class), mock(IgniteTransactions.class),
- registry, mock(IgniteCompute.class), clusterService,
bootstrapFactory, mock(IgniteSql.class));
+ registry, mock(IgniteCompute.class), clusterService,
bootstrapFactory, mock(IgniteSql.class),
+ () -> CompletableFuture.completedFuture(UUID.randomUUID()));
module.start();
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index ba5ed4fc15..4c1c17fd1e 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -29,7 +29,10 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.BindException;
import java.net.InetSocketAddress;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import
org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
@@ -65,6 +68,12 @@ public class ClientHandlerModule implements IgniteComponent {
/** Ignite SQL API. */
private final IgniteSql sql;
+ /** Cluster ID supplier. */
+ private final Supplier<CompletableFuture<UUID>> clusterIdSupplier;
+
+ /** Cluster ID. */
+ private UUID clusterId;
+
/** Netty channel. */
private volatile Channel channel;
@@ -99,7 +108,8 @@ public class ClientHandlerModule implements IgniteComponent {
IgniteCompute igniteCompute,
ClusterService clusterService,
NettyBootstrapFactory bootstrapFactory,
- IgniteSql sql) {
+ IgniteSql sql,
+ Supplier<CompletableFuture<UUID>> clusterIdSupplier) {
assert igniteTables != null;
assert registry != null;
assert queryProcessor != null;
@@ -107,6 +117,7 @@ public class ClientHandlerModule implements IgniteComponent
{
assert clusterService != null;
assert bootstrapFactory != null;
assert sql != null;
+ assert clusterIdSupplier != null;
this.queryProcessor = queryProcessor;
this.igniteTables = igniteTables;
@@ -116,6 +127,7 @@ public class ClientHandlerModule implements IgniteComponent
{
this.registry = registry;
this.bootstrapFactory = bootstrapFactory;
this.sql = sql;
+ this.clusterIdSupplier = clusterIdSupplier;
}
/** {@inheritDoc} */
@@ -127,6 +139,7 @@ public class ClientHandlerModule implements IgniteComponent
{
try {
channel = startEndpoint().channel();
+ clusterId = clusterIdSupplier.get().join();
} catch (InterruptedException e) {
throw new IgniteException(e);
}
@@ -194,7 +207,8 @@ public class ClientHandlerModule implements IgniteComponent
{
configuration,
igniteCompute,
clusterService,
- sql));
+ sql,
+ clusterId));
}
})
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
configuration.connectTimeout());
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 4ad001df01..f24b460d29 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -128,6 +128,9 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter {
/** SQL query cursor handler. */
private final JdbcQueryCursorHandler jdbcQueryCursorHandler;
+ /** Cluster ID. */
+ private final UUID clusterId;
+
/** Context. */
private ClientContext clientContext;
@@ -143,6 +146,8 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter {
* @param configuration Configuration.
* @param compute Compute.
* @param clusterService Cluster.
+ * @param sql SQL.
+ * @param clusterId Cluster ID.
*/
public ClientInboundMessageHandler(
IgniteTablesInternal igniteTables,
@@ -151,7 +156,8 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter {
ClientConnectorView configuration,
IgniteCompute compute,
ClusterService clusterService,
- IgniteSql sql) {
+ IgniteSql sql,
+ UUID clusterId) {
assert igniteTables != null;
assert igniteTransactions != null;
assert processor != null;
@@ -159,6 +165,7 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter {
assert compute != null;
assert clusterService != null;
assert sql != null;
+ assert clusterId != null;
this.igniteTables = igniteTables;
this.igniteTransactions = igniteTransactions;
@@ -166,6 +173,7 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter {
this.compute = compute;
this.clusterService = clusterService;
this.sql = sql;
+ this.clusterId = clusterId;
jdbcQueryEventHandler = new JdbcQueryEventHandlerImpl(processor, new
JdbcMetadataCatalog(igniteTables), resources);
jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources);
@@ -228,6 +236,7 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter {
ClusterNode localMember =
clusterService.topologyService().localMember();
packer.packString(localMember.id());
packer.packString(localMember.name());
+ packer.packUuid(clusterId);
packer.packBinaryHeader(0); // Features.
packer.packMapHeader(0); // Extensions.
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
index bb86925de4..27af5c3e82 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
+import java.util.UUID;
import
org.apache.ignite.client.IgniteClientFeatureNotSupportedByServerException;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.network.ClusterNode;
@@ -40,6 +41,9 @@ public class ProtocolContext {
/** Cluster node. */
private final ClusterNode clusterNode;
+ /** Cluster id. */
+ private final UUID clusterId;
+
/**
* Constructor.
*
@@ -52,11 +56,13 @@ public class ProtocolContext {
ProtocolVersion ver,
EnumSet<ProtocolBitmaskFeature> features,
long serverIdleTimeout,
- ClusterNode clusterNode) {
+ ClusterNode clusterNode,
+ UUID clusterId) {
this.ver = ver;
this.features = Collections.unmodifiableSet(features != null ?
features : EnumSet.noneOf(ProtocolBitmaskFeature.class));
this.serverIdleTimeout = serverIdleTimeout;
this.clusterNode = clusterNode;
+ this.clusterId = clusterId;
}
/**
@@ -116,4 +122,13 @@ public class ProtocolContext {
public ClusterNode clusterNode() {
return clusterNode;
}
+
+ /**
+ * Returns cluster id.
+ *
+ * @return Cluster id.
+ */
+ public UUID clusterId() {
+ return clusterId;
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index ca74a4cfde..082502c5dd 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.client;
+import static
org.apache.ignite.lang.ErrorGroups.Client.CLUSTER_ID_MISMATCH_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.CONFIGURATION_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
import static org.apache.ignite.lang.ErrorGroups.Common.UNKNOWN_ERR;
@@ -31,12 +32,14 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
@@ -100,6 +103,9 @@ public final class ReliableChannel implements AutoCloseable
{
* the table will compare its version with channel version to detect an
update. */
private final AtomicLong assignmentVersion = new AtomicLong();
+ /** Cluster id from the first handshake. */
+ private final AtomicReference<UUID> clusterId = new AtomicReference<>();
+
/**
* Constructor.
*
@@ -595,6 +601,10 @@ public final class ReliableChannel implements
AutoCloseable {
/** Determines whether specified operation should be retried. */
private boolean shouldRetry(ClientOperationType opType, int iteration,
IgniteClientConnectionException exception,
IgniteClientConnectionException
aggregateException) {
+ if (exception.code() == CLUSTER_ID_MISMATCH_ERR) {
+ return false;
+ }
+
if (opType == null) {
// System operation.
return iteration < RetryLimitPolicy.DFLT_RETRY_LIMIT;
@@ -631,8 +641,8 @@ public final class ReliableChannel implements AutoCloseable
{
try {
hld.getOrCreateChannel(true);
- } catch (Exception ignore) {
- // No-op.
+ } catch (Exception e) {
+ log.warn("Failed to establish connection to " +
hld.chCfg.getAddress() + ": " + e.getMessage(), e);
}
}
}
@@ -741,7 +751,22 @@ public final class ReliableChannel implements
AutoCloseable {
throw new
IgniteClientConnectionException(CONNECTION_ERR, "Reconnect is not allowed due
to applied throttling");
}
- ch = chFactory.apply(chCfg, connMgr);
+ ClientChannel ch0 = chFactory.apply(chCfg, connMgr);
+
+ var oldClusterId = clusterId.compareAndExchange(null,
ch0.protocolContext().clusterId());
+
+ if (oldClusterId != null &&
!oldClusterId.equals(ch0.protocolContext().clusterId())) {
+ try {
+ ch0.close();
+ } catch (Exception ignored) {
+ // Ignore
+ }
+
+ throw new
IgniteClientConnectionException(CLUSTER_ID_MISMATCH_ERR, "Cluster ID mismatch:
expected=" + oldClusterId
+ + ", actual=" +
ch0.protocolContext().clusterId());
+ }
+
+ ch = ch0;
ch.addTopologyAssignmentChangeListener(ReliableChannel.this::onTopologyAssignmentChanged);
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index fca061c674..4ca527d49b 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -424,6 +424,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
var clusterNodeName = unpacker.unpackString();
var addr = sock.remoteAddress();
var clusterNode = new ClusterNode(clusterNodeId, clusterNodeName,
new NetworkAddress(addr.getHostName(), addr.getPort()));
+ var clusterId = unpacker.unpackUuid();
var featuresLen = unpacker.unpackBinaryHeader();
unpacker.skipValues(featuresLen);
@@ -431,7 +432,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
var extensionsLen = unpacker.unpackMapHeader();
unpacker.skipValues(extensionsLen);
- protocolCtx = new ProtocolContext(srvVer,
ProtocolBitmaskFeature.allFeaturesAsEnumSet(), serverIdleTimeout, clusterNode);
+ protocolCtx = new ProtocolContext(
+ srvVer, ProtocolBitmaskFeature.allFeaturesAsEnumSet(),
serverIdleTimeout, clusterNode, clusterId);
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index c711d0c697..422e56993b 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import io.netty.util.ResourceLeakDetector;
+import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.fakes.FakeIgniteTables;
@@ -43,6 +44,8 @@ public abstract class AbstractClientTest {
protected static int serverPort;
+ protected static UUID clusterId = UUID.randomUUID();
+
/**
* Before all.
*/
@@ -133,7 +136,7 @@ public abstract class AbstractClientTest {
Ignite ignite,
String nodeName
) {
- return new TestServer(port, portRange, idleTimeout, ignite, null,
nodeName);
+ return new TestServer(port, portRange, idleTimeout, ignite, null,
nodeName, clusterId);
}
/**
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
index 509a603513..22c23493fa 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -173,9 +174,11 @@ public class ClientComputeTest {
ignite = new FakeIgnite();
((FakeIgniteTables) ignite.tables()).createTable(TABLE_NAME);
- server1 = new TestServer(10900, 10, 0, ignite, shouldDropConnection,
"s1");
- server2 = new TestServer(10910, 10, 0, ignite, shouldDropConnection,
"s2");
- server3 = new TestServer(10920, 10, 0, ignite, shouldDropConnection,
"s3");
+ var clusterId = UUID.randomUUID();
+
+ server1 = new TestServer(10900, 10, 0, ignite, shouldDropConnection,
"s1", clusterId);
+ server2 = new TestServer(10910, 10, 0, ignite, shouldDropConnection,
"s2", clusterId);
+ server3 = new TestServer(10920, 10, 0, ignite, shouldDropConnection,
"s3", clusterId);
}
private Set<ClusterNode> getClusterNodes(String... names) {
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientLoggingTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientLoggingTest.java
index b3c3424a54..71e557b836 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ClientLoggingTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientLoggingTest.java
@@ -23,11 +23,6 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import java.lang.System.Logger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ResourceBundle;
-import java.util.function.Supplier;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.fakes.FakeIgniteTables;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -40,10 +35,10 @@ import org.junit.jupiter.api.Test;
*/
public class ClientLoggingTest {
/** Test server. */
- TestServer server;
+ private TestServer server;
/** Test server 2. */
- TestServer server2;
+ private TestServer server2;
@AfterEach
void tearDown() throws Exception {
@@ -66,9 +61,6 @@ public class ClientLoggingTest {
assertEquals("t", client1.tables().tables().get(0).name());
assertEquals("t", client2.tables().tables().get(0).name());
- assertThat(loggerFactory1.logger.entries(), empty());
- assertThat(loggerFactory2.logger.entries(), empty());
-
server.close();
FakeIgnite ignite2 = new FakeIgnite();
@@ -86,7 +78,7 @@ public class ClientLoggingTest {
loggerFactory2.logger.entries().forEach(msg -> assertThat(msg,
startsWith("client2:")));
}
- private TestServer startServer(int port, FakeIgnite ignite) {
+ private static TestServer startServer(int port, FakeIgnite ignite) {
return AbstractClientTest.startServer(
port,
10,
@@ -95,91 +87,10 @@ public class ClientLoggingTest {
);
}
- private IgniteClient createClient(LoggerFactory loggerFactory) {
+ private static IgniteClient createClient(LoggerFactory loggerFactory) {
return IgniteClient.builder()
.addresses("127.0.0.1:10950..10960")
.loggerFactory(loggerFactory)
.build();
}
-
- private static class TestLoggerFactory implements LoggerFactory {
- private final SimpleCapturingLogger logger;
-
- public TestLoggerFactory(String factoryName) {
- this.logger = new SimpleCapturingLogger(factoryName);
- }
-
- @Override
- public Logger forName(String name) {
- return logger;
- }
- }
-
- private static class SimpleCapturingLogger implements System.Logger {
- private final String name;
-
- private final List<String> logEntries = new ArrayList<>();
-
- public SimpleCapturingLogger(String name) {
- this.name = name;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public boolean isLoggable(Level level) {
- return true;
- }
-
- @Override
- public void log(Level level, String msg) {
- captureLog(msg);
- }
-
- @Override
- public void log(Level level, Supplier<String> msgSupplier) {
- throw new AssertionError("Should not be called");
- }
-
- @Override
- public void log(Level level, Object obj) {
- throw new AssertionError("Should not be called");
- }
-
- @Override
- public void log(Level level, String msg, Throwable thrown) {
- captureLog(msg);
- }
-
- @Override
- public void log(Level level, Supplier<String> msgSupplier, Throwable
thrown) {
- throw new AssertionError("Should not be called");
- }
-
- @Override
- public void log(Level level, String format, Object... params) {
- throw new AssertionError("Should not be called");
- }
-
- @Override
- public void log(Level level, ResourceBundle bundle, String msg,
Throwable thrown) {
- throw new AssertionError("Should not be called");
- }
-
- @Override
- public void log(Level level, ResourceBundle bundle, String format,
Object... params) {
- throw new AssertionError("Should not be called");
- }
-
- public List<String> entries() {
- return logEntries;
- }
-
- private void captureLog(String msg) {
- logEntries.add(name + ":" + msg);
- }
- }
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
b/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
new file mode 100644
index 0000000000..9af2164593
--- /dev/null
+++
b/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static
org.apache.ignite.lang.ErrorGroups.Client.CLUSTER_ID_MISMATCH_ERR;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.UUID;
+import org.apache.ignite.client.IgniteClient.Builder;
+import org.apache.ignite.client.fakes.FakeIgnite;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.hamcrest.CoreMatchers;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests client behavior with multiple clusters.
+ */
+public class MultiClusterTest {
+ private static final UUID clusterId1 = UUID.randomUUID();
+
+ private static final UUID clusterId2 = UUID.randomUUID();
+
+ private TestServer server1;
+
+ private TestServer server2;
+
+ @BeforeEach
+ void setUp() {
+ server1 = new TestServer(10900, 10, 0, new FakeIgnite(), null, "s1",
clusterId1);
+ server2 = new TestServer(10900, 10, 0, new FakeIgnite(), null, "s2",
clusterId2);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(server1, server2);
+ }
+
+ @Test
+ public void testClientDropsConnectionOnClusterIdMismatch()
+ throws Exception {
+ TestLoggerFactory loggerFactory = new TestLoggerFactory("client");
+
+ Builder builder = IgniteClient.builder()
+ .addresses("127.0.0.1:" + server1.port(), "127.0.0.1:" +
server2.port())
+ .loggerFactory(loggerFactory);
+
+ try (var client = builder.build()) {
+ assertTrue(IgniteTestUtils.waitForCondition(() ->
getFailedConnectionEntry(loggerFactory) != null, 3000));
+
+ assertEquals(1, client.connections().size());
+
+ String err = getFailedConnectionEntry(loggerFactory);
+ String expectedErr = "Cluster ID mismatch: expected=" + clusterId2
+ ", actual=" + clusterId1;
+
+ assertThat(err, CoreMatchers.containsString(expectedErr));
+ }
+ }
+
+ @Test
+ public void testReconnectToDifferentClusterFails()
+ throws Exception {
+ Builder builder = IgniteClient.builder()
+ .addresses("127.0.0.1:" + server1.port());
+
+ server2.close();
+
+ try (var client = builder.build()) {
+ client.tables().tables();
+
+ server1.close();
+ server1 = new TestServer(10900, 10, 0, new FakeIgnite(), null,
"s1", clusterId2);
+
+ IgniteClientConnectionException ex =
(IgniteClientConnectionException) assertThrowsWithCause(
+ () -> client.tables().tables(),
IgniteClientConnectionException.class, "Cluster ID mismatch");
+
+ IgniteClientConnectionException cause =
(IgniteClientConnectionException) ExceptionUtils.getSuppressedList(ex).stream()
+ .filter(e -> e.getCause().getMessage().contains("Cluster
ID mismatch"))
+ .findFirst()
+ .orElseThrow()
+ .getCause();
+
+ assertEquals(CLUSTER_ID_MISMATCH_ERR, cause.code());
+ }
+ }
+
+ private static @Nullable String getFailedConnectionEntry(TestLoggerFactory
loggerFactory) {
+ return loggerFactory.logger.entries().stream()
+ .filter(x -> x.contains("Failed to establish connection to"))
+ .findFirst()
+ .orElse(null);
+ }
+}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index c38381e410..35b7ee1297 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.ArrayList;
+import java.util.UUID;
import java.util.function.Function;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.fakes.FakeIgniteTables;
@@ -264,6 +265,6 @@ public class RetryPolicyTest {
FakeIgnite ign = new FakeIgnite();
((FakeIgniteTables) ign.tables()).createTable("t");
- server = new TestServer(10900, 10, 0, ign, shouldDropConnection, null);
+ server = new TestServer(10900, 10, 0, ign, shouldDropConnection, null,
UUID.randomUUID());
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index 4a67b26a41..bc23b933bc 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelOption;
import io.netty.util.ReferenceCounted;
import java.net.BindException;
import java.net.SocketAddress;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.ignite.Ignite;
@@ -65,6 +66,9 @@ public class TestClientHandlerModule implements
IgniteComponent {
/** Compute. */
private final IgniteCompute compute;
+ /** Cluster id. */
+ private final UUID clusterId;
+
/** Netty channel. */
private volatile Channel channel;
@@ -74,12 +78,13 @@ public class TestClientHandlerModule implements
IgniteComponent {
/**
* Constructor.
*
- * @param ignite Ignite.
- * @param registry Configuration registry.
- * @param bootstrapFactory Bootstrap factory.
+ * @param ignite Ignite.
+ * @param registry Configuration registry.
+ * @param bootstrapFactory Bootstrap factory.
* @param shouldDropConnection Connection drop condition.
- * @param clusterService Cluster service.
- * @param compute Compute.
+ * @param clusterService Cluster service.
+ * @param compute Compute.
+ * @param clusterId Cluster id.
*/
public TestClientHandlerModule(
Ignite ignite,
@@ -87,7 +92,8 @@ public class TestClientHandlerModule implements
IgniteComponent {
NettyBootstrapFactory bootstrapFactory,
Function<Integer, Boolean> shouldDropConnection,
ClusterService clusterService,
- IgniteCompute compute) {
+ IgniteCompute compute,
+ UUID clusterId) {
assert ignite != null;
assert registry != null;
assert bootstrapFactory != null;
@@ -98,6 +104,7 @@ public class TestClientHandlerModule implements
IgniteComponent {
this.shouldDropConnection = shouldDropConnection;
this.clusterService = clusterService;
this.compute = compute;
+ this.clusterId = clusterId;
}
/** {@inheritDoc} */
@@ -166,7 +173,8 @@ public class TestClientHandlerModule implements
IgniteComponent {
configuration,
compute,
clusterService,
- mock(IgniteSql.class)));
+ mock(IgniteSql.class),
+ clusterId));
}
})
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
configuration.connectTimeout());
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestLoggerFactory.java
b/modules/client/src/test/java/org/apache/ignite/client/TestLoggerFactory.java
new file mode 100644
index 0000000000..35e8f630f5
--- /dev/null
+++
b/modules/client/src/test/java/org/apache/ignite/client/TestLoggerFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.lang.System.Logger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ResourceBundle;
+import java.util.function.Supplier;
+import org.apache.ignite.lang.LoggerFactory;
+
+/**
+ * Logger factory for tests: {@link #forName} ignores the requested name and always returns the same {@link ListLogger}.
+ */
+public class TestLoggerFactory implements LoggerFactory {
+ public final ListLogger logger;
+
+ TestLoggerFactory(String factoryName) {
+ this.logger = new ListLogger(factoryName);
+ }
+
+ @Override
+ public Logger forName(String name) {
+ return logger;
+ }
+
+ /** Logger that stores each message in a list as {@code name + ":" + msg}; only the (level, msg) and (level, msg, thrown) overloads capture, all others throw AssertionError. */
+ public static class ListLogger implements System.Logger {
+ private final String name;
+
+ private final List<String> logEntries = new ArrayList<>(); // NOTE(review): plain ArrayList, no synchronization - confirm logging is single-threaded in tests.
+
+ ListLogger(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public boolean isLoggable(Level level) {
+ return true;
+ }
+
+ @Override
+ public void log(Level level, String msg) {
+ captureLog(msg);
+ }
+
+ @Override
+ public void log(Level level, Supplier<String> msgSupplier) {
+ throw new AssertionError("Should not be called");
+ }
+
+ @Override
+ public void log(Level level, Object obj) {
+ throw new AssertionError("Should not be called");
+ }
+
+ @Override
+ public void log(Level level, String msg, Throwable thrown) {
+ captureLog(msg); // The throwable itself is not recorded, only the message.
+ }
+
+ @Override
+ public void log(Level level, Supplier<String> msgSupplier, Throwable
thrown) {
+ throw new AssertionError("Should not be called");
+ }
+
+ @Override
+ public void log(Level level, String format, Object... params) {
+ throw new AssertionError("Should not be called");
+ }
+
+ @Override
+ public void log(Level level, ResourceBundle bundle, String msg,
Throwable thrown) {
+ throw new AssertionError("Should not be called");
+ }
+
+ @Override
+ public void log(Level level, ResourceBundle bundle, String format,
Object... params) {
+ throw new AssertionError("Should not be called");
+ }
+
+ public List<String> entries() {
+ return logEntries; // Returns the backing list itself, not a copy.
+ }
+
+ private void captureLog(String msg) {
+ logEntries.add(name + ":" + msg);
+ }
+ }
+}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index 69ed82d52f..df1e9d8045 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -28,6 +28,7 @@ import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.Ignite;
@@ -44,6 +45,7 @@ import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
import org.mockito.Mockito;
/**
@@ -72,7 +74,7 @@ public class TestServer implements AutoCloseable {
long idleTimeout,
Ignite ignite
) {
- this(port, portRange, idleTimeout, ignite, null, null);
+ this(port, portRange, idleTimeout, ignite, null, null,
UUID.randomUUID());
}
/**
@@ -88,8 +90,9 @@ public class TestServer implements AutoCloseable {
int portRange,
long idleTimeout,
Ignite ignite,
- Function<Integer, Boolean> shouldDropConnection,
- String nodeName
+ @Nullable Function<Integer, Boolean> shouldDropConnection,
+ String nodeName,
+ UUID clusterId
) {
cfg = new ConfigurationRegistry(
List.of(ClientConnectorConfiguration.KEY,
NetworkConfiguration.KEY),
@@ -128,7 +131,7 @@ public class TestServer implements AutoCloseable {
compute.executeColocated(anyString(), any(), anyString(),
any())).thenReturn(CompletableFuture.completedFuture(nodeName));
module = shouldDropConnection != null
- ? new TestClientHandlerModule(ignite, cfg, bootstrapFactory,
shouldDropConnection, clusterService, compute)
+ ? new TestClientHandlerModule(ignite, cfg, bootstrapFactory,
shouldDropConnection, clusterService, compute, clusterId)
: new ClientHandlerModule(
((FakeIgnite) ignite).queryEngine(),
(IgniteTablesInternal) ignite.tables(),
@@ -137,7 +140,8 @@ public class TestServer implements AutoCloseable {
compute,
clusterService,
bootstrapFactory,
- ignite.sql()
+ ignite.sql(),
+ () -> CompletableFuture.completedFuture(clusterId)
);
module.start();
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 0edbbd3b69..09bfff12cb 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -87,6 +87,9 @@ public class ErrorGroups {
/** Configuration error. */
public static final int CONFIGURATION_ERR =
CLIENT_ERR_GROUP.registerErrorCode(7);
+
+ /** Cluster ID mismatch error. */
+ public static final int CLUSTER_ID_MISMATCH_ERR =
CLIENT_ERR_GROUP.registerErrorCode(8);
}
/** SQL error group. */
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 329f9161c8..048252264b 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -308,15 +308,13 @@ public final class IgniteTestUtils {
) {
for (Throwable th = t; th != null; th = th.getCause()) {
if (cls.isAssignableFrom(th.getClass())) {
- if (msg != null) {
- if (th.getMessage() != null &&
th.getMessage().contains(msg)) {
- return true;
- } else {
- continue;
- }
+ if (msg == null) {
+ return true;
}
- return true;
+ if (th.getMessage() != null && th.getMessage().contains(msg)) {
+ return true;
+ }
}
for (Throwable n : th.getSuppressed()) {
diff --git a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
index 696de96d53..1145a574e3 100644
--- a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
@@ -127,6 +127,7 @@ ignite_result<void>
node_connection::process_handshake_rsp(bytes_view msg) {
(void)reader.read_string_nullable(); // Cluster node ID. Needed for
partition-aware compute.
(void)reader.read_string_nullable(); // Cluster node name. Needed for
partition-aware compute.
+ reader.skip(); // TODO: IGNITE-18053 Get and verify cluster id on
connection
reader.skip(); // Features.
reader.skip(); // Extensions.
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index a70fa7d54f..db4e262f66 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -85,6 +85,8 @@ namespace Apache.Ignite.Tests
public IClusterNode Node { get; }
+ public Guid ClusterId { get; set; }
+
public string[] PartitionAssignment { get; set; }
public bool PartitionAssignmentChanged { get; set; }
@@ -379,6 +381,7 @@ namespace Apache.Ignite.Tests
handshakeWriter.Write(0); // Idle timeout.
handshakeWriter.Write(Node.Id); // Node id.
handshakeWriter.Write(Node.Name); // Node name (consistent id).
+ handshakeWriter.Write(ClusterId);
handshakeWriter.WriteBinHeader(0); // Features.
handshakeWriter.WriteMapHeader(0); // Extensions.
handshakeWriter.Flush();
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index c880553004..680cfc6902 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -299,6 +299,7 @@ namespace Apache.Ignite.Internal
var clusterNodeId = reader.ReadString();
var clusterNodeName = reader.ReadString();
+ reader.Skip(); // Cluster ID. TODO IGNITE-18046.
reader.Skip(); // Features.
reader.Skip(); // Extensions.
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
index 12e9582494..7bd499f82b 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
@@ -69,10 +69,9 @@ namespace Apache.Ignite.Internal.Table
var id = r.ReadGuid();
var name = r.ReadString();
- // ReSharper disable once LambdaExpressionMustBeStatic
(not supported by .NET Core 3.1, TODO IGNITE-16994)
var table = _tables.GetOrAdd(
id,
- (Guid id0, (string Name, ClientFailoverSocket Socket)
arg) => new Table(arg.Name, id0, arg.Socket),
+ static (Guid id0, (string Name, ClientFailoverSocket
Socket) arg) => new Table(arg.Name, id0, arg.Socket),
(name, _socket));
res.Add(table);
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/ITable.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/ITable.cs
index 1ee9d3ce67..f27127a042 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/ITable.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/ITable.cs
@@ -45,7 +45,7 @@ namespace Apache.Ignite.Table
/// </summary>
/// <typeparam name="T">Record type.</typeparam>
/// <returns>Record view.</returns>
- public IRecordView<T> GetRecordView<T>() // TODO: Custom mapping
(IGNITE-16356)
+ public IRecordView<T> GetRecordView<T>()
where T : notnull;
/// <summary>
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 30b71afaa1..5408387745 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -416,7 +416,8 @@ public class IgniteImpl implements Ignite {
compute,
clusterSvc,
nettyBootstrapFactory,
- sql
+ sql,
+ () -> cmgMgr.clusterState().thenApply(s ->
s.clusterTag().clusterId())
);
}