This is an automated email from the ASF dual-hosted git repository.
asf-gitbox-commits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new dd1cb74a59b IGNITE-28734 Process control.sh messages in management
pool - Fixes #13192.
dd1cb74a59b is described below
commit dd1cb74a59b20a7c2ee5e1fab2e246d3bde489f7
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Jun 5 11:21:04 2026 +0500
IGNITE-28734 Process control.sh messages in management pool - Fixes #13192.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../testsuites/IgniteControlUtilityTestSuite.java | 4 +-
.../util/GridCommandHandlerManagementPoolTest.java | 94 ++++++++++++++++++++++
.../ClientListenerAbstractConnectionContext.java | 13 ++-
.../odbc/ClientListenerConnectionContext.java | 6 +-
.../processors/odbc/ClientListenerProcessor.java | 45 ++++++++++-
.../odbc/jdbc/JdbcConnectionContext.java | 2 +-
.../odbc/odbc/OdbcConnectionContext.java | 2 +-
.../platform/client/ClientConnectionContext.java | 2 +-
.../util/nio/GridNioAsyncNotifyFilter.java | 2 +-
9 files changed, 158 insertions(+), 12 deletions(-)
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
index 0ed39ff19be..1c5701b6ada 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
@@ -34,6 +34,7 @@ import
org.apache.ignite.util.GridCommandHandlerIndexingClusterByClassWithSSLTes
import org.apache.ignite.util.GridCommandHandlerIndexingTest;
import org.apache.ignite.util.GridCommandHandlerIndexingWithSSLTest;
import org.apache.ignite.util.GridCommandHandlerLegacyClientTest;
+import org.apache.ignite.util.GridCommandHandlerManagementPoolTest;
import org.apache.ignite.util.GridCommandHandlerMetadataTest;
import org.apache.ignite.util.GridCommandHandlerSslTest;
import org.apache.ignite.util.GridCommandHandlerTest;
@@ -80,7 +81,8 @@ import org.junit.runners.Suite;
BaselineEventsRemoteTest.class,
GridCommandHandlerWalTest.class,
- GridCommandHandlerCheckpointTest.class
+ GridCommandHandlerCheckpointTest.class,
+ GridCommandHandlerManagementPoolTest.class,
})
public class IgniteControlUtilityTestSuite {
}
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java
new file mode 100644
index 00000000000..cd364a74e8a
--- /dev/null
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerManagementPoolTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.Ignition.startClient;
+import static
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+
+/**
+ * Tests that management commands are still processed while the client thread pool is blocked by SQL queries.
+ */
+public class GridCommandHandlerManagementPoolTest extends
GridCommandHandlerClusterPerMethodAbstractTest {
+ /** Timeout, in milliseconds, for the latch wait and for awaiting the query future. */
+ private static final long TIMEOUT = 10_000L;
+
+ /** Runs blocking SQL queries from a thin client and checks that control utility commands still return OK. */
+ @Test
+ public void testManagementTasksWorksWhenClientPoolBlocked() throws
Exception {
+ Ignite ignite = startGrid(0);
+
+ assertEquals(EXIT_CODE_OK, execute("--set-state",
ClusterState.ACTIVE.name()));
+
+ ignite.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setSqlSchema(QueryUtils.DFLT_SCHEMA)
+ .setSqlFunctionClasses(TestSqlFunctions.class)
+ .setIndexedTypes(Integer.class, String.class)
+ );
+
+ TestSqlFunctions.latch = new CountDownLatch(1);
+
+ try (IgniteClient client = startClient(new
ClientConfiguration().setAddresses("127.0.0.1:10800"))) {
+ // Block client pool by SQL queries.
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
+ () -> client.query(new SqlFieldsQuery("SELECT
wait_latch()")).getAll(),
+ ClientConnectorConfiguration.DFLT_THREAD_POOL_SIZE,
"client-thread");
+
+ // Check that management tasks still can be processed.
+ assertEquals(EXIT_CODE_OK, execute("--state")); // Native command.
+ assertEquals(EXIT_CODE_OK, execute("--checkpoint")); // Multi-node
task command.
+
+ TestSqlFunctions.latch.countDown();
+
+ fut.get(TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /** Custom SQL functions registered on the test cache via {@code setSqlFunctionClasses}. */
+ public static class TestSqlFunctions {
+ /** Latch awaited by {@code wait_latch()}; the test creates it before the queries start and counts it down afterwards. */
+ private static CountDownLatch latch;
+
+ /** Waits on the latch for at most {@code TIMEOUT} ms; returns {@code false} only if interrupted, {@code true} otherwise (including on timeout). */
+ @QuerySqlFunction
+ public static boolean wait_latch() {
+ try {
+ latch.await(TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ignored) {
+ return false;
+ }
+
+ return true;
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java
index 3cdbf56b82e..e4b1f40ca50 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java
@@ -32,6 +32,7 @@ import
org.apache.ignite.plugin.security.AuthenticationContext;
import org.apache.ignite.plugin.security.SecurityCredentials;
import org.jetbrains.annotations.Nullable;
+import static
org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.MANAGEMENT_CLIENT_ATTR;
import static
org.apache.ignite.plugin.security.SecuritySubjectType.REMOTE_CLIENT;
/**
@@ -56,6 +57,9 @@ public abstract class ClientListenerAbstractConnectionContext
implements ClientL
/** User attributes. */
protected Map<String, String> userAttrs;
+ /** Whether this is a management client; parsed from the {@code MANAGEMENT_CLIENT_ATTR} attribute in {@code initClientContext}. */
+ private volatile Boolean managementClient;
+
/**
* Describes the client connection:
* - thin cli: "cli:host:port@user_name"
@@ -138,11 +142,13 @@ public abstract class
ClientListenerAbstractConnectionContext implements ClientL
}
/** */
- protected void initClientDescriptor(String prefix) {
+ protected void initClientContext(String prefix) {
clientDesc = prefix + ":" + ses.remoteAddress().getHostString() + ":"
+ ses.remoteAddress().getPort();
if (secCtx != null)
clientDesc += "@" + secCtx.subject().login();
+
+ managementClient =
Boolean.parseBoolean(attributes().get(MANAGEMENT_CLIENT_ATTR));
}
/**
@@ -196,4 +202,9 @@ public abstract class
ClientListenerAbstractConnectionContext implements ClientL
@Override public Map<String, String> attributes() {
return F.isEmpty(userAttrs) ? Collections.emptyMap() :
Collections.unmodifiableMap(userAttrs);
}
+
+ /** {@inheritDoc} */
+ @Override public boolean managementClient() {
+ return managementClient; // NOTE(review): auto-unboxes a Boolean field that has no initializer; NPE if called before initClientContext() assigns it - confirm call order.
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerConnectionContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerConnectionContext.java
index 0848f458b75..88ce50de866 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerConnectionContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerConnectionContext.java
@@ -24,8 +24,6 @@ import
org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.jetbrains.annotations.Nullable;
-import static
org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.MANAGEMENT_CLIENT_ATTR;
-
/**
* SQL listener connection context.
*/
@@ -94,7 +92,5 @@ public interface ClientListenerConnectionContext {
/**
* @return {@code True} if client is management.
*/
- default boolean managementClient() {
- return Boolean.parseBoolean(attributes().get(MANAGEMENT_CLIENT_ATTR));
- }
+ boolean managementClient();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
index d092d37a4c1..68af9dfe14a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
@@ -59,6 +59,8 @@ import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.internal.util.worker.GridWorkerPool;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.mxbean.ClientProcessorMXBean;
@@ -122,6 +124,9 @@ public class ClientListenerProcessor extends
GridProcessorAdapter {
/** Executor service. */
private ExecutorService execSvc;
+ /** Management pool. */
+ private GridWorkerPool mgmtPool;
+
/** Thin client distributed configuration. */
private DistributedThinClientConfiguration distrThinCfg;
@@ -161,6 +166,7 @@ public class ClientListenerProcessor extends
GridProcessorAdapter {
}
execSvc = ctx.pools().getThinClientExecutorService();
+ mgmtPool = new
GridWorkerPool(ctx.pools().getManagementExecutorService(), log);
Exception lastErr = null;
@@ -435,10 +441,43 @@ public class ClientListenerProcessor extends
GridProcessorAdapter {
else {
connCtx.handler().registerRequest(reqId, cmdType);
- super.onMessageReceived(ses, msg);
+ onMessageReceived(ses, connCtx, msg);
}
}
else
+ onMessageReceived(ses, connCtx, msg);
+ }
+
+ /** Dispatches a message: with no connection context (handshake) it is processed in the calling thread, for a management client in the management pool, otherwise by the superclass implementation. */
+ private void onMessageReceived(
+ GridNioSession ses,
+ @Nullable ClientListenerConnectionContext connCtx,
+ Object msg
+ ) throws IgniteCheckedException {
+ if (connCtx == null) {
+ // Process handshake in NIO thread.
+ try {
+ proceedMessageReceived(ses, msg);
+ }
+ catch (IgniteCheckedException e) {
+ handleException(ses, e);
+ }
+ }
+ else if (connCtx.managementClient()) {
+ // Process management messages in management pool.
+ mgmtPool.execute(
+ new GridWorker(ctx.igniteInstanceName(),
"management-message-received-notify", log) {
+ @Override protected void body() {
+ try {
+ proceedMessageReceived(ses, msg);
+ }
+ catch (IgniteCheckedException e) {
+ handleException(ses, e);
+ }
+ }
+ });
+ }
+ else // Process regular messages in client-listener pool.
super.onMessageReceived(ses, msg);
}
};
@@ -488,6 +527,10 @@ public class ClientListenerProcessor extends
GridProcessorAdapter {
execSvc = null;
+ mgmtPool.join(cancel);
+
+ mgmtPool = null;
+
if (!U.IGNITE_MBEANS_DISABLED)
unregisterMBean();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index 8750fc71db3..1ecfdd02f00 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -267,7 +267,7 @@ public class JdbcConnectionContext extends
ClientListenerAbstractConnectionConte
protoCtx = new JdbcProtocolContext(ver, features, true);
- initClientDescriptor("jdbc-thin");
+ initClientContext("jdbc-thin");
parser = new JdbcMessageParser(ctx, protoCtx);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
index 096eef728ae..626ccf5d63b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
@@ -198,7 +198,7 @@ public class OdbcConnectionContext extends
ClientListenerAbstractConnectionConte
}
};
- initClientDescriptor("odbc");
+ initClientContext("odbc");
handler = new OdbcRequestHandler(ctx, busyLock, snd, maxCursors,
distributedJoins, enforceJoinOrder,
replicatedOnly, collocated, skipReducerOnUpdate, qryEngine, ver,
this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
index 8f2ca5eb33a..3225d52e53d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
@@ -226,7 +226,7 @@ public class ClientConnectionContext extends
ClientListenerAbstractConnectionCon
authenticate(ses, user, pwd);
- initClientDescriptor("cli");
+ initClientContext("cli");
handler = new ClientRequestHandler(this, currentProtocolContext);
parser = new ClientMessageParser(this, currentProtocolContext);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
index ccb7f38b912..3fb1ec5dd4a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
@@ -139,7 +139,7 @@ public class GridNioAsyncNotifyFilter extends
GridNioFilterAdapter {
* @param ses Session.
* @param ex Exception.
*/
- private void handleException(GridNioSession ses, IgniteCheckedException
ex) {
+ protected void handleException(GridNioSession ses, IgniteCheckedException
ex) {
try {
proceedExceptionCaught(ses, ex);
}