Pochatkin commented on code in PR #7122:
URL: https://github.com/apache/ignite-3/pull/7122#discussion_r2812300004
##########
examples/java/src/main/java/org/apache/ignite/example/code/deployment/AbstractDeploymentUnitExample.java:
##########
@@ -0,0 +1,52 @@
+package org.apache.ignite.example.code.deployment;
Review Comment:
Missing ASF license header. All source files in the project must include the
Apache License header (this was already mentioned by @iartiukhov but appears to
still be missing).
##########
examples/java/src/main/java/org/apache/ignite/example/code/deployment/AbstractDeploymentUnitExample.java:
##########
@@ -0,0 +1,52 @@
+package org.apache.ignite.example.code.deployment;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Map;
+import org.apache.ignite.example.util.DeployComputeUnit;
+
+public class AbstractDeploymentUnitExample {
Review Comment:
Using inheritance purely for sharing static utility methods is an
anti-pattern. All examples extend this class only to access `jarPath` and
`processDeploymentUnit()`, but these could simply be static methods/fields in
`DeployComputeUnit` (which already exists as a utility class). This would
remove the need for the entire class hierarchy.
Suggestion: move `processDeploymentUnit()` and the JAR path logic into
`DeployComputeUnit`, then delete this class and remove `extends
AbstractDeploymentUnitExample` from all examples.
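A minimal sketch of the suggested shape, assuming hypothetical names (`DeployUnitSupport`, `jarPath`, `StandaloneExampleSketch`) that are not part of the PR; only the default JAR location is taken from the reviewed code. It shows an example reaching the shared helper through a static call, with no superclass involved:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical utility holder; in the PR this role would be played by DeployComputeUnit. */
final class DeployUnitSupport {
    // Default location of the pre-built JAR, as quoted in the reviewed code.
    private static final String DEFAULT_JAR =
            "examples/java/build/libs/deploymentunit-example-1.0.0.jar";

    private DeployUnitSupport() {
        // Static helpers only: no instances and no subclasses.
    }

    /** Returns the override as an absolute path if given, else the default JAR under the working directory. */
    static Path jarPath(String override) {
        if (override != null && !override.isBlank()) {
            return Paths.get(override).toAbsolutePath();
        }
        return Paths.get("").toAbsolutePath().resolve(DEFAULT_JAR);
    }
}

/** An example class that needs no "extends": it calls the helper statically. */
final class StandaloneExampleSketch {
    static Path locateJar(String override) {
        return DeployUnitSupport.jarPath(override);
    }
}
```

Because the helper returns its result instead of storing it in a shared field, each example keeps its own local value.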
##########
examples/java/src/main/java/org/apache/ignite/example/compute/ComputeAsyncExample.java:
##########
@@ -135,13 +188,21 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
//--------------------------------------------------------------------------------------
System.out.println("\nTotal number of characters in the words is '" + sum + "'.");
+ } catch (Exception e) {
Review Comment:
This pattern is repeated in every single example and is redundant:
```java
} catch (Exception e) {
throw new RuntimeException(e);
}
```
Since `main` already declares `throws Exception`, the catch block is
unnecessary — it only wraps a checked exception into an unchecked one for no
benefit. The `finally` block alone would suffice.
This applies to all modified examples.
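To illustrate, here is a self-contained sketch (class and method names are hypothetical, and the failure is simulated). It shows that `finally` alone runs the cleanup while the original checked exception reaches the caller unwrapped:

```java
import java.io.IOException;

/** Minimal sketch: cleanup in finally, no catch block that rewraps the exception. */
final class TryFinallySketch {
    /** Records what ran, so the order of events is observable. */
    static final StringBuilder EVENTS = new StringBuilder();

    private TryFinallySketch() {
    }

    /** Stand-in for an example's main logic; declares its checked exceptions like main does. */
    static void runExample(boolean fail) throws Exception {
        try {
            EVENTS.append("run;");
            if (fail) {
                throw new IOException("simulated failure");
            }
        } finally {
            // Runs on success and on failure; the IOException above propagates unchanged.
            EVENTS.append("cleanup;");
        }
    }
}
```

A caller that catches `IOException` still sees the original type, which would not be the case after wrapping it in `RuntimeException`.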
##########
examples/java/src/main/java/org/apache/ignite/example/code/deployment/CodeDeploymentExample.java:
##########
@@ -53,6 +119,11 @@ public static void main(String[] args) {
String result = client.compute().execute(target, job, "Hello from job");
System.out.println("\n=== Result ===\n" + result);
+ }catch (Exception e) {
Review Comment:
Missing space before `catch`:
```java
}catch (Exception e) {
```
Should be:
```java
} catch (Exception e) {
```
##########
examples/java/src/main/java/org/apache/ignite/example/util/DeployComputeUnit.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.example.util;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class for deploying Ignite compute units.
+ * <p>
+ * Note: The deployment unit JAR is now built at compile time via the
deploymentUnitJar Gradle task,
+ * not at runtime. This eliminates the need for runtime JAR building.
+ * </p>
+ */
+public class DeployComputeUnit {
+
+ private static final String BASE_URL = "http://localhost:10300";
+ private static final HttpClient HTTP = HttpClient.newHttpClient();
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final int DEPLOYMENT_TIMEOUT_SECONDS = 30;
+
+
+ /**
+ * Checks if a deployment unit already exists on the cluster with DEPLOYED
status.
+ *
+ * @param unitId Deployment unit ID.
+ * @param version Deployment version.
+ * @return True if deployment exists with DEPLOYED status.
+ * @throws Exception If request fails.
+ */
+ private static boolean isDeployed(String unitId, String version) throws
Exception {
+ HttpRequest req = HttpRequest.newBuilder()
+ .uri(new URI(BASE_URL +
"/management/v1/deployment/cluster/units/" + unitId))
+ .GET()
+ .build();
+
+ HttpResponse<String> resp = HTTP.send(req, HttpResponse.BodyHandlers.ofString());
+
+ System.out.println("[DEBUG] Checking deployment status for " + unitId + " version " + version);
Review Comment:
There are 12 `System.out.println("[DEBUG] ...")` statements throughout
`isDeployed()` (lines 66-68, 72, 83-84, 88, 99, 101, 108, 111, 118). These are
clearly leftover debug prints and should be removed before merging. If logging
is needed, consider using a proper logger or at least a consistent non-debug
format.
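If logging is kept, one option is the JDK's built-in `System.Logger`. The sketch below is an assumption about how this could look, not the project's logging convention. `DeploymentStatusLogging` and `unitFound` are hypothetical names, and the status handling mirrors the 404/200 branches of `isDeployed()`:

```java
import java.lang.System.Logger;
import java.lang.System.Logger.Level;

/** Hypothetical helper showing leveled logging in place of "[DEBUG]" prints. */
final class DeploymentStatusLogging {
    private static final Logger LOG = System.getLogger(DeploymentStatusLogging.class.getName());

    private DeploymentStatusLogging() {
    }

    /** Returns false for 404, true for 200, and fails for any other HTTP status. */
    static boolean unitFound(String unitId, String version, int httpStatus) {
        // Placeholders use java.text.MessageFormat syntax.
        LOG.log(Level.DEBUG, "Checking deployment status for {0} version {1}: HTTP {2}",
                unitId, version, httpStatus);

        if (httpStatus == 404) {
            LOG.log(Level.DEBUG, "Unit {0} not found", unitId);
            return false;
        }
        if (httpStatus != 200) {
            throw new IllegalStateException("Failed to check deployment status. HTTP " + httpStatus);
        }
        return true;
    }
}
```

With the default `java.util.logging` backend, `DEBUG` messages are typically suppressed unless the logger level is lowered, so the normal output of the examples stays clean.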
##########
examples/java/src/main/java/org/apache/ignite/example/code/deployment/AbstractDeploymentUnitExample.java:
##########
@@ -0,0 +1,52 @@
+package org.apache.ignite.example.code.deployment;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Map;
+import org.apache.ignite.example.util.DeployComputeUnit;
+
+public class AbstractDeploymentUnitExample {
+
+ // Root path of the project
+ private static final Path projectRoot = Paths.get("").toAbsolutePath();
+
+ // Pre-built JAR from deploymentUnitJar task (built at compile time)
+ private static final Path DEFAULT_JAR_PATH =
+ projectRoot.resolve("examples/java/build/libs/deploymentunit-example-1.0.0.jar");
+
+ protected static String jarPathAsString = "";
Review Comment:
Non-final `protected static` mutable fields exposed via inheritance are
fragile. Any subclass can modify them, and the state is shared across all
examples. Consider encapsulating these in a return object from
`processDeploymentUnit()` or making them private with accessor methods.
##########
examples/java/src/main/java/org/apache/ignite/example/util/DeployComputeUnit.java:
##########
@@ -0,0 +1,284 @@
+ /**
+ * Checks if a deployment unit already exists on the cluster with DEPLOYED
status.
+ *
+ * @param unitId Deployment unit ID.
+ * @param version Deployment version.
+ * @return True if deployment exists with DEPLOYED status.
+ * @throws Exception If request fails.
+ */
+ private static boolean isDeployed(String unitId, String version) throws
Exception {
+ HttpRequest req = HttpRequest.newBuilder()
+ .uri(new URI(BASE_URL +
"/management/v1/deployment/cluster/units/" + unitId))
+ .GET()
+ .build();
+
+ HttpResponse<String> resp = HTTP.send(req, HttpResponse.BodyHandlers.ofString());
+
+ System.out.println("[DEBUG] Checking deployment status for " + unitId + " version " + version);
+ System.out.println("[DEBUG] HTTP Status: " + resp.statusCode());
+ System.out.println("[DEBUG] Response body: " + resp.body());
+
+ if (resp.statusCode() == 404) {
+ // Unit doesn't exist yet
+ System.out.println("[DEBUG] Unit not found (404)");
+ return false;
+ }
+
+ if (resp.statusCode() != 200) {
+ throw new RuntimeException("Failed to check deployment status.
HTTP " + resp.statusCode() + ": " + resp.body());
+ }
+
+ // Parse JSON response - the API returns a Collection<UnitStatus>
+ JsonNode root = OBJECT_MAPPER.readTree(resp.body());
+
+ System.out.println("[DEBUG] Parsed JSON root: " + root);
+ System.out.println("[DEBUG] Is array: " + root.isArray() + ", Is
object: " + root.isObject());
+
+ // Handle empty response (unit exists but no matching status)
+ if (root.isArray() && root.isEmpty()) {
+ System.out.println("[DEBUG] Empty array response");
+ return false;
+ }
+
+ // The response is an array of UnitStatus objects
+ if (!root.isArray()) {
+ throw new RuntimeException("Unexpected response format. Expected
array, got: " + root);
+ }
+
+ // Check if any node has this version deployed
+ for (JsonNode unitStatus : root) {
+ System.out.println("[DEBUG] Processing UnitStatus: " + unitStatus);
+ JsonNode versionToStatus = unitStatus.path("versionToStatus");
+ System.out.println("[DEBUG] versionToStatus: " + versionToStatus);
+
+ if (versionToStatus.isArray()) {
+ for (JsonNode versionStatus : versionToStatus) {
+ String versionValue =
versionStatus.path("version").asText();
+ String statusValue = versionStatus.path("status").asText();
+
+ System.out.println("[DEBUG] Found version: " +
versionValue + ", status: " + statusValue);
+
+ if (version.equals(versionValue) &&
"DEPLOYED".equals(statusValue)) {
+ System.out.println("[DEBUG] MATCH FOUND - Deployment
is ready!");
+ return true;
+ }
+ }
+ }
+ }
+
+ System.out.println("[DEBUG] No matching DEPLOYED status found");
+ return false;
+ }
+
+ /**
+ * Checks if a deployment unit exists (in any state).
+ *
+ * @param unitId Deployment unit ID.
+ * @param version Deployment version.
+ * @return True if deployment exists.
+ * @throws Exception If request fails.
+ */
+ private static boolean deploymentExists(String unitId, String version)
throws Exception {
+ HttpRequest req = HttpRequest.newBuilder()
+ .uri(new URI(BASE_URL +
"/management/v1/deployment/cluster/units/" + unitId))
+ .GET()
+ .build();
+
+ HttpResponse<String> resp = HTTP.send(req,
HttpResponse.BodyHandlers.ofString());
+ return resp.statusCode() == 200 &&
resp.body().contains("\"version\":\"" + version + "\"");
+ }
+
+ /**
+ * Deploys a unit to the Ignite cluster and waits for it to reach DEPLOYED
status.
+ *
+ * @param unitId Deployment unit ID.
+ * @param version Deployment version.
+ * @param jar Path to the JAR file to upload.
+ * @throws Exception If deployment fails.
+ */
+ private static void deployUnit(String unitId, String version, Path jar)
throws Exception {
+ String boundary = "igniteBoundary";
+
+ byte[] jarBytes = Files.readAllBytes(jar);
+
+ String start =
+ "--" + boundary + "\r\n" +
+ "Content-Disposition: form-data; name=\"unitContent\";
filename=\"" + jar.getFileName() + "\"\r\n" +
+ "Content-Type: application/java-archive\r\n\r\n";
+
+ String end = "\r\n--" + boundary + "--\r\n";
+
+ byte[] startBytes = start.getBytes();
+ byte[] endBytes = end.getBytes();
+
+ byte[] full = new byte[startBytes.length + jarBytes.length +
endBytes.length];
+
+ System.arraycopy(startBytes, 0, full, 0, startBytes.length);
+ System.arraycopy(jarBytes, 0, full, startBytes.length,
jarBytes.length);
+ System.arraycopy(endBytes, 0, full, startBytes.length +
jarBytes.length, endBytes.length);
+
+ HttpRequest req = HttpRequest.newBuilder()
+ .uri(new URI(BASE_URL + "/management/v1/deployment/units/" +
unitId + "/" + version + "?deployMode=ALL"))
+ .header("Content-Type", "multipart/form-data; boundary=" +
boundary)
+ .POST(HttpRequest.BodyPublishers.ofByteArray(full))
+ .build();
+
+ HttpResponse<String> resp = HTTP.send(req,
HttpResponse.BodyHandlers.ofString());
+
+ if (resp.statusCode() != 200 && resp.statusCode() != 409) {
+ throw new RuntimeException("Deployment failed: " +
resp.statusCode() + "\n" + resp.body());
+ }
+
+ // Wait for deployment to reach DEPLOYED status using polling
+ await()
+ .atMost(DEPLOYMENT_TIMEOUT_SECONDS, SECONDS)
+ .pollInterval(200, MILLISECONDS)
+ .until(() -> isDeployed(unitId, version));
+ }
+
+ /**
+ * Undeploys the given deployment unit from the cluster and waits for it
to be removed.
+ *
+ * @param unitId Deployment unit ID.
+ * @param version Deployment version.
+ * @throws Exception If undeployment fails.
+ */
+ public static void undeployUnit(String unitId, String version) throws
Exception {
+ HttpRequest req = HttpRequest.newBuilder()
+ .uri(new URI(BASE_URL + "/management/v1/deployment/units/" +
unitId + "/" + version))
+ .DELETE()
+ .build();
+
+ HttpResponse<String> resp = HTTP.send(req,
HttpResponse.BodyHandlers.ofString());
+
+ if (resp.statusCode() != 200 && resp.statusCode() != 404) {
+ throw new RuntimeException("Undeploy failed: " + resp.statusCode()
+ "\n" + resp.body());
+ }
+
+ // Wait for deployment to be removed using polling
+ await()
+ .atMost(DEPLOYMENT_TIMEOUT_SECONDS, SECONDS)
+ .pollInterval(200, MILLISECONDS)
+ .until(() -> !deploymentExists(unitId, version));
+
+ System.out.println("Unit successfully undeployed.");
+ }
+
+ /**
+ * Processes command-line arguments for:
+ * <ul>
+ * <li><b>runFromIDE</b> – whether the example runs from source</li>
+ * <li><b>jarPath</b> – path to external JAR when run outside IDE</li>
+ * </ul>
+ *
+ * @param args Command-line arguments (may be null).
+ * @return Map with keys "runFromIDE" and "jarPath".
+ */
+ public static Map<String, Object> processArguments(String[] args) {
+ Map<String, Object> response = new HashMap<>();
+
+ if (args == null) {
+ return response;
+ }
+
+ boolean runFromIDE = true;
+ String jarPath = null;
+
+ for (String arg : args) {
+
+ if (arg.contains("runFromIDE")) {
+ String[] splitArgArr = arg.split("=");
+ if (splitArgArr != null && splitArgArr.length == 2) {
Review Comment:
The `splitArgArr != null` check is redundant: `String.split()` never returns
`null`.
Also, `processArguments` returns `Map<String, Object>`, which requires unsafe
casts at the call site (`(boolean) p.get("runFromIDE")`) and throws a
`NullPointerException` when the key is absent, for example when `args` is
`null` and the empty map is returned. Consider returning a typed record/class
instead:
```java
record DeploymentArgs(boolean runFromIDE, String jarPath) {}
```
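A sketch of how the parsing could look with that record. The `parse` method, the exact-key matching, and the defaults used for a `null` argument array are assumptions for illustration; only the two fields and the `runFromIDE = true` default come from the reviewed code:

```java
/** Typed holder for the two command-line options; replaces the Map<String, Object> result. */
record DeploymentArgs(boolean runFromIDE, String jarPath) {

    /** Parses "key=value" arguments; unknown or malformed arguments are ignored. */
    static DeploymentArgs parse(String[] args) {
        boolean runFromIDE = true; // same default as in the reviewed code
        String jarPath = null;

        if (args != null) {
            for (String arg : args) {
                // Limit of 2 keeps any '=' inside the value intact.
                String[] keyValue = arg.split("=", 2);
                if (keyValue.length != 2) {
                    continue;
                }
                switch (keyValue[0]) {
                    case "runFromIDE":
                        runFromIDE = Boolean.parseBoolean(keyValue[1]);
                        break;
                    case "jarPath":
                        jarPath = keyValue[1];
                        break;
                    default:
                        break;
                }
            }
        }
        return new DeploymentArgs(runFromIDE, jarPath);
    }
}
```

Call sites then read `parsed.runFromIDE()` and `parsed.jarPath()` directly, with no casts and no missing-key case.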
##########
examples/java/src/main/java/org/apache/ignite/example/streaming/DistributedComputeWithReceiverExample.java:
##########
@@ -30,98 +35,189 @@
import org.apache.ignite.catalog.ColumnType;
import org.apache.ignite.catalog.definitions.TableDefinition;
import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.example.code.deployment.AbstractDeploymentUnitExample;
import org.apache.ignite.table.DataStreamerReceiver;
import org.apache.ignite.table.DataStreamerReceiverContext;
import org.apache.ignite.table.DataStreamerReceiverDescriptor;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
-/** This example demonstrates how to use the streaming API to simulate a fraud
detection process,
- * which typically involves intensive processing of each transaction using ML
models.
+/**
+ * This example demonstrates how to use the streaming API to simulate a fraud
detection process, which typically involves intensive
+ * processing of each transaction using ML models.
+ *
+ * <p>Find instructions on how to run the example in the {@code README.md}
+ * file located in the {@code examples} directory root.</p>
+ *
+ * <h2>Execution Modes</h2>
+ *
+ * <p>There are two modes of execution:</p>
+ *
+ * <h3>1. Automated : The JAR Deployment for deployment unit is automated
</h3>
+ *
+ * <h4>1.1 With IDE</h4>
+ * <ul>
+ * <li>
+ * <b>Run from an IDE</b><br>
+ * Launch the example directly from the IDE. If the required deployment
+ * unit is not present, the example automatically builds and deploys the
+ * necessary JAR.
+ * </li>
+ * </ul>
+ *
+ * <h3>1.2 Without IDE</h3>
+ * <ul>
+ * <li>
+ * <b>Run from the command line</b><br>
+ * Start the example using a Java command where the classpath includes
+ * all required dependencies:
+ *
+ * <pre>{@code
+ * java -cp
"{user.home}\\.m2\\repository\\org\\apache\\ignite\\ignite-core\\3.1.0-SNAPSHOT\\
+ * ignite-core-3.1.0-SNAPSHOT.jar{other required jars}"
+ * <example-main-class> runFromIDE=false jarPath="{path-to-examples-jar}"
+ * }</pre>
+ *
+ * In this mode, {@code runFromIDE=false} indicates command-line execution,
+ * and {@code jarPath} must reference the examples JAR used as the
+ * deployment unit.
+ * </li>
+ * </ul>
+ *
+ * <h2>2. Manual (with IDE): The JAR deployment for the deployment unit is
manual</h2>
+ *
+ * <p>Before running this example, complete the following steps related to
+ * code deployment:</p>
+ *
+ * <ol>
+ * <li>
+ * Build the {@code ignite-examples-x.y.z.jar} file:<br>
+ * {@code ./gradlew :ignite-examples:jar}
+ * </li>
+ * <li>
+ * Deploy the generated JAR as a deployment unit using the CLI:<br>
+ * <pre>{@code
+ * cluster unit deploy computeExampleUnit \
+ * --version 1.0.0 \
+ * --path=$IGNITE_HOME/examples/build/libs/ignite-examples-x.y.z.jar
+ * }</pre>
+ * </li>
+ * </ol>
*/
-public class DistributedComputeWithReceiverExample {
- public static void main(String[] arg) {
+public class DistributedComputeWithReceiverExample extends
AbstractDeploymentUnitExample {
+
+ private static final String DEPLOYMENT_UNIT_NAME =
"streamerReceiverExampleUnit";
+
+ /** Deployment unit version. */
+ private static final String DEPLOYMENT_UNIT_VERSION = "1.0.0";
+
+ public static void main(String[] arg) throws Exception {
+
+ processDeploymentUnit(arg);
try (IgniteClient client = IgniteClient.builder()
.addresses("127.0.0.1:10800")
.build()) {
- /* Source data is a list of financial transactions */
- /* We distribute this processing across the cluster, then gather and
return results */
- List<Tuple> sourceData = IntStream.range(1, 10)
- .mapToObj(i -> Tuple.create()
- .set("txId", i)
- .set("txData", "{some-json-data}"))
- .collect(Collectors.toList());
+ deployIfNotExist(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION,
jarPath);
- DataStreamerReceiverDescriptor<Tuple, Void, Tuple> desc =
DataStreamerReceiverDescriptor
- .builder(FraudDetectorReceiver.class)
- .build();
+ /* Source data is a list of financial transactions */
+ /* We distribute this processing across the cluster, then gather
and return results */
+ List<Tuple> sourceData = IntStream.range(1, 10)
+ .mapToObj(i -> Tuple.create()
+ .set("txId", i)
+ .set("txData", "{some-json-data}"))
+ .collect(Collectors.toList());
- CompletableFuture<Void> streamerFut;
+ DataStreamerReceiverDescriptor<Tuple, Void, Tuple> desc =
DataStreamerReceiverDescriptor
+ .builder(FraudDetectorReceiver.class)
+ .units(new DeploymentUnit(DEPLOYMENT_UNIT_NAME,
DEPLOYMENT_UNIT_VERSION))
+ .build();
+
+ CompletableFuture<Void> streamerFut;
/* Streaming requires a target table to partition data.
/* Use a dummy table for this scenario, because we are not going to
store any data */
- TableDefinition txDummyTableDef = TableDefinition.builder("tx_dummy")
- .columns(column("id", ColumnType.INTEGER))
- .primaryKey("id")
- .build();
+ TableDefinition txDummyTableDef =
TableDefinition.builder("tx_dummy")
+ .columns(column("id", ColumnType.INTEGER))
+ .primaryKey("id")
+ .build();
+
+ Table dummyTable = client.catalog().createTable(txDummyTableDef);
- Table dummyTable = client.catalog().createTable(txDummyTableDef);
- /* Source data has "txId" field, but target dummy table has "id"
column, so keyFunc maps "txId" to "id" */
- Function<Tuple, Tuple> keyFunc = sourceItem ->
Tuple.create().set("id", sourceItem.value("txId"));
+ /* Source data has "txId" field, but target dummy table has "id"
column, so keyFunc maps "txId" to "id" */
+ Function<Tuple, Tuple> keyFunc = sourceItem ->
Tuple.create().set("id", sourceItem.value("txId"));
/* Payload function is used to extract the payload (data that goes to
the receiver) from the source item.
/* In our case, we want to use the whole source item as the payload */
- Function<Tuple, Tuple> payloadFunc = Function.identity();
-
- Flow.Subscriber<Tuple> resultSubscriber = new Flow.Subscriber<>() {
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- subscription.request(Long.MAX_VALUE);
+ Function<Tuple, Tuple> payloadFunc = Function.identity();
+
+ Flow.Subscriber<Tuple> resultSubscriber = new Flow.Subscriber<>() {
+ @Override
+ public void onSubscribe(Flow.Subscription subscription) {
+ subscription.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(Tuple item) {
+ System.out.println("Transaction processed: " + item);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ System.err.println("Error during streaming: " +
throwable.getMessage());
+ }
+
+ @Override
+ public void onComplete() {
+ System.out.println("Streaming completed.");
+ }
+ };
+
+ try (var publisher = new SubmissionPublisher<Tuple>()) {
+ streamerFut = dummyTable.recordView().streamData(
+ publisher,
+ desc,
+ keyFunc,
+ payloadFunc,
+ null, /* Optional Receiver arguments*/
+ resultSubscriber,
+ null /* DataStreamer options */
+ );
+
+ for (Tuple item : sourceData) {
+ publisher.submit(item);
+ }
}
- @Override
- public void onNext(Tuple item) {
- System.out.println("Transaction processed: " + item);
+ streamerFut.join();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+
+ System.out.println("Cleaning up resources");
+ // undeployUnit(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION);
+
+ /* Drop table */
+ System.out.println("\nDropping the table...");
+ try (
+ Connection conn = getConnection("jdbc:ignite:thin://127.0.0.1:10800/");
Review Comment:
This example uses raw JDBC
(`DriverManager.getConnection("jdbc:ignite:thin://...")`) for table cleanup,
while other examples use `client.sql().executeScript()`. The same inconsistency
exists in `MultiTableDataStreamerExample`.
Please use a consistent approach across all examples. Since an `IgniteClient`
is already created in this example, prefer `client.sql().executeScript()` (run
before the client is closed) to avoid introducing an unnecessary JDBC
dependency and extra connection management.
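A caveat on scope, assuming the `finally` in the hunk belongs to the try-with-resources statement itself (as the quoted lines suggest): resources are closed before that `finally` runs, and the resource variable is not visible there. The sketch below uses a hypothetical `FakeClient` stand-in, not the Ignite API, to show a nested `try`/`finally` that keeps the client usable during cleanup:

```java
/** Sketch of cleanup ordering with try-with-resources; FakeClient is a hypothetical stand-in. */
final class ResourceScopeSketch {
    /** Records the order of events so it can be inspected. */
    static final StringBuilder EVENTS = new StringBuilder();

    private ResourceScopeSketch() {
    }

    static final class FakeClient implements AutoCloseable {
        void dropTable() {
            EVENTS.append("drop;");
        }

        @Override
        public void close() {
            EVENTS.append("close;");
        }
    }

    static void run() {
        try (FakeClient client = new FakeClient()) {
            try {
                EVENTS.append("work;");
            } finally {
                // The client is still open and in scope here, so cleanup can use it.
                client.dropTable();
            }
        } finally {
            // The client is already closed and no longer in scope here.
            EVENTS.append("outer-finally;");
        }
    }
}
```

Calling `ResourceScopeSketch.run()` records `work;drop;close;outer-finally;`, which is the ordering that makes the inner `finally` the place for client-based cleanup.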
##########
examples/java/src/main/java/org/apache/ignite/example/compute/ComputeBroadcastExample.java:
##########
@@ -77,55 +124,87 @@ public static void main(String[] args) {
.addresses("127.0.0.1:10800")
.build()
) {
- //--------------------------------------------------------------------------------------
- //
- // Configuring compute job.
- //
- //--------------------------------------------------------------------------------------
- System.out.println("\nConfiguring compute job...");
+ try {
+
+ client.sql().executeScript("DROP SCHEMA IF EXISTS CUSTOM_SCHEMA CASCADE;");
Review Comment:
The scope of this example has been significantly expanded. The original
`ComputeBroadcastExample` only demonstrated broadcast job execution. Now it
also:
- Creates 3 schemas/tables (`CUSTOM_SCHEMA.MY_QUALIFIED_TABLE`,
`PUBLIC.MY_TABLE`, `PERSON`)
- Inserts data into `PERSON`
- Drops tables in the `finally` block
This table setup was previously handled externally (e.g., by the cluster
setup or other examples). Embedding DDL here makes the broadcast example harder
to understand — readers must parse 20 lines of schema setup before reaching the
actual broadcast logic.
Consider keeping table setup separate or at least extracting it to a helper
method with a clear comment explaining why it's needed.
##########
examples/java/src/main/java/org/apache/ignite/example/compute/ComputeAsyncExample.java:
##########
@@ -32,40 +33,89 @@
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.example.code.deployment.AbstractDeploymentUnitExample;
/**
- * This example demonstrates the usage of the
- * {@link IgniteCompute#executeAsync(JobTarget, JobDescriptor, Object)} API.
+ * This example demonstrates the usage of the {@link IgniteCompute#executeAsync(JobTarget, JobDescriptor, Object)} API.
*
- * <p>Find instructions on how to run the example in the README.md file located in the "examples" directory root.
*
- * <p>The following steps related to code deployment should be additionally executed before running the current example:
+ * <p>Find instructions on how to run the example in the {@code README.md}
+ * file located in the {@code examples} directory root.</p>
+ *
+ * <h2>Execution Modes</h2>
Review Comment:
This ~50-line "Execution Modes" Javadoc block is copy-pasted identically
across 14+ files. If the instructions change, all files must be updated
manually.
Consider:
1. Moving the detailed instructions to a single place (e.g., the `README.md`
or a shared doc), and referencing it from each example's Javadoc with a
one-liner.
2. Or at minimum, keeping only a brief note in each file and linking to the
common instructions.
(This was already flagged by @iartiukhov — extract common documentation to
avoid massive duplication.)
##########
examples/java/src/main/java/org/apache/ignite/example/streaming/DistributedComputeWithReceiverExample.java:
##########
@@ -30,98 +35,189 @@
+ } finally {
+
+ System.out.println("Cleaning up resources");
+ // undeployUnit(DEPLOYMENT_UNIT_NAME, DEPLOYMENT_UNIT_VERSION);
Review Comment:
The `undeployUnit` call is commented out, which means the deployment unit is
never cleaned up. This is inconsistent with every other example in this PR.
Either uncomment it or add a comment explaining why cleanup is skipped here.
##########
examples/java/src/main/java/org/apache/ignite/example/table/QueryExample.java:
##########
@@ -62,7 +62,9 @@ public static void main(String[] args) throws Exception {
}
/**
- * Demonstrates querying with an implicit transaction.
+ * Demonstrates querying with an explicit transaction.
Review Comment:
The Javadoc says "Demonstrates querying with an **explicit** transaction"
but the method is `performQueryWithoutTransaction` which uses an **implicit**
transaction (`null` is passed as the transaction parameter). The original text
("implicit transaction") was correct.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.