This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b9049ade572 IGNITE-26115 Create compute task events for thin client 
(#6361)
b9049ade572 is described below

commit b9049ade572c3c21de490f47f60e921ea079e575
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Aug 5 16:03:20 2025 +0300

    IGNITE-26115 Create compute task events for thin client (#6361)
---
 .../ignite/client/handler/ClientContext.java       | 14 +++++--
 .../handler/ClientInboundMessageHandler.java       | 11 +++--
 .../ClientComputeExecuteColocatedRequest.java      | 18 +++++++--
 .../ClientComputeExecutePartitionedRequest.java    | 17 ++++++--
 .../compute/ClientComputeExecuteRequest.java       | 14 +++++--
 .../events/ItThinClientComputeEventsTest.java      | 47 ++++++++++++++++++++++
 6 files changed, 100 insertions(+), 21 deletions(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientContext.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientContext.java
index 58138654467..20a70f98a27 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientContext.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.client.handler;
 
+import java.net.SocketAddress;
 import java.util.BitSet;
 import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
 import org.apache.ignite.internal.client.proto.ProtocolVersion;
@@ -26,7 +27,7 @@ import org.apache.ignite.internal.tostring.S;
 /**
  * Client connection context.
  */
-class ClientContext {
+public class ClientContext {
     /** Version. */
     private final ProtocolVersion version;
 
@@ -38,6 +39,8 @@ class ClientContext {
 
     private final UserDetails userDetails;
 
+    private final SocketAddress remoteAddress;
+
     /**
      * Constructor.
      *
@@ -45,12 +48,14 @@ class ClientContext {
      * @param clientCode Client type code.
      * @param features Feature set.
      * @param userDetails User details.
+     * @param remoteAddress Remote address
      */
-    ClientContext(ProtocolVersion version, int clientCode, BitSet features, 
UserDetails userDetails) {
+    ClientContext(ProtocolVersion version, int clientCode, BitSet features, 
UserDetails userDetails, SocketAddress remoteAddress) {
         this.version = version;
         this.clientCode = clientCode;
         this.features = features;
         this.userDetails = userDetails;
+        this.remoteAddress = remoteAddress;
     }
 
     /**
@@ -100,7 +105,10 @@ class ClientContext {
         return userDetails;
     }
 
-    /** {@inheritDoc} */
+    public SocketAddress remoteAddress() {
+        return remoteAddress;
+    }
+
     @Override
     public String toString() {
         return S.toString(ClientContext.class, this);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 99bbbeb7fb1..89719ac82a5 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.client.handler;
 
-import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.SQL_DIRECT_TX_MAPPING;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.STREAMER_RECEIVER_EXECUTION_OPTIONS;
@@ -505,7 +504,8 @@ public class ClientInboundMessageHandler
             actualFeatures = this.features;
         }
 
-        clientContext = new ClientContext(clientVer, clientCode, 
HandshakeUtils.supportedFeatures(actualFeatures, clientFeatures), user);
+        BitSet supportedFeatures = 
HandshakeUtils.supportedFeatures(actualFeatures, clientFeatures);
+        clientContext = new ClientContext(clientVer, clientCode, 
supportedFeatures, user, ctx.channel().remoteAddress());
 
         sendHandshakeResponse(ctx, packer, actualFeatures);
     }
@@ -897,8 +897,7 @@ public class ClientInboundMessageHandler
                         clientContext.hasFeature(TX_PIGGYBACK));
 
             case ClientOp.COMPUTE_EXECUTE:
-                return ClientComputeExecuteRequest.process(in, compute, 
clusterService, notificationSender(requestId),
-                        clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
+                return ClientComputeExecuteRequest.process(in, compute, 
clusterService, notificationSender(requestId), clientContext);
 
             case ClientOp.COMPUTE_EXECUTE_COLOCATED:
                 return ClientComputeExecuteColocatedRequest.process(
@@ -907,7 +906,7 @@ public class ClientInboundMessageHandler
                         igniteTables,
                         clusterService,
                         notificationSender(requestId),
-                        clientContext.hasFeature(PLATFORM_COMPUTE_JOB)
+                        clientContext
                 );
 
             case ClientOp.COMPUTE_EXECUTE_PARTITIONED:
@@ -917,7 +916,7 @@ public class ClientInboundMessageHandler
                         igniteTables,
                         clusterService,
                         notificationSender(requestId),
-                        clientContext.hasFeature(PLATFORM_COMPUTE_JOB)
+                        clientContext
                 );
 
             case ClientOp.COMPUTE_EXECUTE_MAPREDUCE:
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
index b600d4e51c8..5d0340e8469 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
@@ -21,9 +21,11 @@ import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeExe
 import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.sendResultAndState;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
 
 import java.util.BitSet;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.ClientContext;
 import org.apache.ignite.client.handler.NotificationSender;
 import org.apache.ignite.client.handler.ResponseWriter;
 import org.apache.ignite.compute.JobExecution;
@@ -33,6 +35,8 @@ import 
org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.compute.ComputeJobDataHolder;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
 import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Builder;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.table.IgniteTables;
 
@@ -48,7 +52,7 @@ public class ClientComputeExecuteColocatedRequest {
      * @param tables Tables.
      * @param cluster Cluster service
      * @param notificationSender Notification sender.
-     * @param enablePlatformJobs Enable platform jobs.
+     * @param clientContext Client context.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -57,24 +61,30 @@ public class ClientComputeExecuteColocatedRequest {
             IgniteTables tables,
             ClusterService cluster,
             NotificationSender notificationSender,
-            boolean enablePlatformJobs) {
+            ClientContext clientContext
+    ) {
         int tableId = in.unpackInt();
         int schemaId = in.unpackInt();
 
         BitSet noValueSet = in.unpackBitSet();
         byte[] tupleBytes = in.readBinary();
 
-        Job job = ClientComputeJobUnpacker.unpackJob(in, enablePlatformJobs);
+        Job job = ClientComputeJobUnpacker.unpackJob(in, 
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
 
         return readTableAsync(tableId, tables).thenCompose(table -> 
readTuple(schemaId, noValueSet, tupleBytes, table, true)
                 .thenCompose(keyTuple -> {
+                    Builder metadataBuilder = 
ComputeEventMetadata.builder(Type.SINGLE)
+                            .tableName(table.name())
+                            
.initiatorClient(clientContext.remoteAddress().toString())
+                            .eventUser(clientContext.userDetails());
+
                     CompletableFuture<JobExecution<ComputeJobDataHolder>> 
jobExecutionFut = compute.submitColocatedInternal(
                             table,
                             keyTuple,
                             job.deploymentUnits(),
                             job.jobClassName(),
                             job.options(),
-                            ComputeEventMetadata.builder(), // TODO 
IGNITE-26115
+                            metadataBuilder,
                             job.arg(),
                             null
                     );
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
index d99cd89a1a5..4276e272c89 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
@@ -20,8 +20,10 @@ package org.apache.ignite.client.handler.requests.compute;
 import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.packSubmitResult;
 import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.sendResultAndState;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.ClientContext;
 import org.apache.ignite.client.handler.NotificationSender;
 import org.apache.ignite.client.handler.ResponseWriter;
 import org.apache.ignite.compute.JobExecution;
@@ -31,6 +33,8 @@ import 
org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.compute.ComputeJobDataHolder;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
 import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Builder;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.table.IgniteTables;
 
@@ -46,7 +50,7 @@ public class ClientComputeExecutePartitionedRequest {
      * @param tables Tables.
      * @param cluster Cluster service
      * @param notificationSender Notification sender.
-     * @param enablePlatformJobs Enable platform jobs.
+     * @param clientContext Client context.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -55,21 +59,26 @@ public class ClientComputeExecutePartitionedRequest {
             IgniteTables tables,
             ClusterService cluster,
             NotificationSender notificationSender,
-            boolean enablePlatformJobs
+            ClientContext clientContext
     ) {
         int tableId = in.unpackInt();
         int partitionId = in.unpackInt();
 
-        Job job = ClientComputeJobUnpacker.unpackJob(in, enablePlatformJobs);
+        Job job = ClientComputeJobUnpacker.unpackJob(in, 
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
 
         return readTableAsync(tableId, tables).thenCompose(table -> {
+            Builder metadataBuilder = 
ComputeEventMetadata.builder(Type.BROADCAST)
+                    .tableName(table.name())
+                    .initiatorClient(clientContext.remoteAddress().toString())
+                    .eventUser(clientContext.userDetails());
+
             CompletableFuture<JobExecution<ComputeJobDataHolder>> 
jobExecutionFut = compute.submitPartitionedInternal(
                     table,
                     partitionId,
                     job.deploymentUnits(),
                     job.jobClassName(),
                     job.options(),
-                    ComputeEventMetadata.builder(), // TODO IGNITE-26115
+                    metadataBuilder,
                     job.arg(),
                     null
             );
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index 51a333b4dc7..01503ae1033 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.handler.requests.compute;
 
 import static 
org.apache.ignite.client.handler.requests.cluster.ClientClusterGetNodesRequest.packClusterNode;
 import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeGetStateRequest.packJobState;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
 
 import java.util.HashSet;
@@ -26,6 +27,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import org.apache.ignite.client.handler.ClientContext;
 import org.apache.ignite.client.handler.NotificationSender;
 import org.apache.ignite.client.handler.ResponseWriter;
 import org.apache.ignite.compute.JobExecution;
@@ -39,6 +41,8 @@ import 
org.apache.ignite.internal.compute.ComputeJobDataHolder;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
 import org.apache.ignite.internal.compute.MarshallerProvider;
 import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Builder;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.ClusterService;
@@ -59,7 +63,7 @@ public class ClientComputeExecuteRequest {
      * @param compute Compute.
      * @param cluster Cluster.
      * @param notificationSender Notification sender.
-     * @param enablePlatformJobs Enable platform jobs.
+     * @param clientContext Client context.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -67,12 +71,14 @@ public class ClientComputeExecuteRequest {
             IgniteComputeInternal compute,
             ClusterService cluster,
             NotificationSender notificationSender,
-            boolean enablePlatformJobs
+            ClientContext clientContext
     ) {
         Set<ClusterNode> candidates = unpackCandidateNodes(in, cluster);
-        Job job = ClientComputeJobUnpacker.unpackJob(in, enablePlatformJobs);
+        Job job = ClientComputeJobUnpacker.unpackJob(in, 
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
 
-        ComputeEventMetadata.Builder metadataBuilder = 
ComputeEventMetadata.builder(); // TODO IGNITE-26115
+        Builder metadataBuilder = ComputeEventMetadata.builder(Type.SINGLE)
+                .initiatorClient(clientContext.remoteAddress().toString())
+                .eventUser(clientContext.userDetails());
 
         CompletableFuture<JobExecution<ComputeJobDataHolder>> executionFut = 
compute.executeAsyncWithFailover(
                 candidates, job.deploymentUnits(), job.jobClassName(), 
job.options(), metadataBuilder, job.arg(), null
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
new file mode 100644
index 00000000000..61e2306e7a3
--- /dev/null
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.events;
+
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.UUID;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
+import org.apache.ignite.internal.compute.utils.Clients;
+import org.apache.ignite.internal.eventlog.api.IgniteEventType;
+import org.junit.jupiter.api.AfterAll;
+
+class ItThinClientComputeEventsTest extends ItComputeEventsTest {
+    private final Clients clients = new Clients();
+
+    @AfterAll
+    void cleanup() {
+        clients.cleanup();
+    }
+
+    @Override
+    protected IgniteCompute compute() {
+        return clients.compute(node(0));
+    }
+
+    @Override
+    protected EventMatcher jobEvent(IgniteEventType eventType, Type jobType, 
UUID jobId, String jobClassName, String targetNode) {
+        return super.jobEvent(eventType, jobType, jobId, jobClassName, 
targetNode)
+                .withInitiatorClient(notNullValue(String.class));
+    }
+}

Reply via email to