PakhomovAlexander commented on code in PR #4438:
URL: https://github.com/apache/ignite-3/pull/4438#discussion_r1773427075


##########
modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java:
##########
@@ -31,68 +31,83 @@
 import org.apache.ignite.marshalling.UnmarshallingException;
 import org.jetbrains.annotations.Nullable;
 
-/** Unpacks job arguments and results. */
+/** Unpacks job results. */
 public final class ClientComputeJobUnpacker {
-    /**
-     * Unpacks compute job argument. If the marshaller is provided, it will be 
used to unmarshal the argument. If the marshaller is not
-     * provided and the argument is a native column type or a tuple, it will 
be unpacked accordingly.
-     *
-     * @param marshaller Marshaller.
-     * @param unpacker Unpacker.
-     * @return Unpacked argument.
-     */
-    public static @Nullable Object unpackJobArgument(@Nullable Marshaller<?, 
byte[]> marshaller, ClientMessageUnpacker unpacker) {
-        return unpack(marshaller, unpacker);
-    }
-
     /**
      * Unpacks compute job result. If the marshaller is provided, it will be 
used to unmarshal the result. If the marshaller is not provided
-     * and the result is a native column type or a tuple, it will be unpacked 
accordingly.
+     * and the result class is provided and the result is a tuple, it will be 
unpacked as a pojo of that class. If the marshaller is not
+     * provided and the result is a native column type or a tuple, it will be 
unpacked accordingly.
      *
-     * @param marshaller Marshaller.
      * @param unpacker Unpacker.
+     * @param marshaller Marshaller.
+     * @param resultClass Result class.
      * @return Unpacked result.
      */
-    public static @Nullable Object unpackJobResult(@Nullable Marshaller<?, 
byte[]> marshaller, ClientMessageUnpacker unpacker) {
-        return unpack(marshaller, unpacker);
-    }
-
-    /** Underlying byte array expected to be in the following format: | typeId 
| value |. */
-    private static @Nullable Object unpack(@Nullable Marshaller<?, byte[]> 
marshaller, ClientMessageUnpacker unpacker) {
+    public static @Nullable Object unpackJobResult(
+            ClientMessageUnpacker unpacker,
+            @Nullable Marshaller<?, byte[]> marshaller,
+            @Nullable Class<?> resultClass
+    ) {
         if (unpacker.tryUnpackNil()) {
             return null;
         }
 
+        // Underlying byte array expected to be in the following format: | 
typeId | value |.
         int typeId = unpacker.unpackInt();
 
         switch (typeId) {
-            case NATIVE_ID:
+            case NATIVE:
                 if (marshaller != null) {
                     throw new UnmarshallingException(
                             "Can not unpack object because the marshaller is 
provided but the object was packed without marshaller."
                     );
                 }
 
                 return unpacker.unpackObjectFromBinaryTuple();
-            case MARSHALLED_TUPLE_ID:
+            case MARSHALLED_TUPLE:
                 return 
TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary());
 
-            case MARSHALLED_OBJECT_ID:
+            case MARSHALLED_CUSTOM:
                 if (marshaller == null) {
                     throw new UnmarshallingException(
                             "Can not unpack object because the marshaller is 
not provided but the object was packed with marshaller."
                     );
                 }
                 return tryUnmarshalOrCast(marshaller, unpacker.readBinary());
 
-            case MARSHALLED_POJO_ID:
-                return unpackPojo(unpacker);
+            case MARSHALLED_POJO:
+                if (resultClass == null) {
+                    throw new UnmarshallingException(
+                            "Can not unpack object because the pojo class is 
not provided but the object was packed as pojo."
+                    );
+                }
+                return unpackPojo(unpacker, resultClass);
 
             default:
                 throw new UnmarshallingException("Unsupported compute job type 
id: " + typeId);
         }
     }
 
+    private static Object unpackPojo(ClientMessageUnpacker unpacker, Class<?> 
pojoClass) {
+        try {
+            Object obj = pojoClass.getConstructor().newInstance();
+
+            fromTuple(obj, 
TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary()));
+
+            return obj;
+        } catch (NoSuchMethodException e) {
+            throw new UnmarshallingException("Class " + pojoClass.getName() + 
" doesn't have public default constructor", e);

Review Comment:
   ```suggestion
               throw new UnmarshallingException("Class " + pojoClass.getName() 
+ " doesn't have public default constructor. Add the default constructor or 
provide Marshaller for "   + pojoClass.getName() + "  in 
JobDescriptor.resultMarshaller",  e);
   ```



##########
modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java:
##########
@@ -31,68 +31,83 @@
 import org.apache.ignite.marshalling.UnmarshallingException;
 import org.jetbrains.annotations.Nullable;
 
-/** Unpacks job arguments and results. */
+/** Unpacks job results. */
 public final class ClientComputeJobUnpacker {
-    /**
-     * Unpacks compute job argument. If the marshaller is provided, it will be 
used to unmarshal the argument. If the marshaller is not
-     * provided and the argument is a native column type or a tuple, it will 
be unpacked accordingly.
-     *
-     * @param marshaller Marshaller.
-     * @param unpacker Unpacker.
-     * @return Unpacked argument.
-     */
-    public static @Nullable Object unpackJobArgument(@Nullable Marshaller<?, 
byte[]> marshaller, ClientMessageUnpacker unpacker) {
-        return unpack(marshaller, unpacker);
-    }
-
     /**
      * Unpacks compute job result. If the marshaller is provided, it will be 
used to unmarshal the result. If the marshaller is not provided
-     * and the result is a native column type or a tuple, it will be unpacked 
accordingly.
+     * and the result class is provided and the result is a tuple, it will be 
unpacked as a pojo of that class. If the marshaller is not
+     * provided and the result is a native column type or a tuple, it will be 
unpacked accordingly.
      *
-     * @param marshaller Marshaller.
      * @param unpacker Unpacker.
+     * @param marshaller Marshaller.
+     * @param resultClass Result class.
      * @return Unpacked result.
      */
-    public static @Nullable Object unpackJobResult(@Nullable Marshaller<?, 
byte[]> marshaller, ClientMessageUnpacker unpacker) {
-        return unpack(marshaller, unpacker);
-    }
-
-    /** Underlying byte array expected to be in the following format: | typeId 
| value |. */
-    private static @Nullable Object unpack(@Nullable Marshaller<?, byte[]> 
marshaller, ClientMessageUnpacker unpacker) {
+    public static @Nullable Object unpackJobResult(
+            ClientMessageUnpacker unpacker,
+            @Nullable Marshaller<?, byte[]> marshaller,
+            @Nullable Class<?> resultClass
+    ) {
         if (unpacker.tryUnpackNil()) {
             return null;
         }
 
+        // Underlying byte array expected to be in the following format: | 
typeId | value |.
         int typeId = unpacker.unpackInt();
 
         switch (typeId) {
-            case NATIVE_ID:
+            case NATIVE:
                 if (marshaller != null) {
                     throw new UnmarshallingException(
                             "Can not unpack object because the marshaller is 
provided but the object was packed without marshaller."
                     );
                 }
 
                 return unpacker.unpackObjectFromBinaryTuple();
-            case MARSHALLED_TUPLE_ID:
+            case MARSHALLED_TUPLE:
                 return 
TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary());
 
-            case MARSHALLED_OBJECT_ID:
+            case MARSHALLED_CUSTOM:
                 if (marshaller == null) {
                     throw new UnmarshallingException(
                             "Can not unpack object because the marshaller is 
not provided but the object was packed with marshaller."
                     );
                 }
                 return tryUnmarshalOrCast(marshaller, unpacker.readBinary());
 
-            case MARSHALLED_POJO_ID:
-                return unpackPojo(unpacker);
+            case MARSHALLED_POJO:
+                if (resultClass == null) {
+                    throw new UnmarshallingException(
+                            "Can not unpack object because the pojo class is 
not provided but the object was packed as pojo."

Review Comment:
   ```suggestion
                               "Can not unpack object because the pojo class is 
not provided but the object was packed as pojo. Provide Job result type in 
JobDescriptor.resultClass."
   ```



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java:
##########
@@ -137,6 +159,39 @@ private static <T, R> Callable<CompletableFuture<R>> 
unmarshalExecMarshal(
         );
     }
 
+    static <T, R> @Nullable Class<?> getArgumentType(Class<? extends 
ComputeJob<T, R>> jobClass) {
+        for (Method method : jobClass.getDeclaredMethods()) {
+            if (method.getParameterCount() == 2
+                    && method.getParameterTypes()[0] == 
JobExecutionContext.class
+                    && method.getParameterTypes()[1] != Object.class // skip 
type erased method
+                    && method.getReturnType() == CompletableFuture.class
+            ) {
+                return method.getParameterTypes()[1];
+            }
+        }
+        return null;
+    }
+
+    private static Object unmarshallPojo(Class<?> actualArgumentType, Tuple 
input) {
+        try {
+            Object obj = actualArgumentType.getConstructor().newInstance();
+
+            fromTuple(obj, input);
+
+            return obj;
+        } catch (NoSuchMethodException e) {
+            throw new UnmarshallingException("Class " + 
actualArgumentType.getName() + " doesn't have public default constructor", e);

Review Comment:
   ```suggestion
               throw new UnmarshallingException("Class " + 
actualArgumentType.getName() + " doesn't have public default constructor. Add 
the constructor or define argument marshaller in the compute job", e);
   ```



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java:
##########
@@ -137,6 +159,39 @@ private static <T, R> Callable<CompletableFuture<R>> 
unmarshalExecMarshal(
         );
     }
 
+    static <T, R> @Nullable Class<?> getArgumentType(Class<? extends 
ComputeJob<T, R>> jobClass) {
+        for (Method method : jobClass.getDeclaredMethods()) {
+            if (method.getParameterCount() == 2

Review Comment:
   I would also add the method name check here.



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java:
##########
@@ -104,31 +110,47 @@ public <T, R> JobExecutionInternal<R> executeJob(
 
     private static <T, R> Callable<CompletableFuture<R>> unmarshalExecMarshal(
             T input,
+            Class<? extends ComputeJob<T, R>> jobClass,
             ComputeJob<T, R> jobInstance,
             JobExecutionContext context,
             @Nullable Marshaller<T, byte[]> inputMarshaller
     ) {
-        return () -> jobInstance.executeAsync(context, 
unmarshallOrNotIfNull(inputMarshaller, input));
+        return () -> jobInstance.executeAsync(context, 
unmarshallOrNotIfNull(inputMarshaller, input, jobClass));
     }
 
-    private static <T> @Nullable T unmarshallOrNotIfNull(@Nullable 
Marshaller<T, byte[]> marshaller, Object input) {
-        if (marshaller == null || input == null) {
+    private static <T, R> @Nullable T unmarshallOrNotIfNull(
+            @Nullable Marshaller<T, byte[]> marshaller,
+            Object input,

Review Comment:
   ```suggestion
               @Nullable Object input,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to