dianfu commented on a change in pull request #9977: [FLINK-14497][python] Support primitive data types in Python user-defined functions
URL: https://github.com/apache/flink/pull/9977#discussion_r338915147
 
 

 ##########
 File path: flink-python/pyflink/fn_execution/coders.py
 ##########
 @@ -62,21 +67,183 @@ def __hash__(self):
         return hash(self._field_coders)
 
 
+class DeterministicCoder(FastCoder):
+    """
+    Base Coder for all deterministic Coders.
+    """
+    def _create_impl(self):
+        raise NotImplementedError
+
+    def to_type_hint(self):
+        return typing.Any
+
+    def is_deterministic(self):
+        return True
+
+
+class LongCoder(DeterministicCoder):
+    """
+    Coder for 8 bytes long.
+    """
+
+    def _create_impl(self):
+        return coder_impl.LongCoderImpl()
+
+    def to_type_hint(self):
+        return int
+
+
+class ByteCoder(DeterministicCoder):
+    """
+    Coder for Byte.
+    """
+
+    def _create_impl(self):
+        return coder_impl.ByteCoderImpl()
+
+    def to_type_hint(self):
+        return int
+
+
+class BooleanCoder(DeterministicCoder):
+    """
+    Coder for Boolean.
+    """
+
+    def _create_impl(self):
+        return coder_impl.BooleanCoderImpl()
+
+    def to_type_hint(self):
+        return bool
+
+
+class IntCoder(DeterministicCoder):
+    """
+    Coder for 4 bytes int.
+    """
+
+    def _create_impl(self):
+        return coder_impl.IntCoderImpl()
+
+    def to_type_hint(self):
+        return int
+
+
+class ShortCoder(DeterministicCoder):
+    """
+    Coder for Short.
+    """
+
+    def _create_impl(self):
+        return coder_impl.ShortCoderImpl()
+
+    def to_type_hint(self):
+        return int
+
+
+class FloatCoder(DeterministicCoder):
+    """
+    Coder for Float.
+    """
+
+    def _create_impl(self):
+        return coder_impl.FloatCoderImpl()
+
+    def to_type_hint(self):
+        return float
+
+
+class DoubleCoder(DeterministicCoder):
+    """
+    Coder for Double.
+    """
+
+    def _create_impl(self):
+        return coder_impl.DoubleCoderImpl()
+
+    def to_type_hint(self):
+        return float
+
+
+class BinaryCoder(DeterministicCoder):
+    """
+    Coder for Byte Array.
+    """
+
+    def _create_impl(self):
+        return coder_impl.BinaryCoderImpl()
+
+    def to_type_hint(self):
+        return bytes
+
+
+class StringCoder(DeterministicCoder):
+    """
+    Coder for Character String.
+    """
+    def _create_impl(self):
+        return coder_impl.StringCoderImpl()
+
+    def to_type_hint(self):
+        return str
+
+
+class DateCoder(DeterministicCoder):
+    """
+    Coder for Date
+    """
+
+    def _create_impl(self):
+        return coder_impl.DateCoderImpl()
+
+    def to_type_hint(self):
+        return datetime.date
+
+
 @Coder.register_urn(FLINK_SCHEMA_CODER_URN, flink_fn_execution_pb2.Schema)
 def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
     return RowCoder([from_proto(f.type) for f in schema_proto.fields])
 
 
+type_name = flink_fn_execution_pb2.Schema.TypeName
+BOOLEAN_CODER = BooleanCoder()
+BYTE_CODER = ByteCoder()
+SHORT_CODER = ShortCoder()
+INT_CODER = IntCoder()
+BIGINT_CODER = LongCoder()
+FLOAT_CODER = FloatCoder()
+DOUBLE_CODER = DoubleCoder()
 
 Review comment:
   Could we remove these variables and create the instances in the `_type_name_mappings` directly?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to