PakhomovAlexander commented on code in PR #4438:
URL: https://github.com/apache/ignite-3/pull/4438#discussion_r1773427075
##########
modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java:
##########
@@ -31,68 +31,83 @@
import org.apache.ignite.marshalling.UnmarshallingException;
import org.jetbrains.annotations.Nullable;
-/** Unpacks job arguments and results. */
+/** Unpacks job results. */
public final class ClientComputeJobUnpacker {
- /**
- * Unpacks compute job argument. If the marshaller is provided, it will be
used to unmarshal the argument. If the marshaller is not
- * provided and the argument is a native column type or a tuple, it will
be unpacked accordingly.
- *
- * @param marshaller Marshaller.
- * @param unpacker Unpacker.
- * @return Unpacked argument.
- */
- public static @Nullable Object unpackJobArgument(@Nullable Marshaller<?,
byte[]> marshaller, ClientMessageUnpacker unpacker) {
- return unpack(marshaller, unpacker);
- }
-
/**
* Unpacks compute job result. If the marshaller is provided, it will be
used to unmarshal the result. If the marshaller is not provided
- * and the result is a native column type or a tuple, it will be unpacked
accordingly.
+ * and the result class is provided and the result is a tuple, it will be
unpacked as a pojo of that class. If the marshaller is not
+ * provided and the result is a native column type or a tuple, it will be
unpacked accordingly.
*
- * @param marshaller Marshaller.
* @param unpacker Unpacker.
+ * @param marshaller Marshaller.
+ * @param resultClass Result class.
* @return Unpacked result.
*/
- public static @Nullable Object unpackJobResult(@Nullable Marshaller<?,
byte[]> marshaller, ClientMessageUnpacker unpacker) {
- return unpack(marshaller, unpacker);
- }
-
- /** Underlying byte array expected to be in the following format: | typeId
| value |. */
- private static @Nullable Object unpack(@Nullable Marshaller<?, byte[]>
marshaller, ClientMessageUnpacker unpacker) {
+ public static @Nullable Object unpackJobResult(
+ ClientMessageUnpacker unpacker,
+ @Nullable Marshaller<?, byte[]> marshaller,
+ @Nullable Class<?> resultClass
+ ) {
if (unpacker.tryUnpackNil()) {
return null;
}
+ // Underlying byte array expected to be in the following format: |
typeId | value |.
int typeId = unpacker.unpackInt();
switch (typeId) {
- case NATIVE_ID:
+ case NATIVE:
if (marshaller != null) {
throw new UnmarshallingException(
"Can not unpack object because the marshaller is
provided but the object was packed without marshaller."
);
}
return unpacker.unpackObjectFromBinaryTuple();
- case MARSHALLED_TUPLE_ID:
+ case MARSHALLED_TUPLE:
return
TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary());
- case MARSHALLED_OBJECT_ID:
+ case MARSHALLED_CUSTOM:
if (marshaller == null) {
throw new UnmarshallingException(
"Can not unpack object because the marshaller is
not provided but the object was packed with marshaller."
);
}
return tryUnmarshalOrCast(marshaller, unpacker.readBinary());
- case MARSHALLED_POJO_ID:
- return unpackPojo(unpacker);
+ case MARSHALLED_POJO:
+ if (resultClass == null) {
+ throw new UnmarshallingException(
+ "Can not unpack object because the pojo class is
not provided but the object was packed as pojo."
+ );
+ }
+ return unpackPojo(unpacker, resultClass);
default:
throw new UnmarshallingException("Unsupported compute job type
id: " + typeId);
}
}
+ private static Object unpackPojo(ClientMessageUnpacker unpacker, Class<?>
pojoClass) {
+ try {
+ Object obj = pojoClass.getConstructor().newInstance();
+
+ fromTuple(obj,
TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary()));
+
+ return obj;
+ } catch (NoSuchMethodException e) {
+ throw new UnmarshallingException("Class " + pojoClass.getName() +
" doesn't have public default constructor", e);
Review Comment:
```suggestion
throw new UnmarshallingException("Class " + pojoClass.getName()
+ " doesn't have public default constructor. Add the default constructor or
provide Marshaller for " + pojoClass.getName() + " in
JobDescriptor.resultMarshaller", e);
```
##########
modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java:
##########
@@ -31,68 +31,83 @@
import org.apache.ignite.marshalling.UnmarshallingException;
import org.jetbrains.annotations.Nullable;
-/** Unpacks job arguments and results. */
+/** Unpacks job results. */
public final class ClientComputeJobUnpacker {
- /**
- * Unpacks compute job argument. If the marshaller is provided, it will be
used to unmarshal the argument. If the marshaller is not
- * provided and the argument is a native column type or a tuple, it will
be unpacked accordingly.
- *
- * @param marshaller Marshaller.
- * @param unpacker Unpacker.
- * @return Unpacked argument.
- */
- public static @Nullable Object unpackJobArgument(@Nullable Marshaller<?,
byte[]> marshaller, ClientMessageUnpacker unpacker) {
- return unpack(marshaller, unpacker);
- }
-
/**
* Unpacks compute job result. If the marshaller is provided, it will be
used to unmarshal the result. If the marshaller is not provided
- * and the result is a native column type or a tuple, it will be unpacked
accordingly.
+ * and the result class is provided and the result is a tuple, it will be
unpacked as a pojo of that class. If the marshaller is not
+ * provided and the result is a native column type or a tuple, it will be
unpacked accordingly.
*
- * @param marshaller Marshaller.
* @param unpacker Unpacker.
+ * @param marshaller Marshaller.
+ * @param resultClass Result class.
* @return Unpacked result.
*/
- public static @Nullable Object unpackJobResult(@Nullable Marshaller<?,
byte[]> marshaller, ClientMessageUnpacker unpacker) {
- return unpack(marshaller, unpacker);
- }
-
- /** Underlying byte array expected to be in the following format: | typeId
| value |. */
- private static @Nullable Object unpack(@Nullable Marshaller<?, byte[]>
marshaller, ClientMessageUnpacker unpacker) {
+ public static @Nullable Object unpackJobResult(
+ ClientMessageUnpacker unpacker,
+ @Nullable Marshaller<?, byte[]> marshaller,
+ @Nullable Class<?> resultClass
+ ) {
if (unpacker.tryUnpackNil()) {
return null;
}
+ // Underlying byte array expected to be in the following format: |
typeId | value |.
int typeId = unpacker.unpackInt();
switch (typeId) {
- case NATIVE_ID:
+ case NATIVE:
if (marshaller != null) {
throw new UnmarshallingException(
"Can not unpack object because the marshaller is
provided but the object was packed without marshaller."
);
}
return unpacker.unpackObjectFromBinaryTuple();
- case MARSHALLED_TUPLE_ID:
+ case MARSHALLED_TUPLE:
return
TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary());
- case MARSHALLED_OBJECT_ID:
+ case MARSHALLED_CUSTOM:
if (marshaller == null) {
throw new UnmarshallingException(
"Can not unpack object because the marshaller is
not provided but the object was packed with marshaller."
);
}
return tryUnmarshalOrCast(marshaller, unpacker.readBinary());
- case MARSHALLED_POJO_ID:
- return unpackPojo(unpacker);
+ case MARSHALLED_POJO:
+ if (resultClass == null) {
+ throw new UnmarshallingException(
+ "Can not unpack object because the pojo class is
not provided but the object was packed as pojo."
Review Comment:
```suggestion
"Can not unpack object because the pojo class is
not provided but the object was packed as pojo. Provide Job result type in
JobDescriptor.resultClass."
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java:
##########
@@ -137,6 +159,39 @@ private static <T, R> Callable<CompletableFuture<R>>
unmarshalExecMarshal(
);
}
+ static <T, R> @Nullable Class<?> getArgumentType(Class<? extends
ComputeJob<T, R>> jobClass) {
+ for (Method method : jobClass.getDeclaredMethods()) {
+ if (method.getParameterCount() == 2
+ && method.getParameterTypes()[0] ==
JobExecutionContext.class
+ && method.getParameterTypes()[1] != Object.class // skip
type erased method
+ && method.getReturnType() == CompletableFuture.class
+ ) {
+ return method.getParameterTypes()[1];
+ }
+ }
+ return null;
+ }
+
+ private static Object unmarshallPojo(Class<?> actualArgumentType, Tuple
input) {
+ try {
+ Object obj = actualArgumentType.getConstructor().newInstance();
+
+ fromTuple(obj, input);
+
+ return obj;
+ } catch (NoSuchMethodException e) {
+ throw new UnmarshallingException("Class " +
actualArgumentType.getName() + " doesn't have public default constructor", e);
Review Comment:
```suggestion
throw new UnmarshallingException("Class " +
actualArgumentType.getName() + " doesn't have public default constructor. Add
the constructor or define argument marshaller in the compute job", e);
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java:
##########
@@ -137,6 +159,39 @@ private static <T, R> Callable<CompletableFuture<R>>
unmarshalExecMarshal(
);
}
+ static <T, R> @Nullable Class<?> getArgumentType(Class<? extends
ComputeJob<T, R>> jobClass) {
+ for (Method method : jobClass.getDeclaredMethods()) {
+ if (method.getParameterCount() == 2
Review Comment:
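I would also add a method name check here (the method being looked up is `executeAsync`), so that an unrelated two-parameter method with the same parameter and return types is not matched by mistake.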
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java:
##########
@@ -104,31 +110,47 @@ public <T, R> JobExecutionInternal<R> executeJob(
private static <T, R> Callable<CompletableFuture<R>> unmarshalExecMarshal(
T input,
+ Class<? extends ComputeJob<T, R>> jobClass,
ComputeJob<T, R> jobInstance,
JobExecutionContext context,
@Nullable Marshaller<T, byte[]> inputMarshaller
) {
- return () -> jobInstance.executeAsync(context,
unmarshallOrNotIfNull(inputMarshaller, input));
+ return () -> jobInstance.executeAsync(context,
unmarshallOrNotIfNull(inputMarshaller, input, jobClass));
}
- private static <T> @Nullable T unmarshallOrNotIfNull(@Nullable
Marshaller<T, byte[]> marshaller, Object input) {
- if (marshaller == null || input == null) {
+ private static <T, R> @Nullable T unmarshallOrNotIfNull(
+ @Nullable Marshaller<T, byte[]> marshaller,
+ Object input,
Review Comment:
```suggestion
@Nullable Object input,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]