HuangXingBo commented on a change in pull request #13327: URL: https://github.com/apache/flink/pull/13327#discussion_r484157316
########## File path: flink-python/src/main/java/org/apache/flink/datastream/runtime/typeutils/python/PythonTypeUtils.java ########## @@ -194,7 +201,54 @@ } FlinkFnApi.TypeInfo.FieldType.Builder builder = FlinkFnApi.TypeInfo.FieldType.newBuilder() - .setTypeName(FlinkFnApi.TypeInfo.TypeName.ARRAY); + .setTypeName(FlinkFnApi.TypeInfo.TypeName.PRIMITIVE_ARRAY); + builder.setCollectionElementType(elementFieldType); + return builder.build(); + } + + private static FlinkFnApi.TypeInfo.FieldType buildBsicArrayTypeProto( Review comment: ```suggestion private static FlinkFnApi.TypeInfo.FieldType buildBasicArrayTypeProto( ``` ########## File path: flink-python/src/main/java/org/apache/flink/datastream/runtime/typeutils/python/PythonTypeUtils.java ########## @@ -308,6 +363,18 @@ public static TypeSerializer typeInfoSerializerConverter(TypeInformation typeInf .map(f -> typeInfoSerializerConverter(f)).toArray(TypeSerializer[]::new); return new TupleSerializer(Tuple.getTupleClass(tupleTypeInfo.getArity()), fieldTypeSerialzers); } + + if (typeInformation instanceof BasicArrayTypeInfo){ + BasicArrayTypeInfo basicArrayTypeInfo = (BasicArrayTypeInfo) typeInformation; + if (basicArrayTypeInfo.equals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)){ Review comment: Maybe we need to add an e2e test for StringArraySerializer. 
########## File path: flink-python/src/main/java/org/apache/flink/datastream/runtime/typeutils/python/PythonTypeUtils.java ########## @@ -194,7 +201,54 @@ } FlinkFnApi.TypeInfo.FieldType.Builder builder = FlinkFnApi.TypeInfo.FieldType.newBuilder() - .setTypeName(FlinkFnApi.TypeInfo.TypeName.ARRAY); + .setTypeName(FlinkFnApi.TypeInfo.TypeName.PRIMITIVE_ARRAY); + builder.setCollectionElementType(elementFieldType); + return builder.build(); + } + + private static FlinkFnApi.TypeInfo.FieldType buildBsicArrayTypeProto( + BasicArrayTypeInfo basicArrayTypeInfo) { + FlinkFnApi.TypeInfo.FieldType elementFieldType = null; + if (basicArrayTypeInfo.equals(BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO)) { + elementFieldType = buildBasicTypeProto(BasicTypeInfo.BOOLEAN_TYPE_INFO); + } + + if (basicArrayTypeInfo.equals(BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)) { + elementFieldType = buildBasicTypeProto(BasicTypeInfo.BYTE_TYPE_INFO); + } + + if (basicArrayTypeInfo.equals(BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO)) { + elementFieldType = buildBasicTypeProto(BasicTypeInfo.SHORT_TYPE_INFO); + } + + if (basicArrayTypeInfo.equals(BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO)) { + elementFieldType = buildBasicTypeProto(BasicTypeInfo.INT_TYPE_INFO); + } + + if (basicArrayTypeInfo.equals(BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO)) { + elementFieldType = buildBasicTypeProto(BasicTypeInfo.LONG_TYPE_INFO); + } + + if (basicArrayTypeInfo.equals(BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO)) { + elementFieldType = buildBasicTypeProto(BasicTypeInfo.FLOAT_TYPE_INFO); + } + + if (basicArrayTypeInfo.equals(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO)) { + elementFieldType = buildBasicTypeProto(BasicTypeInfo.DOUBLE_TYPE_INFO); + } + + if (basicArrayTypeInfo.equals(BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO)) { + elementFieldType = buildBasicTypeProto(BasicTypeInfo.CHAR_TYPE_INFO); + } + Review comment: Maybe you forgot to add the logic for `BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO`? 
########## File path: flink-python/pyflink/common/typeinfo.py ########## @@ -384,6 +438,27 @@ def PRIMITIVE_ARRAY(element_type: TypeInformation): else: raise TypeError("Invalid element type for a primitive array.") + @staticmethod + def BASIC_ARRAY(element_type: TypeInformation): + if element_type == Types.BOOLEAN(): + return BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO() + elif element_type == Types.BYTE(): + return BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO() + elif element_type == Types.SHORT(): + return BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO() + elif element_type == Types.INT(): + return BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO() + elif element_type == Types.LONG(): + return BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO() + elif element_type == Types.FLOAT(): + return BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO() + elif element_type == Types.DOUBLE(): + return BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO() + elif element_type == Types.CHAR(): + return BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO() Review comment: Did you forget to add `BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO`? ########## File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py ########## @@ -187,6 +187,24 @@ def __repr__(self): return 'ArrayCoderImpl[%s]' % repr(self._elem_coder) +class PrimitiveArrayCoderImpl(StreamCoderImpl): Review comment: I think we need to add related unit tests for the coders in test_coders.py and test_fast_coders.py. ########## File path: flink-python/pyflink/fn_execution/coders.py ########## @@ -233,6 +233,19 @@ def get_impl(self): return coder_impl.ArrayCoderImpl(self._elem_coder.get_impl()) +class PrimitiveArrayCoder(CollectionCoder): + """ + Coder for Primitive Array. + """ + + def __init__(self, elem_coder): + self._elem_coder = elem_coder Review comment: The parent class `CollectionCoder` already has the attribute `_elem_coder`, so we need to remove this code. 
Please change `ArrayCoder` as well. ########## File path: flink-python/pyflink/common/typeinfo.py ########## @@ -209,6 +209,60 @@ def CHAR_PRIMITIVE_ARRAY_TYPE_INFO(): .PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO) +class BasicArrayTypeInfo(TypeInformation, ABC): + """ + A TypeInformation for arrays boxed primitive types (Integer, Long, Double, ...). Review comment: ```suggestion A TypeInformation for arrays of boxed types (Integer, Long, Double, ...). ``` ########## File path: flink-python/pyflink/common/typeinfo.py ########## @@ -384,6 +438,27 @@ def PRIMITIVE_ARRAY(element_type: TypeInformation): else: raise TypeError("Invalid element type for a primitive array.") + @staticmethod + def BASIC_ARRAY(element_type: TypeInformation): Review comment: Please add the return type annotation. ########## File path: flink-python/pyflink/common/typeinfo.py ########## @@ -384,6 +438,27 @@ def PRIMITIVE_ARRAY(element_type: TypeInformation): else: raise TypeError("Invalid element type for a primitive array.") + @staticmethod + def BASIC_ARRAY(element_type: TypeInformation): + if element_type == Types.BOOLEAN(): + return BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO() + elif element_type == Types.BYTE(): + return BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO() + elif element_type == Types.SHORT(): + return BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO() + elif element_type == Types.INT(): + return BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO() + elif element_type == Types.LONG(): + return BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO() + elif element_type == Types.FLOAT(): + return BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO() + elif element_type == Types.DOUBLE(): + return BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO() + elif element_type == Types.CHAR(): + return BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO() + else: + raise TypeError("Invalid element type for a primitive array.") Review comment: ```suggestion raise TypeError("Invalid element type for a boxed array.") ``` 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org