valepakh commented on code in PR #5017:
URL: https://github.com/apache/ignite-3/pull/5017#discussion_r1910270704


##########
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java:
##########
@@ -284,30 +298,35 @@ private Stream<Arguments> targetNodeIndexes() {
         return IntStream.range(0, initialNodes()).mapToObj(Arguments::of);
     }
 
-    private static class CustomFailingJob implements ComputeJob<Throwable, String> {
+    private static class CustomFailingJob implements ComputeJob<Integer, String> {

Review Comment:
   Could be `<Void, Void>` to indicate that we don't care about the types here.
   Other jobs could also use `Void` instead of `String` or `Object` in these cases.
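
A minimal sketch of the idea, not the actual test code. `Job` is a hypothetical stand-in for `ComputeJob` (the real interface also takes a `JobExecutionContext`), and the throwable is narrowed to `RuntimeException` so that no `sneakyThrow` helper is needed:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for ComputeJob<T, R>; not the real Ignite interface. */
interface Job<T, R> {
    CompletableFuture<R> executeAsync(T arg);
}

/** Fails unconditionally; Void/Void signals that neither the argument nor the result is used. */
class FailingJob implements Job<Void, Void> {
    // Set before the job runs, mirroring the static field introduced in the diff.
    static RuntimeException th = new IllegalStateException("expected failure");

    @Override
    public CompletableFuture<Void> executeAsync(Void arg) {
        throw th;
    }
}

class VoidJobExample {
    /** Runs the job with a null argument and returns the message of whatever it throws. */
    static String runAndCaptureMessage() {
        try {
            new FailingJob().executeAsync(null);
            return "no exception";
        } catch (RuntimeException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndCaptureMessage());
    }
}
```

With `Void` the only legal argument is `null`, so the signature itself documents that the value is ignored.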



##########
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java:
##########
@@ -284,30 +298,35 @@ private Stream<Arguments> targetNodeIndexes() {
         return IntStream.range(0, initialNodes()).mapToObj(Arguments::of);
     }
 
-    private static class CustomFailingJob implements ComputeJob<Throwable, String> {
+    private static class CustomFailingJob implements ComputeJob<Integer, String> {
+        static Throwable th;
+
         @Override
-        public CompletableFuture<String> executeAsync(JobExecutionContext context, Throwable th) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext context, Integer arg) {
             throw ExceptionUtils.sneakyThrow(th);
         }
     }
 
-    private static class WaitLatchJob implements ComputeJob<CountDownLatch, String> {
+    private static class WaitLatchJob implements ComputeJob<Integer, String> {
+        private static CountDownLatch[] latches;
+
         @Override
-        public CompletableFuture<String> executeAsync(JobExecutionContext context, CountDownLatch latch) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext context, Integer latchId) {

Review Comment:
   ```suggestion
           public @Nullable CompletableFuture<String> executeAsync(JobExecutionContext context, Integer latchId) {
   ```
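
A small sketch of why the annotation helps, under stated assumptions: the `Nullable` annotation below is a hypothetical stand-in for the one used in the project, and treating a `null` future as a completed `null` result is an assumption about the caller, not something shown in this diff.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the project's @Nullable annotation. */
@Target(ElementType.TYPE_USE)
@interface Nullable {}

class NullableReturnExample {
    /** Returns null instead of a future, as the latch jobs in the diff do. */
    static @Nullable CompletableFuture<String> executeAsync() {
        return null;
    }

    /** A caller that honors the annotation by checking for null before joining. */
    static String resultOrNull() {
        CompletableFuture<String> future = executeAsync();
        return future == null ? null : future.join();
    }

    public static void main(String[] args) {
        System.out.println(resultOrNull());
    }
}
```

Without the annotation, a reader of the signature has no hint that `executeAsync` may hand back `null` rather than a future.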



##########
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java:
##########
@@ -284,30 +298,35 @@ private Stream<Arguments> targetNodeIndexes() {
         return IntStream.range(0, initialNodes()).mapToObj(Arguments::of);
     }
 
-    private static class CustomFailingJob implements ComputeJob<Throwable, String> {
+    private static class CustomFailingJob implements ComputeJob<Integer, String> {
+        static Throwable th;
+
         @Override
-        public CompletableFuture<String> executeAsync(JobExecutionContext context, Throwable th) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext context, Integer arg) {
             throw ExceptionUtils.sneakyThrow(th);
         }
     }
 
-    private static class WaitLatchJob implements ComputeJob<CountDownLatch, String> {
+    private static class WaitLatchJob implements ComputeJob<Integer, String> {
+        private static CountDownLatch[] latches;
+
         @Override
-        public CompletableFuture<String> executeAsync(JobExecutionContext context, CountDownLatch latch) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext context, Integer latchId) {
             try {
-                latch.await();
+                latches[latchId].await();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
             }
             return null;
         }
     }
 
-    private static class WaitLatchThrowExceptionOnFirstExecutionJob implements ComputeJob<CountDownLatch, String> {
+    private static class WaitLatchThrowExceptionOnFirstExecutionJob implements ComputeJob<Object, String> {
+        private static CountDownLatch latch;
         static final AtomicInteger counter = new AtomicInteger(0);
 
         @Override
-        public CompletableFuture<String> executeAsync(JobExecutionContext context, CountDownLatch latch) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext context, Object arg) {

Review Comment:
   ```suggestion
           public @Nullable CompletableFuture<String> executeAsync(JobExecutionContext context, Object arg) {
   ```



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java:
##########
@@ -94,27 +95,31 @@ public <T, R> JobExecutionInternal<R> executeJob(
         Marshaller<T, byte[]> inputMarshaller = jobInstance.inputMarshaller();
         Marshaller<R, byte[]> resultMarshaller = jobInstance.resultMarshaller();
 
-        // If input is of this type, this means that the request came from the thin client and packing the result to the byte array will be
-        // needed in any case. In order to minimize conversion, marshal the result here.
-        boolean marshalResult = input instanceof ComputeJobDataHolder;
-
-        QueueExecution<R> execution = executorService.submit(
-                unmarshalExecMarshal(input, jobClass, jobInstance, context, inputMarshaller),
+        QueueExecution<ComputeJobDataHolder> execution = executorService.submit(
+                unmarshalExecMarshal(input, jobClass, jobInstance, context, inputMarshaller, resultMarshaller),
                 options.priority(),
                 options.maxRetries()
         );
 
-        return new JobExecutionInternal<>(execution, isInterrupted, resultMarshaller, marshalResult);
+        return new JobExecutionInternal<>(execution, isInterrupted, null, false);

Review Comment:
   The `marshaller` argument is always `null` now, so it seems like all the `MarshallerProvider` logic can be removed?



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java:
##########
@@ -255,16 +230,15 @@ public static CompletableFuture<UUID> jobIdFromExecuteResponse(ExecuteResponse e
      * Extract Compute job result from execute response.
      *
      * @param jobResultResponse Job execution result message response.
-     * @param <R> Compute job return type.
      * @return Completable future with result.
      */
-    public static <R> CompletableFuture<R> resultFromJobResultResponse(JobResultResponse jobResultResponse) {
+    public static  CompletableFuture<ComputeJobDataHolder> resultFromJobResultResponse(JobResultResponse jobResultResponse) {

Review Comment:
   ```suggestion
       public static CompletableFuture<ComputeJobDataHolder> resultFromJobResultResponse(JobResultResponse jobResultResponse) {
   ```



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java:
##########
@@ -200,28 +201,30 @@ public <I, M, T, R> TaskExecution<R> executeTask(
 
     /** {@inheritDoc} */
     @Override
-    public <T, R> JobExecution<R> executeRemotely(
+    public JobExecution<ComputeJobDataHolder> executeRemotely(
             ExecutionOptions options,
             ClusterNode remoteNode,
             List<DeploymentUnit> units,
             String jobClassName,
             @Nullable CancellationToken cancellationToken,
-            T arg
+            ComputeJobDataHolder arg

Review Comment:
   ```suggestion
               @Nullable ComputeJobDataHolder arg
   ```



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java:
##########
@@ -71,17 +70,16 @@ <R> JobExecution<R> executeAsyncWithFailover(
      * @param options job execution options (priority, max retries).
      * @param cancellationToken Cancellation token or {@code null}.
      * @param payload Arguments of the job.
-     * @param <R> Job result type.
      * @return Job execution object.
      */
-    <R> CompletableFuture<JobExecution<R>> submitColocatedInternal(
+    CompletableFuture<JobExecution<ComputeJobDataHolder>> submitColocatedInternal(
             TableViewInternal table,
             Tuple key,
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
             @Nullable CancellationToken cancellationToken,
-            Object payload);
+            ComputeJobDataHolder payload);

Review Comment:
   ```suggestion
               @Nullable ComputeJobDataHolder payload);
   ```



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java:
##########
@@ -84,16 +63,15 @@ default <T, R> JobExecution<R> executeLocally(
      * @param jobClassName Name of the job class.
      * @param cancellationToken Cancellation token or {@code null}.
      * @param arg Job args.
-     * @param <R> Job result type.
      * @return Job execution object.
      */
-    <T, R> JobExecution<R> executeRemotely(
+    JobExecution<ComputeJobDataHolder> executeRemotely(
             ExecutionOptions options,
             ClusterNode remoteNode,
             List<DeploymentUnit> units,
             String jobClassName,
             @Nullable CancellationToken cancellationToken,
-            T arg
+            ComputeJobDataHolder arg

Review Comment:
   ```suggestion
               @Nullable ComputeJobDataHolder arg
   ```



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java:
##########
@@ -115,7 +113,7 @@ class ComputeJobFailover<R> {
             String jobClassName,
             ExecutionOptions executionOptions,
             @Nullable CancellationToken cancellationToken,
-            Object args
+            ComputeJobDataHolder args

Review Comment:
   ```suggestion
               @Nullable ComputeJobDataHolder arg
   ```



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java:
##########
@@ -96,7 +95,7 @@ String jobClassName() {
         return jobClassName;
     }
 
-    T arg() {
+    ComputeJobDataHolder arg() {

Review Comment:
   Looks like `T` is redundant now and `R` is always `ComputeJobDataHolder`.
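
A rough before/after sketch of dropping the type parameters. Every name here is hypothetical: `DataHolder` stands in for `ComputeJobDataHolder` (its fields are assumptions), and the two context classes only loosely mirror `RemoteExecutionContext`.

```java
/** Hypothetical stand-in for ComputeJobDataHolder; the fields are assumptions for illustration. */
final class DataHolder {
    final int type;
    final byte[] data;

    DataHolder(int type, byte[] data) {
        this.type = type;
        this.data = data;
    }
}

/** Before: T only types arg(), and R is carried along even though callers always bind both to the holder. */
class GenericContext<T, R> {
    private final String jobClassName;
    private final T arg;

    GenericContext(String jobClassName, T arg) {
        this.jobClassName = jobClassName;
        this.arg = arg;
    }

    String jobClassName() {
        return jobClassName;
    }

    T arg() {
        return arg;
    }
}

/** After: no type parameters; the argument is always a holder. */
class HolderContext {
    private final String jobClassName;
    private final DataHolder arg;

    HolderContext(String jobClassName, DataHolder arg) {
        this.jobClassName = jobClassName;
        this.arg = arg;
    }

    String jobClassName() {
        return jobClassName;
    }

    DataHolder arg() {
        return arg;
    }
}

class ContextExample {
    /** Builds a non-generic context and reads the payload length back without any cast. */
    static int argLength() {
        HolderContext ctx = new HolderContext("com.example.MyJob", new DataHolder(0, new byte[] {1, 2, 3}));
        return ctx.arg().data.length;
    }

    public static void main(String[] args) {
        System.out.println(argLength());
    }
}
```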



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java:
##########
@@ -320,7 +324,11 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
     }
 
 
-    private <T, R> JobExecutionInternal<R> execJob(JobContext context, ExecutionOptions options, String jobClassName, T arg) {
+    private <T, R> JobExecutionInternal<ComputeJobDataHolder> execJob(

Review Comment:
   ```suggestion
       private JobExecutionInternal<ComputeJobDataHolder> execJob(
   ```



##########
modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java:
##########
@@ -52,97 +41,13 @@ public final class ClientComputeJobUnpacker {
             @Nullable Marshaller<?, byte[]> marshaller,
             @Nullable Class<?> resultClass
     ) {
-        if (unpacker.tryUnpackNil()) {
-            return null;
-        }
-
-        // Underlying byte array expected to be in the following format: | typeId | value |.
-        int typeId = unpacker.unpackInt();
-        ComputeJobDataType type = ComputeJobDataType.fromId(typeId);
-        if (type == null) {
-            throw new UnmarshallingException("Unsupported compute job type id: " + typeId);
-        }
-
-        switch (type) {
-            case NATIVE:
-                if (marshaller != null) {
-                    throw new UnmarshallingException(
-                            "Can not unpack object because the marshaller is provided but the object was packed without marshaller."
-                    );
-                }
-
-                return unpacker.unpackObjectFromBinaryTuple();
-
-            case TUPLE:
-                return TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary());
+        ComputeJobDataHolder holder = unpackJobArgumentWithoutMarshaller(unpacker);
 
-            case MARSHALLED_CUSTOM:
-                if (marshaller == null) {
-                    throw new UnmarshallingException(
-                            "Can not unpack object because the marshaller is not provided but the object was packed with marshaller."
-                    );
-                }
-                return tryUnmarshalOrCast(marshaller, unpacker.readBinary());
-
-            case POJO:
-                if (resultClass == null) {
-                    throw new UnmarshallingException(
-                            "Can not unpack object because the pojo class is not provided but the object was packed as pojo. "
-                                    + "Provide Job result type in JobDescriptor.resultClass."
-                    );
-                }
-                return unpackPojo(unpacker, resultClass);
-
-            case TUPLE_COLLECTION: {
-                // TODO: IGNITE-24059 Deduplicate with ComputeUtils.
-                ByteBuffer collectionBuf = unpacker.readBinaryUnsafe().order(ByteOrder.LITTLE_ENDIAN);
-                int count = collectionBuf.getInt();
-                BinaryTupleReader reader = new BinaryTupleReader(count, collectionBuf.slice().order(ByteOrder.LITTLE_ENDIAN));
-
-                List<Tuple> res = new ArrayList<>(count);
-                for (int i = 0; i < count; i++) {
-                    ByteBuffer elementBytes = reader.bytesValueAsBuffer(i);
-
-                    if (elementBytes == null) {
-                        res.add(null);
-                        continue;
-                    }
-
-                    res.add(TupleWithSchemaMarshalling.unmarshal(elementBytes));
-                }
-
-                return res;
-            }
-
-            default:
-                throw new UnmarshallingException("Unsupported compute job type: " + type);
-        }
-    }
-
-    private static Object unpackPojo(ClientMessageUnpacker unpacker, Class<?> pojoClass) {
-        try {
-            Object obj = pojoClass.getConstructor().newInstance();
-
-            fromTuple(obj, TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary()));
-
-            return obj;
-        } catch (NoSuchMethodException e) {
-            throw new UnmarshallingException("Class " + pojoClass.getName() + " doesn't have public default constructor. "
-                    + "Add the default constructor or provide Marshaller for " + pojoClass.getName() + " in JobDescriptor.resultMarshaller",
-                    e);
-        } catch (InvocationTargetException e) {
-            throw new UnmarshallingException("Constructor has thrown an exception", e);
-        } catch (InstantiationException e) {
-            throw new UnmarshallingException("Can't instantiate an object of class " + pojoClass.getName(), e);
-        } catch (IllegalAccessException e) {
-            throw new UnmarshallingException("Constructor is inaccessible", e);
-        } catch (PojoConversionException e) {
-            throw new UnmarshallingException("Can't unpack object", e);
-        }
+        return SharedComputeUtils.unmarshalArgOrResult(holder, marshaller, resultClass);
     }
 
     /** Unpacks compute job argument without marshaller. */
-    public static @Nullable Object unpackJobArgumentWithoutMarshaller(ClientMessageUnpacker unpacker) {
+    public static @Nullable ComputeJobDataHolder unpackJobArgumentWithoutMarshaller(ClientMessageUnpacker unpacker) {

Review Comment:
   `unpackJobDataHolder` maybe?
   Is it worth moving this to `SharedComputeUtils` and moving the `ClientComputeJobPacker`/`ClientComputeJobUnpacker` classes and tests back to the client module?



