This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 794aa0c77ef IGNITE-27019 Update examples+documentation for Partition API usage in Compute Jobs (#7897)
794aa0c77ef is described below
commit 794aa0c77ef6fd3cd3327c962aeb4b3564c005b8
Author: jinxxxoid <[email protected]>
AuthorDate: Fri Apr 3 12:55:39 2026 +0400
IGNITE-27019 Update examples+documentation for Partition API usage in Compute Jobs (#7897)
---
docs/docs/getting-started/migrate-from-ignite-2.md | 117 +++++++++++
.../example/compute/ComputeBroadcastExample.java | 68 ++++++
.../ComputePartitionQueryMapReduceExample.java | 231 +++++++++++++++++++++
3 files changed, 416 insertions(+)
diff --git a/docs/docs/getting-started/migrate-from-ignite-2.md b/docs/docs/getting-started/migrate-from-ignite-2.md
index d137ac154b6..d2189952e4a 100644
--- a/docs/docs/getting-started/migrate-from-ignite-2.md
+++ b/docs/docs/getting-started/migrate-from-ignite-2.md
@@ -104,3 +104,120 @@ For instructions on configuring metrics, see [Metrics Configuration](/3.1.0/conf
## Code Migration
Code written for Apache Ignite 2 cannot be reused directly; however, as most concepts remain similar, code migration should not take too much time.
+
+### Collocated Compute and Partition-Local Queries
+
+In Apache Ignite 2, you could pin a compute job to a specific partition using `ComputeTask` with `setPartition` on the job context. Ignite 3 provides two approaches to achieve the same result, both based on running a job on the node that owns a partition and then querying only that partition's rows using the `__PARTITION_ID` virtual SQL column.
+
+#### Option 1: Broadcast to Table Partitions (Recommended)
+
+Use `BroadcastJobTarget.table()` with `JobExecutionContext.partition()`. This is the preferred approach because Ignite routes each job instance to the node currently holding its partition, and `context.partition()` is always non-null, so execution is guaranteed to be local.
+
+```java
+JobDescriptor<Void, Long> job = JobDescriptor.builder(PartitionQueryJob.class)
+ .units(deploymentUnit)
+ .build();
+
+Collection<Long> partitionCounts = client.compute()
+ .execute(BroadcastJobTarget.table("Person"), job, null);
+
+long total = partitionCounts.stream().mapToLong(Long::longValue).sum();
+```
+
+Inside the job, read `context.partition()` to get the partition assigned to this instance, then filter rows with `__PARTITION_ID`:
+
+```java
+public class PartitionQueryJob implements ComputeJob<Void, Long> {
+    @Override
+    public CompletableFuture<Long> executeAsync(JobExecutionContext context, Void arg) {
+        Partition partition = context.partition(); // non-null with BroadcastJobTarget.table()
+
+        long count = 0;
+        try (ResultSet<SqlRow> rs = context.ignite().sql().execute(
+                null,
+                "SELECT COUNT(*) FROM Person WHERE __PARTITION_ID = ?",
+                partition.id()
+        )) {
+            if (rs.hasNext()) {
+                count = rs.next().longValue(0);
+            }
+        }
+        return CompletableFuture.completedFuture(count);
+    }
+}
+```
+
+See `ComputeBroadcastExample` in the examples module for a complete runnable version.
+
+#### Option 2: MapReduce over Partition Distribution
+
+Use `PartitionDistribution` to enumerate partitions in the split phase of a `MapReduceTask`, then dispatch one job per partition to its primary replica node:
+
+```java
+public class PersonCountByPartitionTask implements MapReduceTask<Void, Long, Long, Long> {
+    @Override
+    public CompletableFuture<List<MapReduceJob<Long, Long>>> splitAsync(
+            TaskExecutionContext context, Void input) {
+        JobDescriptor<Long, Long> jobDescriptor = JobDescriptor.builder(PartitionPersonCountJob.class)
+                .build();
+
+        Map<Partition, ClusterNode> primaryReplicas = context.ignite().tables()
+                .table("Person")
+                .partitionDistribution()
+                .primaryReplicas();
+
+        List<MapReduceJob<Long, Long>> jobs = new ArrayList<>();
+        for (Map.Entry<Partition, ClusterNode> entry : primaryReplicas.entrySet()) {
+            jobs.add(MapReduceJob.<Long, Long>builder()
+                    .jobDescriptor(jobDescriptor)
+                    .nodes(Set.of(entry.getValue()))
+                    .args(entry.getKey().id())
+                    .build());
+        }
+        return CompletableFuture.completedFuture(jobs);
+    }
+
+    @Override
+    public CompletableFuture<Long> reduceAsync(TaskExecutionContext context, Map<UUID, Long> results) {
+        return CompletableFuture.completedFuture(
+                results.values().stream().mapToLong(Long::longValue).sum());
+    }
+}
+```
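Stripped of the Ignite context, the reduce phase above is a plain fold over the per-job results. A self-contained sketch (the `Map<UUID, Long>` shape mirrors the `results` argument of `reduceAsync`; the class and sample values are illustrative only):

```java
import java.util.Map;
import java.util.UUID;

public class ReduceSketch {
    /** Mirrors reduceAsync: sums the per-partition counts into a single total. */
    static long reduce(Map<UUID, Long> results) {
        return results.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        // One entry per completed job, keyed by job ID, valued with its partition count.
        Map<UUID, Long> results = Map.of(
                UUID.randomUUID(), 2L,
                UUID.randomUUID(), 1L,
                UUID.randomUUID(), 0L);
        System.out.println(reduce(results)); // prints 3
    }
}
```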
+
+Each job receives the partition ID as its argument and queries only that partition:
+
+```java
+public class PartitionPersonCountJob implements ComputeJob<Long, Long> {
+    @Override
+    public CompletableFuture<Long> executeAsync(JobExecutionContext context, Long partitionId) {
+        long count = 0;
+        try (ResultSet<SqlRow> rs = context.ignite().sql().execute(
+                null,
+                "SELECT COUNT(*) FROM Person WHERE __PARTITION_ID = ?",
+                partitionId
+        )) {
+            if (rs.hasNext()) {
+                count = rs.next().longValue(0);
+            }
+        }
+        return CompletableFuture.completedFuture(count);
+    }
+}
+```
+
+See `ComputePartitionQueryMapReduceExample` in the examples module for a complete runnable version.
+
+:::note
+`PartitionDistribution.primaryReplicas()` captures partition locations at a point in time. If a partition is reassigned between the split phase and job execution, the job may run on a non-primary node and the SQL query will not be local. Use `BroadcastJobTarget.table()` (Option 1) when local execution must be guaranteed.
+:::
+
+#### How to Run a Local Query
+
+Both approaches use the `__PARTITION_ID` virtual column (type `BIGINT`) to restrict a query to a single partition's rows. This is how you achieve the equivalent of Ignite 2's collocated queries without cross-node data movement:
+
+```sql
+SELECT * FROM Person WHERE __PARTITION_ID = ?
+```
+
+Pass the partition ID returned by `partition.id()` (Option 1) or the `long` ID passed as the job argument (Option 2) as the query parameter.
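The reason these partition-by-partition queries add up to a full-table result is that every row is owned by exactly one partition. As a toy illustration only (the hash-modulo function below is NOT Ignite's actual affinity function; it merely mimics a deterministic key-to-partition mapping, and all names are hypothetical):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionSketch {
    static final int PARTITIONS = 25;

    // Toy stand-in for an affinity function: deterministically assigns each
    // key to one of PARTITIONS partitions. Ignite's real mapping differs.
    static int partitionOf(int key) {
        return Math.floorMod(Integer.hashCode(key), PARTITIONS);
    }

    public static void main(String[] args) {
        List<Integer> personIds = List.of(1, 2, 3);

        // Group rows by owning partition, the view each per-partition job sees.
        Map<Integer, Long> perPartition = personIds.stream()
                .collect(Collectors.groupingBy(PartitionSketch::partitionOf, Collectors.counting()));

        // Summing the per-partition counts recovers the full-table count,
        // which is exactly what both options above rely on.
        long total = perPartition.values().stream().mapToLong(Long::longValue).sum();
        System.out.println(total); // prints 3
    }
}
```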
diff --git a/examples/java/src/main/java/org/apache/ignite/example/compute/ComputeBroadcastExample.java b/examples/java/src/main/java/org/apache/ignite/example/compute/ComputeBroadcastExample.java
index dab520bc861..122db47faf0 100644
--- a/examples/java/src/main/java/org/apache/ignite/example/compute/ComputeBroadcastExample.java
+++ b/examples/java/src/main/java/org/apache/ignite/example/compute/ComputeBroadcastExample.java
@@ -22,6 +22,7 @@ import static org.apache.ignite.compute.BroadcastJobTarget.table;
import static org.apache.ignite.example.util.DeployComputeUnit.deployIfNotExist;
import static org.apache.ignite.example.util.DeployComputeUnit.undeployUnit;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.compute.BroadcastJobTarget;
@@ -30,11 +31,20 @@ import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.QualifiedName;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.tx.Transaction;
/**
 * This example demonstrates the usage of the {@link IgniteCompute#execute(BroadcastJobTarget, JobDescriptor, Object)} API.
 *
+ * <p>It also shows how to use {@link BroadcastJobTarget#table} to execute a partition-aware job on every node holding
+ * a partition of a table. Each job instance reads {@link JobExecutionContext#partition()} to discover its partition and
+ * filters rows with the {@code __PARTITION_ID} virtual SQL column, guaranteeing local query execution without
+ * cross-node data movement.
+ *
 * <p>See {@code README.md} in the {@code examples} directory for execution instructions.</p>
 */
public class ComputeBroadcastExample {
@@ -123,6 +133,31 @@ public class ComputeBroadcastExample {
                            .units(new DeploymentUnit(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION))
                            .build(), null
            );
+
+            //--------------------------------------------------------------------------------------
+            //
+            // Executing a partition-aware compute job using BroadcastJobTarget.table().
+            //
+            // One instance of PartitionQueryJob runs on each node that holds a partition of the
+            // Person table. Each instance reads context.partition() to determine its partition,
+            // then queries only that partition's rows via the __PARTITION_ID virtual SQL column.
+            // This guarantees local execution: no row is fetched from a remote node.
+            //
+            // The results (one Long per partition) are collected and summed on the client side.
+            //
+            //--------------------------------------------------------------------------------------
+
+            System.out.println("Executing partition-aware compute job...");
+
+            JobDescriptor<Void, Long> partitionQueryJob = JobDescriptor.builder(PartitionQueryJob.class)
+                    .units(new DeploymentUnit(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION))
+                    .build();
+
+            Collection<Long> partitionCounts = client.compute().execute(table("Person"), partitionQueryJob, null);
+
+            long totalPersons = partitionCounts.stream().mapToLong(Long::longValue).sum();
+
+            System.out.println("Total person count across all partitions: " + totalPersons);
} finally {
System.out.println("Cleaning up resources");
@@ -178,4 +213,37 @@ public class ComputeBroadcastExample {
return completedFuture(null);
}
}
+
+    /**
+     * Job that counts persons in a single table partition using the {@code __PARTITION_ID} virtual SQL column.
+     *
+     * <p>This job is designed for use with {@link BroadcastJobTarget#table}, which routes one instance to each node
+     * that holds a partition of the target table. Each instance calls {@link JobExecutionContext#partition()} to
+     * identify its partition and filters rows by {@code __PARTITION_ID}, so the SQL query reads only local data.
+     */
+    public static class PartitionQueryJob implements ComputeJob<Void, Long> {
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Long> executeAsync(JobExecutionContext context, Void arg) {
+            Partition partition = context.partition();
+
+            assert partition != null : "Partition must be non-null when using BroadcastJobTarget.table()";
+
+            long count = 0;
+
+            try (ResultSet<SqlRow> rs = context.ignite().sql().execute(
+                    (Transaction) null,
+                    "SELECT COUNT(*) FROM Person WHERE __PARTITION_ID = ?",
+                    partition.id()
+            )) {
+                if (rs.hasNext()) {
+                    count = rs.next().longValue(0);
+                }
+            }
+
+            System.out.println("Partition " + partition.id() + " on node '" + context.ignite().name() + "': " + count + " person(s).");
+
+            return completedFuture(count);
+        }
+    }
}
diff --git a/examples/java/src/main/java/org/apache/ignite/example/compute/ComputePartitionQueryMapReduceExample.java b/examples/java/src/main/java/org/apache/ignite/example/compute/ComputePartitionQueryMapReduceExample.java
new file mode 100644
index 00000000000..039309b8b36
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/ignite/example/compute/ComputePartitionQueryMapReduceExample.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.example.compute;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.example.util.DeployComputeUnit.deployIfNotExist;
+import static org.apache.ignite.example.util.DeployComputeUnit.undeployUnit;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.compute.task.MapReduceJob;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionDistribution;
+
+/**
+ * This example demonstrates a partition-aware Map/Reduce pattern using the {@link IgniteCompute#executeMapReduce} API
+ * together with the {@code __PARTITION_ID} virtual SQL column.
+ *
+ * <p>In the split phase, {@link PersonCountByPartitionTask} enumerates all partitions of the {@code Person} table
+ * via {@link PartitionDistribution} and dispatches one job per partition to the node that currently holds the primary
+ * replica. Each {@link PartitionPersonCountJob} filters rows with {@code __PARTITION_ID} and returns the local count.
+ * In the reduce phase the counts are summed.
+ *
+ * <p><b>Note:</b> {@link PartitionDistribution#primaryReplicas()} captures partition locations at a point in time.
+ * If a partition is reassigned between the split and job execution, a job may run on a non-primary node, making the
+ * SQL query non-local. For guaranteed local execution use {@link org.apache.ignite.compute.BroadcastJobTarget#table}
+ * instead; see {@code ComputeBroadcastExample} for that approach.
+ *
+ * <p>See {@code README.md} in the {@code examples} directory for execution instructions.
+ */
+public class ComputePartitionQueryMapReduceExample {
+ /** Deployment unit name. */
+ private static final String DEPLOYMENT_UNIT_NAME = "computeExampleUnit";
+
+ /** Deployment unit version. */
+ private static final String DEPLOYMENT_UNIT_VERSION = "1.0.0";
+
+ /**
+ * Main method of the example.
+ *
+ * @param args The command line arguments.
+ * @throws Exception if any error occurs.
+ */
+ public static void main(String[] args) throws Exception {
+
+        //--------------------------------------------------------------------------------------
+        //
+        // Creating a client to connect to the cluster.
+        //
+        //--------------------------------------------------------------------------------------
+
+        System.out.println("Connecting to server...");
+
+        try (IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build()) {
+ try {
+
+                //--------------------------------------------------------------------------------------
+                //
+                // Prerequisites for the example:
+                //     1. Create table and insert sample data.
+                //     2. Deploy the compute unit that contains the job classes.
+                //
+                //--------------------------------------------------------------------------------------
+
+                client.sql().executeScript(
+                        "CREATE TABLE IF NOT EXISTS Person ("
+                                + "ID INT PRIMARY KEY, FIRST_NAME VARCHAR(100),"
+                                + "LAST_NAME VARCHAR(100), AGE INT)"
+                );
+
+                client.sql().executeScript(
+                        "INSERT INTO Person VALUES "
+                                + "(1, 'John', 'Doe', 36),"
+                                + "(2, 'Jane', 'Smith', 35),"
+                                + "(3, 'Robert', 'Johnson', 25)"
+                );
+
+                deployIfNotExist(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION);
+
+                //--------------------------------------------------------------------------------------
+                //
+                // Executing partition-aware map reduce task.
+                //
+                // PersonCountByPartitionTask splits work by enumerating all table partitions and
+                // dispatching one PartitionPersonCountJob per partition to its primary node.
+                // Each job queries its partition locally using the __PARTITION_ID virtual column.
+                // The reduce phase sums the per-partition counts into a single total.
+                //
+                //--------------------------------------------------------------------------------------
+
+                System.out.println("Configuring partition-aware map reduce task...");
+
+                TaskDescriptor<Void, Long> taskDescriptor = TaskDescriptor
+                        .builder(PersonCountByPartitionTask.class)
+                        .units(new DeploymentUnit(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION))
+                        .build();
+
+                System.out.println("Executing partition-aware map reduce task...");
+
+                Long totalCount = client.compute().executeMapReduce(taskDescriptor, null);
+
+                System.out.println("Total person count across all partitions: " + totalCount);
+
+ } finally {
+
+ System.out.println("Cleaning up resources");
+ undeployUnit(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION);
+
+ System.out.println("Dropping the table...");
+ client.sql().executeScript("DROP TABLE IF EXISTS Person");
+ }
+ }
+ }
+
+
+    /**
+     * MapReduce task that counts persons across all partitions of the {@code Person} table.
+     *
+     * <p>The split phase uses {@link PartitionDistribution#primaryReplicas()} to get the current primary replica node
+     * for each partition, then creates one {@link PartitionPersonCountJob} per partition targeted at that node.
+     * The reduce phase sums the per-partition counts.
+     */
+    public static class PersonCountByPartitionTask implements MapReduceTask<Void, Long, Long, Long> {
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<List<MapReduceJob<Long, Long>>> splitAsync(
+ TaskExecutionContext taskContext,
+ Void input) {
+            // Run a SQL query to advance the node's observable timestamp tracker to the current
+            // server time. MapReduce jobs are submitted server-side using the node's own tracker
+            // (not the client's). Without this step, individual jobs may use a read timestamp
+            // that predates inserts committed by the client before the task was submitted.
+            try (ResultSet<SqlRow> rs = taskContext.ignite().sql().execute("SELECT COUNT(*) FROM Person")) {
+                while (rs.hasNext()) {
+                    rs.next();
+                }
+            }
+
+            JobDescriptor<Long, Long> jobDescriptor = JobDescriptor.builder(PartitionPersonCountJob.class)
+                    .units(new DeploymentUnit(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION))
+                    .build();
+
+            PartitionDistribution distribution = taskContext.ignite().tables()
+                    .table("Person")
+                    .partitionDistribution();
+
+            Map<Partition, ClusterNode> primaryReplicas = distribution.primaryReplicas();
+
+            List<MapReduceJob<Long, Long>> jobs = new ArrayList<>();
+
+            for (Map.Entry<Partition, ClusterNode> entry : primaryReplicas.entrySet()) {
+                jobs.add(
+                        MapReduceJob.<Long, Long>builder()
+                                .jobDescriptor(jobDescriptor)
+                                .nodes(Set.of(entry.getValue()))
+                                .args(entry.getKey().id())
+                                .build()
+                );
+            }
+
+            return completedFuture(jobs);
+ }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Long> reduceAsync(TaskExecutionContext taskContext, Map<UUID, Long> results) {
+            long total = results.values().stream().mapToLong(Long::longValue).sum();
+
+            return completedFuture(total);
+        }
+    }
+
+    /**
+     * Job that counts persons in a single partition, identified by the partition ID passed as the job argument.
+     *
+     * <p>The {@code __PARTITION_ID} virtual SQL column is used to filter rows to those belonging to the target
+     * partition. The partition ID is provided by {@link PersonCountByPartitionTask} during the split phase.
+     */
+    public static class PartitionPersonCountJob implements ComputeJob<Long, Long> {
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Long> executeAsync(JobExecutionContext context, Long partitionId) {
+            assert partitionId != null;
+
+            long count = 0;
+
+            try (ResultSet<SqlRow> rs = context.ignite().sql().execute(
+                    "SELECT COUNT(*) FROM Person WHERE __PARTITION_ID = ?",
+                    partitionId
+            )) {
+                if (rs.hasNext()) {
+                    count = rs.next().longValue(0);
+                }
+            }
+
+            System.out.println("Partition " + partitionId + " on node '" + context.ignite().name() + "': " + count + " person(s).");
+
+            return completedFuture(count);
+        }
+    }
+}