This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f73cd2f0ec5 IGNITE-27180 Java client: fix partition awareness connection failure handling (#7386)
f73cd2f0ec5 is described below
commit f73cd2f0ec5f18ce0f6d411f4408d9ff80f1daa0
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Jan 12 18:06:47 2026 +0200
IGNITE-27180 Java client: fix partition awareness connection failure handling (#7386)
Fix fallback to round robin if partition-aware channel fails.
---
.../ignite/internal/client/ReliableChannel.java | 16 +--
.../ignite/jdbc/ItJdbcConnectionFailoverTest.java | 1 -
.../app/client/ItThinConnectionFailoverTest.java | 118 +++++++++++++++++++++
3 files changed, 127 insertions(+), 8 deletions(-)
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 168b8f819cf..036d34f43c1 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -411,13 +411,15 @@ public final class ReliableChannel implements AutoCloseable {
ClientChannelHolder holder = nodeChannelsByName.get(preferredNodeName);
if (holder != null && !holder.close) {
- return holder.getOrCreateChannelAsync().thenCompose(ch -> {
- if (ch != null) {
- return completedFuture(ch);
- } else {
- return getDefaultChannelAsync();
- }
- });
+ return holder.getOrCreateChannelAsync()
+ .handle((ch, err) -> ch) // On error, return null to fall back to default channel.
+ .thenCompose(ch -> {
+ if (ch != null) {
+ return completedFuture(ch);
+ } else {
+ return getDefaultChannelAsync();
+ }
+ });
}
}
diff --git a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcConnectionFailoverTest.java b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcConnectionFailoverTest.java
index f44d41533e7..f8eac10274f 100644
--- a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcConnectionFailoverTest.java
+++ b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcConnectionFailoverTest.java
@@ -89,7 +89,6 @@ public class ItJdbcConnectionFailoverTest extends ClusterPerTestIntegrationTest
* Ensures that the partition aware query is forwarded to the alive node.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-27180")
void testPartitionAwareQueryForwardedToRandomNode() throws SQLException {
int nodesCount = 3;
cluster.startAndInit(nodesCount, new int[]{0, 1, 2});
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinConnectionFailoverTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinConnectionFailoverTest.java
new file mode 100644
index 00000000000..29ad0e6b584
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinConnectionFailoverTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app.client;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.stream.IntStream;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.table.KeyValueView;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests thin client connection failover with and without partition awareness.
+ */
+public class ItThinConnectionFailoverTest extends ClusterPerTestIntegrationTest {
+ private IgniteClient client;
+
+ @Override
+ protected int[] cmgMetastoreNodes() {
+ return new int[]{2};
+ }
+
+ @BeforeEach
+ void setUp() {
+ client = getClient();
+
+ client.sql().executeScript(
+ "CREATE ZONE zone1 (REPLICAS 3) STORAGE PROFILES ['default'];"
+ + "CREATE TABLE t(id INT PRIMARY KEY, val INT) ZONE zone1");
+
+ Awaitility.await().until(() -> client.connections().size(), is(initialNodes()));
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Test
+ void testStopNodePartitionAwarenessKeyValue() {
+ KeyValueView<Integer, Integer> kvView = client.tables().table("t")
+ .keyValueView(Integer.class, Integer.class);
+
+ for (int i = 0; i < 10; i++) {
+ kvView.put(null, i, i);
+ }
+
+ cluster.stopNode(0);
+ assertThat(client.connections().size(), is(2));
+
+ for (int i = 10; i < 20; i++) {
+ kvView.put(null, i, i);
+ }
+ }
+
+ @Test
+ void testStopNodePartitionAwarenessQuery() {
+ IgniteSql sql = client.sql();
+
+ for (int i = 0; i < 10; i++) {
+ sql.execute(null, "INSERT INTO t VALUES (?, ?)", i, i).close();
+ }
+
+ cluster.stopNode(0);
+ assertThat(client.connections().size(), is(2));
+
+ for (int i = 10; i < 20; i++) {
+ sql.execute(null, "INSERT INTO t VALUES (?, ?)", i, i).close();
+ }
+ }
+
+ @Test
+ void testStopNodeNonPartitionAwareQuery() {
+ IgniteSql sql = client.sql();
+
+ for (int i = 0; i < 10; i++) {
+ sql.execute(null, "SELECT " + i).close();
+ }
+
+ cluster.stopNode(0);
+ assertThat(client.connections().size(), is(2));
+
+ for (int i = 10; i < 20; i++) {
+ sql.execute(null, "SELECT " + i).close();
+ }
+ }
+
+ private IgniteClient getClient() {
+ String[] addresses = IntStream.range(0, initialNodes())
+ .mapToObj(i -> "127.0.0.1:" + (10800 + i))
+ .toArray(String[]::new);
+
+ return IgniteClient.builder().addresses(addresses).build();
+ }
+}