PakhomovAlexander commented on code in PR #4438:
URL: https://github.com/apache/ignite-3/pull/4438#discussion_r1771724939


##########
modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java:
##########
@@ -66,26 +81,34 @@ private static <T> void pack(@Nullable T obj, @Nullable 
Marshaller<T, byte[]> ma
         }
 
         if (obj instanceof Tuple) {
-            byte[] marshalledTuple = 
TupleWithSchemaMarshalling.marshal((Tuple) obj);
+            packer.packInt(MARSHALLED_TUPLE_ID);
 
-            packer.packInt(ComputeJobType.MARSHALLED_TUPLE_ID);
+            packTuple((Tuple) obj, packer);
+            return;
+        }
 
-            if (marshalledTuple == null) {
-                packer.packNil();
-                return;
-            }
+        if (isNativeType(obj.getClass())) {
+            packer.packInt(NATIVE_ID);
 
-            packer.packBinary(marshalledTuple);
+            packer.packObjectAsBinaryTuple(obj);
             return;
         }
 
-        if (obj == null) {
-            packer.packNil();
-            return;
+        try {
+            Tuple tuple = toTuple(obj);
+            packer.packInt(MARSHALLED_POJO_ID);
+            packer.packString(obj.getClass().getName());
+            packTuple(tuple, packer);
+        } catch (PojoConversionException e) {
+            throw new MarshallingException("Can't pack object", e);
         }
+    }
 
-        packer.packInt(ComputeJobType.NATIVE_ID);
+    private static boolean isNativeType(Class<?> clazz) {
+        return 
Arrays.stream(ColumnType.values()).map(ColumnType::javaClass).anyMatch(c -> c 
== clazz);

Review Comment:
   Build a set of classes in `ColumnType` on static load and check here with 
`contains()`.



##########
modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/pojo/UnmarshallablePojo.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.proto.pojo;
+
+/** POJO with incorrect fields. */
+public class UnmarshallablePojo {

Review Comment:
   Could you please split this pojo into 5? For each invalid case we have to 
have separate pojo. Combining all invalid cases in single class can hide bugs. 



##########
modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java:
##########
@@ -57,51 +64,79 @@ public final class ClientComputeJobUnpacker {
         }
 
         int typeId = unpacker.unpackInt();
-        var type = ComputeJobType.Type.fromId(typeId);
 
-        switch (type) {
-            case NATIVE:
+        switch (typeId) {
+            case NATIVE_ID:
                 if (marshaller != null) {
                     throw new UnmarshallingException(
                             "Can not unpack object because the marshaller is 
provided but the object was packed without marshaller."
                     );
                 }
 
                 return unpacker.unpackObjectFromBinaryTuple();
-            case MARSHALLED_TUPLE:
+            case MARSHALLED_TUPLE_ID:
                 return 
TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary());
 
-            case MARSHALLED_OBJECT:
+            case MARSHALLED_OBJECT_ID:
                 if (marshaller == null) {
                     throw new UnmarshallingException(
                             "Can not unpack object because the marshaller is 
not provided but the object was packed with marshaller."
                     );
                 }
                 return tryUnmarshalOrCast(marshaller, unpacker.readBinary());
 
+            case MARSHALLED_POJO_ID:
+                return unpackPojo(unpacker);
+
             default:
                 throw new UnmarshallingException("Unsupported compute job type 
id: " + typeId);
         }
     }
 
     /** Unpacks compute job argument without marshaller. */
-    public static Object 
unpackJobArgumentWithoutMarshaller(ClientMessageUnpacker unpacker) {
+    public static @Nullable Object 
unpackJobArgumentWithoutMarshaller(ClientMessageUnpacker unpacker) {
         if (unpacker.tryUnpackNil()) {
             return null;
         }
 
         int typeId = unpacker.unpackInt();
-        var type = ComputeJobType.Type.fromId(typeId);
 
-        switch (type) {
-            case NATIVE:
+        switch (typeId) {
+            case NATIVE_ID:
                 return unpacker.unpackObjectFromBinaryTuple();
-            case MARSHALLED_TUPLE:
+            case MARSHALLED_TUPLE_ID:
                 return 
TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary());
-            case MARSHALLED_OBJECT:
+            case MARSHALLED_OBJECT_ID:
                 return unpacker.readBinary();
+            case MARSHALLED_POJO_ID:
+                return unpackPojo(unpacker);
             default:
                 throw new UnmarshallingException("Unsupported compute job type 
id: " + typeId);
         }
     }
+
+    private static Object unpackPojo(ClientMessageUnpacker unpacker) {
+        String className = unpacker.unpackString();
+
+        try {
+            Class<?> clazz = Class.forName(className);
+            Object obj = clazz.getConstructor().newInstance();

Review Comment:
   We can not create an instance of the class that the client sent. We do not 
know if we have this class in the classpath, we do not know if the client named 
the pojo `org.apache.Ignite`, who should put the class name in platform 
clients? 
   
   The proper approach here is to post-pone the actual unpacking to the time 
where we can get the actual argument of the job and then create an instance of 
it. 
   
   The main concern is: we do not want client to send the class name to us. 



##########
modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java:
##########
@@ -57,51 +64,79 @@ public final class ClientComputeJobUnpacker {
         }
 
         int typeId = unpacker.unpackInt();
-        var type = ComputeJobType.Type.fromId(typeId);
 
-        switch (type) {
-            case NATIVE:
+        switch (typeId) {
+            case NATIVE_ID:
                 if (marshaller != null) {
                     throw new UnmarshallingException(
                             "Can not unpack object because the marshaller is 
provided but the object was packed without marshaller."
                     );
                 }
 
                 return unpacker.unpackObjectFromBinaryTuple();
-            case MARSHALLED_TUPLE:
+            case MARSHALLED_TUPLE_ID:
                 return 
TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary());
 
-            case MARSHALLED_OBJECT:
+            case MARSHALLED_OBJECT_ID:

Review Comment:
   The naming is confusing here... What is the difference between 
`MARSHALLED_OBJECT_ID` and `MARSHALLED_POJO_ID`? I think the better name for 
`MARSHALLED_OBJECT_ID` is `MARSHALLED_CUSTOM_ID`.



##########
modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/pojo/PojoConverter.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.proto.pojo;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
+
+/** Converts POJO to Tuple and back. */
+public class PojoConverter {
+
+    /**
+     * Converts POJO to Tuple. Supports public non-static fields, public 
non-static getter methods starting with "get" (or "is" for boolean
+     * type) followed by the capital letter.
+     *
+     * @param obj POJO to convert.
+     * @return Tuple with columns corresponding to supported fields of the 
POJO.
+     * @throws PojoConversionException If conversion failed.
+     */
+    public static Tuple toTuple(Object obj) throws PojoConversionException {
+        Class<?> clazz = obj.getClass();
+
+        // TODO https://issues.apache.org/jira/browse/IGNITE-23092
+        if (clazz.getSuperclass() != Object.class) {
+            throw new PojoConversionException("Can't convert subclasses");
+        }
+
+        Map<String, Getter> getters = new HashMap<>();
+
+        for (Field field : clazz.getDeclaredFields()) {
+            if (isSupportedType(field.getType())) {
+                int modifiers = field.getModifiers();
+                if (Modifier.isPublic(modifiers) && 
!Modifier.isStatic(modifiers)) {
+                    getters.put(field.getName(), () -> field.get(obj));
+                }
+            }
+        }
+
+        for (Method method : clazz.getDeclaredMethods()) {

Review Comment:
   After the code is merged, can you create a ticket with the following 
description:
   
   During the pojo marshalling/unmarshalling we traverse the pojo class on the 
server side each time. Taking into account the fact that we are going to 
process a lot of data and change the pojo fields rearly, we can cache pojo 
descriptors in the cache. That should significantly improve the performance. 



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPojoComputeMarshallingTest.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app.client;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.internal.runner.app.Jobs.PojoArg;
+import org.apache.ignite.internal.runner.app.Jobs.PojoJob;
+import org.apache.ignite.internal.runner.app.Jobs.PojoResult;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Test the OOTB support for POJOs in compute api.
+ */
+@SuppressWarnings("resource")
+public class ItThinClientPojoComputeMarshallingTest extends 
ItAbstractThinClientTest {
+
+    @ParameterizedTest
+    @ValueSource(ints = {0, 1})
+    void pojoJobWithMarshallers(int targetNodeIdx) {
+        // Given target node.
+        var targetNode = node(targetNodeIdx);
+
+        // When run job with custom marshaller for pojo argument and result.
+        PojoResult result = client().compute().execute(
+                JobTarget.node(targetNode),
+                JobDescriptor.builder(PojoJob.class).build(),
+                new PojoArg().setIntValue(2).setStrValue("1")

Review Comment:
   The DoD for the previous comments about class name: this test works and the 
same test but with other pojos (the client pojo names are changed, say 
`PojoArg2` and `PojoResult2`, but the structure stays the same) on the client 
site. The server job definition stays absolutely the same (accepts `PojoArg` 
and returns `PojoResult`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to