This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b9049ade572 IGNITE-26115 Create compute task events for thin client
(#6361)
b9049ade572 is described below
commit b9049ade572c3c21de490f47f60e921ea079e575
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Aug 5 16:03:20 2025 +0300
IGNITE-26115 Create compute task events for thin client (#6361)
---
.../ignite/client/handler/ClientContext.java | 14 +++++--
.../handler/ClientInboundMessageHandler.java | 11 +++--
.../ClientComputeExecuteColocatedRequest.java | 18 +++++++--
.../ClientComputeExecutePartitionedRequest.java | 17 ++++++--
.../compute/ClientComputeExecuteRequest.java | 14 +++++--
.../events/ItThinClientComputeEventsTest.java | 47 ++++++++++++++++++++++
6 files changed, 100 insertions(+), 21 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientContext.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientContext.java
index 58138654467..20a70f98a27 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientContext.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientContext.java
@@ -17,6 +17,7 @@
package org.apache.ignite.client.handler;
+import java.net.SocketAddress;
import java.util.BitSet;
import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
@@ -26,7 +27,7 @@ import org.apache.ignite.internal.tostring.S;
/**
* Client connection context.
*/
-class ClientContext {
+public class ClientContext {
/** Version. */
private final ProtocolVersion version;
@@ -38,6 +39,8 @@ class ClientContext {
private final UserDetails userDetails;
+ private final SocketAddress remoteAddress;
+
/**
* Constructor.
*
@@ -45,12 +48,14 @@ class ClientContext {
* @param clientCode Client type code.
* @param features Feature set.
* @param userDetails User details.
+ * @param remoteAddress Remote address
*/
- ClientContext(ProtocolVersion version, int clientCode, BitSet features,
UserDetails userDetails) {
+ ClientContext(ProtocolVersion version, int clientCode, BitSet features,
UserDetails userDetails, SocketAddress remoteAddress) {
this.version = version;
this.clientCode = clientCode;
this.features = features;
this.userDetails = userDetails;
+ this.remoteAddress = remoteAddress;
}
/**
@@ -100,7 +105,10 @@ class ClientContext {
return userDetails;
}
- /** {@inheritDoc} */
+ public SocketAddress remoteAddress() {
+ return remoteAddress;
+ }
+
@Override
public String toString() {
return S.toString(ClientContext.class, this);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 99bbbeb7fb1..89719ac82a5 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.handler;
-import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.SQL_DIRECT_TX_MAPPING;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.STREAMER_RECEIVER_EXECUTION_OPTIONS;
@@ -505,7 +504,8 @@ public class ClientInboundMessageHandler
actualFeatures = this.features;
}
- clientContext = new ClientContext(clientVer, clientCode,
HandshakeUtils.supportedFeatures(actualFeatures, clientFeatures), user);
+ BitSet supportedFeatures =
HandshakeUtils.supportedFeatures(actualFeatures, clientFeatures);
+ clientContext = new ClientContext(clientVer, clientCode,
supportedFeatures, user, ctx.channel().remoteAddress());
sendHandshakeResponse(ctx, packer, actualFeatures);
}
@@ -897,8 +897,7 @@ public class ClientInboundMessageHandler
clientContext.hasFeature(TX_PIGGYBACK));
case ClientOp.COMPUTE_EXECUTE:
- return ClientComputeExecuteRequest.process(in, compute,
clusterService, notificationSender(requestId),
- clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
+ return ClientComputeExecuteRequest.process(in, compute,
clusterService, notificationSender(requestId), clientContext);
case ClientOp.COMPUTE_EXECUTE_COLOCATED:
return ClientComputeExecuteColocatedRequest.process(
@@ -907,7 +906,7 @@ public class ClientInboundMessageHandler
igniteTables,
clusterService,
notificationSender(requestId),
- clientContext.hasFeature(PLATFORM_COMPUTE_JOB)
+ clientContext
);
case ClientOp.COMPUTE_EXECUTE_PARTITIONED:
@@ -917,7 +916,7 @@ public class ClientInboundMessageHandler
igniteTables,
clusterService,
notificationSender(requestId),
- clientContext.hasFeature(PLATFORM_COMPUTE_JOB)
+ clientContext
);
case ClientOp.COMPUTE_EXECUTE_MAPREDUCE:
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
index b600d4e51c8..5d0340e8469 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
@@ -21,9 +21,11 @@ import static
org.apache.ignite.client.handler.requests.compute.ClientComputeExe
import static
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.sendResultAndState;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
import java.util.BitSet;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.ClientContext;
import org.apache.ignite.client.handler.NotificationSender;
import org.apache.ignite.client.handler.ResponseWriter;
import org.apache.ignite.compute.JobExecution;
@@ -33,6 +35,8 @@ import
org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Builder;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.table.IgniteTables;
@@ -48,7 +52,7 @@ public class ClientComputeExecuteColocatedRequest {
* @param tables Tables.
* @param cluster Cluster service
* @param notificationSender Notification sender.
- * @param enablePlatformJobs Enable platform jobs.
+ * @param clientContext Client context.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -57,24 +61,30 @@ public class ClientComputeExecuteColocatedRequest {
IgniteTables tables,
ClusterService cluster,
NotificationSender notificationSender,
- boolean enablePlatformJobs) {
+ ClientContext clientContext
+ ) {
int tableId = in.unpackInt();
int schemaId = in.unpackInt();
BitSet noValueSet = in.unpackBitSet();
byte[] tupleBytes = in.readBinary();
- Job job = ClientComputeJobUnpacker.unpackJob(in, enablePlatformJobs);
+ Job job = ClientComputeJobUnpacker.unpackJob(in,
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
return readTableAsync(tableId, tables).thenCompose(table ->
readTuple(schemaId, noValueSet, tupleBytes, table, true)
.thenCompose(keyTuple -> {
+ Builder metadataBuilder =
ComputeEventMetadata.builder(Type.SINGLE)
+ .tableName(table.name())
+
.initiatorClient(clientContext.remoteAddress().toString())
+ .eventUser(clientContext.userDetails());
+
CompletableFuture<JobExecution<ComputeJobDataHolder>>
jobExecutionFut = compute.submitColocatedInternal(
table,
keyTuple,
job.deploymentUnits(),
job.jobClassName(),
job.options(),
- ComputeEventMetadata.builder(), // TODO
IGNITE-26115
+ metadataBuilder,
job.arg(),
null
);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
index d99cd89a1a5..4276e272c89 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
@@ -20,8 +20,10 @@ package org.apache.ignite.client.handler.requests.compute;
import static
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.packSubmitResult;
import static
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.sendResultAndState;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.ClientContext;
import org.apache.ignite.client.handler.NotificationSender;
import org.apache.ignite.client.handler.ResponseWriter;
import org.apache.ignite.compute.JobExecution;
@@ -31,6 +33,8 @@ import
org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Builder;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.table.IgniteTables;
@@ -46,7 +50,7 @@ public class ClientComputeExecutePartitionedRequest {
* @param tables Tables.
* @param cluster Cluster service
* @param notificationSender Notification sender.
- * @param enablePlatformJobs Enable platform jobs.
+ * @param clientContext Client context.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -55,21 +59,26 @@ public class ClientComputeExecutePartitionedRequest {
IgniteTables tables,
ClusterService cluster,
NotificationSender notificationSender,
- boolean enablePlatformJobs
+ ClientContext clientContext
) {
int tableId = in.unpackInt();
int partitionId = in.unpackInt();
- Job job = ClientComputeJobUnpacker.unpackJob(in, enablePlatformJobs);
+ Job job = ClientComputeJobUnpacker.unpackJob(in,
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
return readTableAsync(tableId, tables).thenCompose(table -> {
+ Builder metadataBuilder =
ComputeEventMetadata.builder(Type.BROADCAST)
+ .tableName(table.name())
+ .initiatorClient(clientContext.remoteAddress().toString())
+ .eventUser(clientContext.userDetails());
+
CompletableFuture<JobExecution<ComputeJobDataHolder>>
jobExecutionFut = compute.submitPartitionedInternal(
table,
partitionId,
job.deploymentUnits(),
job.jobClassName(),
job.options(),
- ComputeEventMetadata.builder(), // TODO IGNITE-26115
+ metadataBuilder,
job.arg(),
null
);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index 51a333b4dc7..01503ae1033 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.handler.requests.compute;
import static
org.apache.ignite.client.handler.requests.cluster.ClientClusterGetNodesRequest.packClusterNode;
import static
org.apache.ignite.client.handler.requests.compute.ClientComputeGetStateRequest.packJobState;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
import java.util.HashSet;
@@ -26,6 +27,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import org.apache.ignite.client.handler.ClientContext;
import org.apache.ignite.client.handler.NotificationSender;
import org.apache.ignite.client.handler.ResponseWriter;
import org.apache.ignite.compute.JobExecution;
@@ -39,6 +41,8 @@ import
org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.compute.MarshallerProvider;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Builder;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterService;
@@ -59,7 +63,7 @@ public class ClientComputeExecuteRequest {
* @param compute Compute.
* @param cluster Cluster.
* @param notificationSender Notification sender.
- * @param enablePlatformJobs Enable platform jobs.
+ * @param clientContext Client context.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -67,12 +71,14 @@ public class ClientComputeExecuteRequest {
IgniteComputeInternal compute,
ClusterService cluster,
NotificationSender notificationSender,
- boolean enablePlatformJobs
+ ClientContext clientContext
) {
Set<ClusterNode> candidates = unpackCandidateNodes(in, cluster);
- Job job = ClientComputeJobUnpacker.unpackJob(in, enablePlatformJobs);
+ Job job = ClientComputeJobUnpacker.unpackJob(in,
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
- ComputeEventMetadata.Builder metadataBuilder =
ComputeEventMetadata.builder(); // TODO IGNITE-26115
+ Builder metadataBuilder = ComputeEventMetadata.builder(Type.SINGLE)
+ .initiatorClient(clientContext.remoteAddress().toString())
+ .eventUser(clientContext.userDetails());
CompletableFuture<JobExecution<ComputeJobDataHolder>> executionFut =
compute.executeAsyncWithFailover(
candidates, job.deploymentUnits(), job.jobClassName(),
job.options(), metadataBuilder, job.arg(), null
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
new file mode 100644
index 00000000000..61e2306e7a3
--- /dev/null
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.events;
+
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.UUID;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
+import org.apache.ignite.internal.compute.utils.Clients;
+import org.apache.ignite.internal.eventlog.api.IgniteEventType;
+import org.junit.jupiter.api.AfterAll;
+
+class ItThinClientComputeEventsTest extends ItComputeEventsTest {
+ private final Clients clients = new Clients();
+
+ @AfterAll
+ void cleanup() {
+ clients.cleanup();
+ }
+
+ @Override
+ protected IgniteCompute compute() {
+ return clients.compute(node(0));
+ }
+
+ @Override
+ protected EventMatcher jobEvent(IgniteEventType eventType, Type jobType,
UUID jobId, String jobClassName, String targetNode) {
+ return super.jobEvent(eventType, jobType, jobId, jobClassName,
targetNode)
+ .withInitiatorClient(notNullValue(String.class));
+ }
+}