This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 794aa0c77ef IGNITE-27019 Update examples+documentation for Partition API usage in Compute Jobs (#7897)
794aa0c77ef is described below

commit 794aa0c77ef6fd3cd3327c962aeb4b3564c005b8
Author: jinxxxoid <[email protected]>
AuthorDate: Fri Apr 3 12:55:39 2026 +0400

    IGNITE-27019 Update examples+documentation for Partition API usage in Compute Jobs (#7897)
---
 docs/docs/getting-started/migrate-from-ignite-2.md | 117 +++++++++++
 .../example/compute/ComputeBroadcastExample.java   |  68 ++++++
 .../ComputePartitionQueryMapReduceExample.java     | 231 +++++++++++++++++++++
 3 files changed, 416 insertions(+)

diff --git a/docs/docs/getting-started/migrate-from-ignite-2.md b/docs/docs/getting-started/migrate-from-ignite-2.md
index d137ac154b6..d2189952e4a 100644
--- a/docs/docs/getting-started/migrate-from-ignite-2.md
+++ b/docs/docs/getting-started/migrate-from-ignite-2.md
@@ -104,3 +104,120 @@ For instructions on configuring metrics, see [Metrics Configuration](/3.1.0/conf
 ## Code Migration
 
 Code written for Apache Ignite 2 cannot be directly reused; however, as most concepts remain similar, code migration should not take much time.
+
+### Collocated Compute and Partition-Local Queries
+
+In Apache Ignite 2, you could pin a compute job to a specific partition using `ComputeTask` with `setPartition` on the job context. Ignite 3 provides two approaches to achieve the same result, both based on running a job on the node that owns a partition and then querying only that partition's rows using the `__PARTITION_ID` virtual SQL column.
+
+#### Option 1: Broadcast to Table Partitions (Recommended)
+
+Use `BroadcastJobTarget.table()` with `JobExecutionContext.partition()`. This is the preferred approach because Ignite routes each job instance to the node currently holding its partition, and `context.partition()` is always non-null, so execution is guaranteed to be local.
+
+```java
+JobDescriptor<Void, Long> job = JobDescriptor.builder(PartitionQueryJob.class)
+        .units(deploymentUnit)
+        .build();
+
+Collection<Long> partitionCounts = client.compute()
+        .execute(BroadcastJobTarget.table("Person"), job, null);
+
+long total = partitionCounts.stream().mapToLong(Long::longValue).sum();
+```
+
+Inside the job, read `context.partition()` to get the partition assigned to this instance, then filter rows with `__PARTITION_ID`:
+
+```java
+public class PartitionQueryJob implements ComputeJob<Void, Long> {
+    @Override
+    public CompletableFuture<Long> executeAsync(JobExecutionContext context, Void arg) {
+        Partition partition = context.partition(); // non-null with BroadcastJobTarget.table()
+
+        long count = 0;
+        try (ResultSet<SqlRow> rs = context.ignite().sql().execute(
+                null,
+                "SELECT COUNT(*) FROM Person WHERE __PARTITION_ID = ?",
+                partition.id()
+        )) {
+            if (rs.hasNext()) {
+                count = rs.next().longValue(0);
+            }
+        }
+        return CompletableFuture.completedFuture(count);
+    }
+}
+```
+
+See `ComputeBroadcastExample` in the examples module for a complete runnable version.
+
+#### Option 2: MapReduce over Partition Distribution
+
+Use `PartitionDistribution` to enumerate partitions in the split phase of a `MapReduceTask`, then dispatch one job per partition to its primary replica node:
+
+```java
+public class PersonCountByPartitionTask implements MapReduceTask<Void, Long, Long, Long> {
+    @Override
+    public CompletableFuture<List<MapReduceJob<Long, Long>>> splitAsync(
+            TaskExecutionContext context, Void input) {
+        JobDescriptor<Long, Long> jobDescriptor = JobDescriptor.builder(PartitionPersonCountJob.class)
+                .build();
+
+        Map<Partition, ClusterNode> primaryReplicas = context.ignite().tables()
+                .table("Person")
+                .partitionDistribution()
+                .primaryReplicas();
+
+        List<MapReduceJob<Long, Long>> jobs = new ArrayList<>();
+        for (Map.Entry<Partition, ClusterNode> entry : primaryReplicas.entrySet()) {
+            jobs.add(MapReduceJob.<Long, Long>builder()
+                    .jobDescriptor(jobDescriptor)
+                    .nodes(Set.of(entry.getValue()))
+                    .args(entry.getKey().id())
+                    .build());
+        }
+        return CompletableFuture.completedFuture(jobs);
+    }
+
+    @Override
+    public CompletableFuture<Long> reduceAsync(TaskExecutionContext context, Map<UUID, Long> results) {
+        return CompletableFuture.completedFuture(
+                results.values().stream().mapToLong(Long::longValue).sum());
+    }
+}
+```
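The reduce phase above is plain aggregation. As a cluster-free sketch (hypothetical job IDs and counts, not the Ignite API), the summing in `reduceAsync` behaves like:

```java
import java.util.Map;
import java.util.UUID;

public class ReduceSketch {
    // Mirrors the Map<UUID, Long> handed to reduceAsync: one entry per
    // dispatched job, keyed by its execution ID, valued with its count.
    static long reduce(Map<UUID, Long> results) {
        return results.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        Map<UUID, Long> results = Map.of(
                UUID.randomUUID(), 2L,  // hypothetical count from one partition
                UUID.randomUUID(), 1L); // hypothetical count from another
        System.out.println(reduce(results)); // prints 3
    }
}
```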
+
+Each job receives the partition ID as its argument and queries only that partition:
+
+```java
+public class PartitionPersonCountJob implements ComputeJob<Long, Long> {
+    @Override
+    public CompletableFuture<Long> executeAsync(JobExecutionContext context, Long partitionId) {
+        long count = 0;
+        try (ResultSet<SqlRow> rs = context.ignite().sql().execute(
+                null,
+                "SELECT COUNT(*) FROM Person WHERE __PARTITION_ID = ?",
+                partitionId
+        )) {
+            if (rs.hasNext()) {
+                count = rs.next().longValue(0);
+            }
+        }
+        return CompletableFuture.completedFuture(count);
+    }
+}
+```
+
+See `ComputePartitionQueryMapReduceExample` in the examples module for a complete runnable version.
+
+:::note
+`PartitionDistribution.primaryReplicas()` captures partition locations at a point in time. If a partition is reassigned between the split phase and job execution, the job may run on a non-primary node and the SQL query will not be local. Use `BroadcastJobTarget.table()` (Option 1) when local execution must be guaranteed.
+:::
+
+#### How to Run a Local Query
+
+Both approaches use the `__PARTITION_ID` virtual column (type `BIGINT`) to restrict a query to a single partition's rows. This is how you achieve the equivalent of Ignite 2's collocated queries without cross-node data movement:
+
+```sql
+SELECT * FROM Person WHERE __PARTITION_ID = ?
+```
+
+Pass the partition ID returned by `partition.id()` (Option 1) or the `long` ID passed as the job argument (Option 2) as the query parameter.
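Because every row lives in exactly one partition, the per-partition counts always add up to the full table count. A cluster-free sketch of that invariant (made-up IDs and a made-up modulo placement, not Ignite's real hashing):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionSumSketch {
    // Hypothetical placement: a real cluster hashes the colocation key;
    // modulo is only a stand-in for illustration.
    static int partitionOf(int id, int partitions) {
        return Math.floorMod(id, partitions);
    }

    public static void main(String[] args) {
        List<Integer> personIds = List.of(1, 2, 3);
        int partitions = 2;

        // One "local" count per partition, standing in for
        // SELECT COUNT(*) FROM Person WHERE __PARTITION_ID = ?
        Map<Integer, Long> perPartition = personIds.stream()
                .collect(Collectors.groupingBy(
                        id -> partitionOf(id, partitions),
                        Collectors.counting()));

        // Summing the per-partition counts recovers the table total.
        long total = perPartition.values().stream().mapToLong(Long::longValue).sum();
        System.out.println(total); // prints 3
    }
}
```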
diff --git a/examples/java/src/main/java/org/apache/ignite/example/compute/ComputeBroadcastExample.java b/examples/java/src/main/java/org/apache/ignite/example/compute/ComputeBroadcastExample.java
index dab520bc861..122db47faf0 100644
--- a/examples/java/src/main/java/org/apache/ignite/example/compute/ComputeBroadcastExample.java
+++ b/examples/java/src/main/java/org/apache/ignite/example/compute/ComputeBroadcastExample.java
@@ -22,6 +22,7 @@ import static org.apache.ignite.compute.BroadcastJobTarget.table;
 import static org.apache.ignite.example.util.DeployComputeUnit.deployIfNotExist;
 import static org.apache.ignite.example.util.DeployComputeUnit.undeployUnit;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.compute.BroadcastJobTarget;
@@ -30,11 +31,20 @@ import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.QualifiedName;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.tx.Transaction;
 
 /**
  * This example demonstrates the usage of the {@link IgniteCompute#execute(BroadcastJobTarget, JobDescriptor, Object)} API.
  *
+ * <p>It also shows how to use {@link BroadcastJobTarget#table} to execute a partition-aware job on every node holding
+ * a partition of a table. Each job instance reads {@link JobExecutionContext#partition()} to discover its partition and
+ * filters rows with the {@code __PARTITION_ID} virtual SQL column, guaranteeing local query execution without
+ * cross-node data movement.
+ *
  * <p>See {@code README.md} in the {@code examples} directory for execution instructions.</p>
  */
 public class ComputeBroadcastExample {
@@ -123,6 +133,31 @@ public class ComputeBroadcastExample {
                                 .units(new DeploymentUnit(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION))
                                 .build(), null
                 );
+
+                //--------------------------------------------------------------------------------------
+                //
+                // Executing a partition-aware compute job using BroadcastJobTarget.table().
+                //
+                // One instance of PartitionQueryJob runs on each node that holds a partition of the
+                // Person table. Each instance reads context.partition() to determine its partition,
+                // then queries only that partition's rows via the __PARTITION_ID virtual SQL column.
+                // This guarantees local execution: no row is fetched from a remote node.
+                //
+                // The results (one Long per partition) are collected and summed on the client side.
+                //
+                //--------------------------------------------------------------------------------------
+
+                System.out.println("Executing partition-aware compute job...");
+
+                JobDescriptor<Void, Long> partitionQueryJob = JobDescriptor.builder(PartitionQueryJob.class)
+                        .units(new DeploymentUnit(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION))
+                        .build();
+
+                Collection<Long> partitionCounts = client.compute().execute(table("Person"), partitionQueryJob, null);
+
+                long totalPersons = partitionCounts.stream().mapToLong(Long::longValue).sum();
+
+                System.out.println("Total person count across all partitions: " + totalPersons);
             } finally {
 
                 System.out.println("Cleaning up resources");
@@ -178,4 +213,37 @@ public class ComputeBroadcastExample {
             return completedFuture(null);
         }
     }
+
+    /**
+     * Job that counts persons in a single table partition using the {@code __PARTITION_ID} virtual SQL column.
+     *
+     * <p>This job is designed for use with {@link BroadcastJobTarget#table}, which routes one instance to each node
+     * that holds a partition of the target table. Each instance calls {@link JobExecutionContext#partition()} to
+     * identify its partition and filters rows by {@code __PARTITION_ID}, so the SQL query reads only local data.
+     */
+    public static class PartitionQueryJob implements ComputeJob<Void, Long> {
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Long> executeAsync(JobExecutionContext context, Void arg) {
+            Partition partition = context.partition();
+
+            assert partition != null : "Partition must be non-null when using BroadcastJobTarget.table()";
+
+            long count = 0;
+
+            try (ResultSet<SqlRow> rs = context.ignite().sql().execute(
+                    (Transaction) null,
+                    "SELECT COUNT(*) FROM Person WHERE __PARTITION_ID = ?",
+                    partition.id()
+            )) {
+                if (rs.hasNext()) {
+                    count = rs.next().longValue(0);
+                }
+            }
+
+            System.out.println("Partition " + partition.id() + " on node '" + context.ignite().name() + "': " + count + " person(s).");
+
+            return completedFuture(count);
+        }
+    }
 }
diff --git a/examples/java/src/main/java/org/apache/ignite/example/compute/ComputePartitionQueryMapReduceExample.java b/examples/java/src/main/java/org/apache/ignite/example/compute/ComputePartitionQueryMapReduceExample.java
new file mode 100644
index 00000000000..039309b8b36
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/ignite/example/compute/ComputePartitionQueryMapReduceExample.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.example.compute;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.example.util.DeployComputeUnit.deployIfNotExist;
+import static org.apache.ignite.example.util.DeployComputeUnit.undeployUnit;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.compute.task.MapReduceJob;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionDistribution;
+
+/**
+ * This example demonstrates a partition-aware Map/Reduce pattern using the {@link IgniteCompute#executeMapReduce} API
+ * together with the {@code __PARTITION_ID} virtual SQL column.
+ *
+ * <p>In the split phase, {@link PersonCountByPartitionTask} enumerates all partitions of the {@code Person} table
+ * via {@link PartitionDistribution} and dispatches one job per partition to the node that currently holds the primary
+ * replica. Each {@link PartitionPersonCountJob} filters rows with {@code __PARTITION_ID} and returns the local count.
+ * In the reduce phase the counts are summed.
+ *
+ * <p><b>Note:</b> {@link PartitionDistribution#primaryReplicas()} captures partition locations at a point in time.
+ * If a partition is reassigned between the split and job execution, a job may run on a non-primary node, making the
+ * SQL query non-local. For guaranteed local execution use {@link org.apache.ignite.compute.BroadcastJobTarget#table}
+ * instead; see {@code ComputeBroadcastExample} for that approach.
+ *
+ * <p>See {@code README.md} in the {@code examples} directory for execution instructions.
+ */
+public class ComputePartitionQueryMapReduceExample {
+    /** Deployment unit name. */
+    private static final String DEPLOYMENT_UNIT_NAME = "computeExampleUnit";
+
+    /** Deployment unit version. */
+    private static final String DEPLOYMENT_UNIT_VERSION = "1.0.0";
+
+    /**
+     * Main method of the example.
+     *
+     * @param args The command line arguments.
+     * @throws Exception if any error occurs.
+     */
+    public static void main(String[] args) throws Exception {
+
+        //--------------------------------------------------------------------------------------
+        //
+        // Creating a client to connect to the cluster.
+        //
+        //--------------------------------------------------------------------------------------
+
+        System.out.println("Connecting to server...");
+
+        try (IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build()) {
+            try {
+
+                //--------------------------------------------------------------------------------------
+                //
+                // Prerequisites for the example:
+                // 1. Create table and insert sample data.
+                // 2. Deploy the compute unit that contains the job classes.
+                //
+                //--------------------------------------------------------------------------------------
+
+                client.sql().executeScript(
+                        "CREATE TABLE IF NOT EXISTS Person ("
+                                + "ID INT PRIMARY KEY, FIRST_NAME VARCHAR(100),"
+                                + "LAST_NAME VARCHAR(100), AGE INT)"
+                );
+
+                client.sql().executeScript(
+                        "INSERT INTO Person VALUES "
+                                + "(1, 'John', 'Doe', 36),"
+                                + "(2, 'Jane', 'Smith', 35),"
+                                + "(3, 'Robert', 'Johnson', 25)"
+                );
+
+                deployIfNotExist(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION);
+
+                //--------------------------------------------------------------------------------------
+                //
+                // Executing partition-aware map reduce task.
+                //
+                // PersonCountByPartitionTask splits work by enumerating all table partitions and
+                // dispatching one PartitionPersonCountJob per partition to its primary node.
+                // Each job queries its partition locally using the __PARTITION_ID virtual column.
+                // The reduce phase sums the per-partition counts into a single total.
+                //
+                //--------------------------------------------------------------------------------------
+
+                System.out.println("Configuring partition-aware map reduce task...");
+
+                TaskDescriptor<Void, Long> taskDescriptor = TaskDescriptor
+                        .builder(PersonCountByPartitionTask.class)
+                        .units(new DeploymentUnit(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION))
+                        .build();
+
+                System.out.println("Executing partition-aware map reduce task...");
+
+                Long totalCount = client.compute().executeMapReduce(taskDescriptor, null);
+
+                System.out.println("Total person count across all partitions: " + totalCount);
+
+            } finally {
+
+                System.out.println("Cleaning up resources");
+                undeployUnit(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION);
+
+                System.out.println("Dropping the table...");
+                client.sql().executeScript("DROP TABLE IF EXISTS Person");
+            }
+        }
+    }
+
+    /**
+     * MapReduce task that counts persons across all partitions of the {@code Person} table.
+     *
+     * <p>The split phase uses {@link PartitionDistribution#primaryReplicas()} to get the current primary replica node
+     * for each partition, then creates one {@link PartitionPersonCountJob} per partition targeted at that node.
+     * The reduce phase sums the per-partition counts.
+     */
+    public static class PersonCountByPartitionTask implements MapReduceTask<Void, Long, Long, Long> {
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<List<MapReduceJob<Long, Long>>> splitAsync(
+                TaskExecutionContext taskContext,
+                Void input) {
+            // Run a SQL query to advance the node's observable timestamp tracker to the current
+            // server time. MapReduce jobs are submitted server-side using the node's own tracker
+            // (not the client's). Without this step, individual jobs may use a read timestamp
+            // that predates inserts committed by the client before the task was submitted.
+            try (ResultSet<SqlRow> rs = taskContext.ignite().sql().execute("SELECT COUNT(*) FROM Person")) {
+                while (rs.hasNext()) {
+                    rs.next();
+                }
+            }
+
+            JobDescriptor<Long, Long> jobDescriptor = JobDescriptor.builder(PartitionPersonCountJob.class)
+                    .units(new DeploymentUnit(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION))
+                    .build();
+
+            PartitionDistribution distribution = taskContext.ignite().tables()
+                    .table("Person")
+                    .partitionDistribution();
+
+            Map<Partition, ClusterNode> primaryReplicas = distribution.primaryReplicas();
+
+            List<MapReduceJob<Long, Long>> jobs = new ArrayList<>();
+
+            for (Map.Entry<Partition, ClusterNode> entry : primaryReplicas.entrySet()) {
+                jobs.add(
+                        MapReduceJob.<Long, Long>builder()
+                                .jobDescriptor(jobDescriptor)
+                                .nodes(Set.of(entry.getValue()))
+                                .args(entry.getKey().id())
+                                .build()
+                );
+            }
+
+            return completedFuture(jobs);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Long> reduceAsync(TaskExecutionContext taskContext, Map<UUID, Long> results) {
+            long total = results.values().stream().mapToLong(Long::longValue).sum();
+
+            return completedFuture(total);
+        }
+    }
+
+    /**
+     * Job that counts persons in a single partition, identified by partition ID passed as the job argument.
+     *
+     * <p>The {@code __PARTITION_ID} virtual SQL column is used to filter rows to those belonging to the target
+     * partition. The partition ID is provided by {@link PersonCountByPartitionTask} during the split phase.
+     */
+    public static class PartitionPersonCountJob implements ComputeJob<Long, Long> {
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Long> executeAsync(JobExecutionContext context, Long partitionId) {
+            assert partitionId != null;
+
+            long count = 0;
+
+            try (ResultSet<SqlRow> rs = context.ignite().sql().execute(
+                    "SELECT COUNT(*) FROM Person WHERE __PARTITION_ID = ?",
+                    partitionId
+            )) {
+                if (rs.hasNext()) {
+                    count = rs.next().longValue(0);
+                }
+            }
+
+            System.out.println("Partition " + partitionId + " on node '" + context.ignite().name() + "': " + count + " person(s).");
+
+            return completedFuture(count);
+        }
+    }
+}
