HuangXingBo commented on a change in pull request #13327:
URL: https://github.com/apache/flink/pull/13327#discussion_r484157316



##########
File path: 
flink-python/src/main/java/org/apache/flink/datastream/runtime/typeutils/python/PythonTypeUtils.java
##########
@@ -194,7 +201,54 @@
                        }
 
                        FlinkFnApi.TypeInfo.FieldType.Builder builder = 
FlinkFnApi.TypeInfo.FieldType.newBuilder()
-                               
.setTypeName(FlinkFnApi.TypeInfo.TypeName.ARRAY);
+                               
.setTypeName(FlinkFnApi.TypeInfo.TypeName.PRIMITIVE_ARRAY);
+                       builder.setCollectionElementType(elementFieldType);
+                       return builder.build();
+               }
+
+               private static FlinkFnApi.TypeInfo.FieldType 
buildBsicArrayTypeProto(

Review comment:
       ```suggestion
                private static FlinkFnApi.TypeInfo.FieldType 
buildBasicArrayTypeProto(
   ```

##########
File path: 
flink-python/src/main/java/org/apache/flink/datastream/runtime/typeutils/python/PythonTypeUtils.java
##########
@@ -308,6 +363,18 @@ public static TypeSerializer 
typeInfoSerializerConverter(TypeInformation typeInf
                                                .map(f -> 
typeInfoSerializerConverter(f)).toArray(TypeSerializer[]::new);
                                        return new 
TupleSerializer(Tuple.getTupleClass(tupleTypeInfo.getArity()), 
fieldTypeSerialzers);
                                }
+
+                               if (typeInformation instanceof 
BasicArrayTypeInfo){
+                                       BasicArrayTypeInfo basicArrayTypeInfo = 
(BasicArrayTypeInfo) typeInformation;
+                                       if 
(basicArrayTypeInfo.equals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)){

Review comment:
       Maybe we need to add an e2e test for StringArraySerializer.

##########
File path: 
flink-python/src/main/java/org/apache/flink/datastream/runtime/typeutils/python/PythonTypeUtils.java
##########
@@ -194,7 +201,54 @@
                        }
 
                        FlinkFnApi.TypeInfo.FieldType.Builder builder = 
FlinkFnApi.TypeInfo.FieldType.newBuilder()
-                               
.setTypeName(FlinkFnApi.TypeInfo.TypeName.ARRAY);
+                               
.setTypeName(FlinkFnApi.TypeInfo.TypeName.PRIMITIVE_ARRAY);
+                       builder.setCollectionElementType(elementFieldType);
+                       return builder.build();
+               }
+
+               private static FlinkFnApi.TypeInfo.FieldType 
buildBsicArrayTypeProto(
+                       BasicArrayTypeInfo basicArrayTypeInfo) {
+                       FlinkFnApi.TypeInfo.FieldType elementFieldType = null;
+                       if 
(basicArrayTypeInfo.equals(BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO)) {
+                               elementFieldType = 
buildBasicTypeProto(BasicTypeInfo.BOOLEAN_TYPE_INFO);
+                       }
+
+                       if 
(basicArrayTypeInfo.equals(BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)) {
+                               elementFieldType = 
buildBasicTypeProto(BasicTypeInfo.BYTE_TYPE_INFO);
+                       }
+
+                       if 
(basicArrayTypeInfo.equals(BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO)) {
+                               elementFieldType = 
buildBasicTypeProto(BasicTypeInfo.SHORT_TYPE_INFO);
+                       }
+
+                       if 
(basicArrayTypeInfo.equals(BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO)) {
+                               elementFieldType = 
buildBasicTypeProto(BasicTypeInfo.INT_TYPE_INFO);
+                       }
+
+                       if 
(basicArrayTypeInfo.equals(BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO)) {
+                               elementFieldType = 
buildBasicTypeProto(BasicTypeInfo.LONG_TYPE_INFO);
+                       }
+
+                       if 
(basicArrayTypeInfo.equals(BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO)) {
+                               elementFieldType = 
buildBasicTypeProto(BasicTypeInfo.FLOAT_TYPE_INFO);
+                       }
+
+                       if 
(basicArrayTypeInfo.equals(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO)) {
+                               elementFieldType = 
buildBasicTypeProto(BasicTypeInfo.DOUBLE_TYPE_INFO);
+                       }
+
+                       if 
(basicArrayTypeInfo.equals(BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO)) {
+                               elementFieldType = 
buildBasicTypeProto(BasicTypeInfo.CHAR_TYPE_INFO);
+                       }
+

Review comment:
       Maybe you forgot to add the logic for 
`BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO`?

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -384,6 +438,27 @@ def PRIMITIVE_ARRAY(element_type: TypeInformation):
         else:
             raise TypeError("Invalid element type for a primitive array.")
 
+    @staticmethod
+    def BASIC_ARRAY(element_type: TypeInformation):
+        if element_type == Types.BOOLEAN():
+            return BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO()
+        elif element_type == Types.BYTE():
+            return BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO()
+        elif element_type == Types.SHORT():
+            return BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO()
+        elif element_type == Types.INT():
+            return BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO()
+        elif element_type == Types.LONG():
+            return BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO()
+        elif element_type == Types.FLOAT():
+            return BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO()
+        elif element_type == Types.DOUBLE():
+            return BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO()
+        elif element_type == Types.CHAR():
+            return BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO()

Review comment:
       Did you forget to add `BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO`?

##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -187,6 +187,24 @@ def __repr__(self):
         return 'ArrayCoderImpl[%s]' % repr(self._elem_coder)
 
 
+class PrimitiveArrayCoderImpl(StreamCoderImpl):

Review comment:
       I think we need to add related unit tests for the coders in 
test_coders.py and test_fast_coders.py.
   

##########
File path: flink-python/pyflink/fn_execution/coders.py
##########
@@ -233,6 +233,19 @@ def get_impl(self):
         return coder_impl.ArrayCoderImpl(self._elem_coder.get_impl())
 
 
+class PrimitiveArrayCoder(CollectionCoder):
+    """
+    Coder for Primitive Array.
+    """
+
+    def __init__(self, elem_coder):
+        self._elem_coder = elem_coder

Review comment:
       The parent class `CollectionCoder` already defines the `_elem_coder` 
attribute, so this assignment should be removed. Please update `ArrayCoder` 
in the same way.
   

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -209,6 +209,60 @@ def CHAR_PRIMITIVE_ARRAY_TYPE_INFO():
             .PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO)
 
 
+class BasicArrayTypeInfo(TypeInformation, ABC):
+    """
+    A TypeInformation for arrays boxed primitive types (Integer, Long, Double, 
...).

Review comment:
       ```suggestion
       A TypeInformation for arrays of boxed types (Integer, Long, Double, ...).
   ```

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -384,6 +438,27 @@ def PRIMITIVE_ARRAY(element_type: TypeInformation):
         else:
             raise TypeError("Invalid element type for a primitive array.")
 
+    @staticmethod
+    def BASIC_ARRAY(element_type: TypeInformation):

Review comment:
       Please add the annotation.

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -384,6 +438,27 @@ def PRIMITIVE_ARRAY(element_type: TypeInformation):
         else:
             raise TypeError("Invalid element type for a primitive array.")
 
+    @staticmethod
+    def BASIC_ARRAY(element_type: TypeInformation):
+        if element_type == Types.BOOLEAN():
+            return BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO()
+        elif element_type == Types.BYTE():
+            return BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO()
+        elif element_type == Types.SHORT():
+            return BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO()
+        elif element_type == Types.INT():
+            return BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO()
+        elif element_type == Types.LONG():
+            return BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO()
+        elif element_type == Types.FLOAT():
+            return BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO()
+        elif element_type == Types.DOUBLE():
+            return BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO()
+        elif element_type == Types.CHAR():
+            return BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO()
+        else:
+            raise TypeError("Invalid element type for a primitive array.")

Review comment:
       ```suggestion
               raise TypeError("Invalid element type for a boxed array.")
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to