This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f113e4  Revert "[FLINK-17118][python] Add Cython support for primitive data types (#11718)"
9f113e4 is described below

commit 9f113e4706f4110142c8aad74912e21bb995e497
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Fri Apr 17 16:25:06 2020 +0200

    Revert "[FLINK-17118][python] Add Cython support for primitive data types (#11718)"
    
    This reverts commit 1a5b35b1e1ea79a233cacc88e4574f446aba52ae.
---
 .gitignore                                         |   2 -
 flink-python/MANIFEST.in                           |   2 -
 flink-python/pyflink/fn_execution/coder_impl.py    |   2 +-
 flink-python/pyflink/fn_execution/coders.py        |   2 +-
 .../pyflink/fn_execution/fast_coder_impl.pxd       | 192 -------
 .../pyflink/fn_execution/fast_coder_impl.pyx       | 553 ---------------------
 .../{test_coders.py => test_coders_common.py}      |   9 -
 .../pyflink/fn_execution/tests/test_fast_coders.py | 141 ------
 flink-python/setup.py                              |  28 +-
 flink-python/tox.ini                               |   8 +-
 10 files changed, 5 insertions(+), 934 deletions(-)

diff --git a/.gitignore b/.gitignore
index 886d61b..e89f622 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,8 +36,6 @@ flink-python/dev/.conda/
 flink-python/dev/log/
 flink-python/dev/.stage.txt
 flink-python/.eggs/
-flink-python/**/*.c
-flink-python/**/*.so
 atlassian-ide-plugin.xml
 out/
 /docs/api
diff --git a/flink-python/MANIFEST.in b/flink-python/MANIFEST.in
index edef99d..a8e2d61 100644
--- a/flink-python/MANIFEST.in
+++ b/flink-python/MANIFEST.in
@@ -30,5 +30,3 @@ include pyflink/LICENSE
 include pyflink/NOTICE
 include pyflink/README.txt
 recursive-exclude deps/opt/python *
-recursive-include pyflink/fn_execution *.pxd
-recursive-include pyflink/fn_execution *.pyx
diff --git a/flink-python/pyflink/fn_execution/coder_impl.py b/flink-python/pyflink/fn_execution/coder_impl.py
index d4aac4d..743ce7b 100644
--- a/flink-python/pyflink/fn_execution/coder_impl.py
+++ b/flink-python/pyflink/fn_execution/coder_impl.py
@@ -237,7 +237,7 @@ class TinyIntCoderImpl(StreamCoderImpl):
         return struct.unpack('b', in_stream.read(1))[0]
 
 
-class SmallIntCoderImpl(StreamCoderImpl):
+class SmallIntImpl(StreamCoderImpl):
 
     def encode_to_stream(self, value, out_stream, nested):
         out_stream.write(struct.pack('>h', value))
diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py
index 4d9bfda..d2be4fd 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -262,7 +262,7 @@ class SmallIntCoder(DeterministicCoder):
     """
 
     def _create_impl(self):
-        return coder_impl.SmallIntCoderImpl()
+        return coder_impl.SmallIntImpl()
 
     def to_type_hint(self):
         return int
diff --git a/flink-python/pyflink/fn_execution/fast_coder_impl.pxd b/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
deleted file mode 100644
index 2156247..0000000
--- a/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
+++ /dev/null
@@ -1,192 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-# cython: language_level=3
-
-cimport libc.stdint
-
-from apache_beam.coders.coder_impl cimport StreamCoderImpl, OutputStream, 
InputStream
-
-# InputStreamAndFunctionWrapper wraps the user-defined function
-# and input_stream_wrapper in operations
-cdef class InputStreamAndFunctionWrapper:
-    # user-defined function
-    cdef readonly object func
-    cdef InputStreamWrapper input_stream_wrapper
-
-# InputStreamWrapper wraps input_stream and related infos used to decode data
-cdef class InputStreamWrapper:
-    cdef InputStream input_stream
-    cdef list input_field_coders
-    cdef TypeName*input_field_type
-    cdef CoderType*input_coder_type
-    cdef libc.stdint.int32_t input_field_count
-    cdef libc.stdint.int32_t input_leading_complete_bytes_num
-    cdef libc.stdint.int32_t input_remaining_bits_num
-    cdef size_t input_buffer_size
-
-cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
-    cdef readonly StreamCoderImpl _value_coder
-
-cdef class FlattenRowCoderImpl(StreamCoderImpl):
-    # the input field coders and related args used to decode input_stream data
-    cdef list _input_field_coders
-    cdef TypeName*_input_field_type
-    cdef CoderType*_input_coder_type
-    cdef libc.stdint.int32_t _input_field_count
-    cdef libc.stdint.int32_t _input_leading_complete_bytes_num
-    cdef libc.stdint.int32_t _input_remaining_bits_num
-
-    # the output field coders and related args used to encode data to 
output_stream
-    cdef readonly list _output_field_coders
-    cdef TypeName*_output_field_type
-    cdef CoderType*_output_coder_type
-    cdef libc.stdint.int32_t _output_field_count
-    cdef libc.stdint.int32_t _output_leading_complete_bytes_num
-    cdef libc.stdint.int32_t _output_remaining_bits_num
-
-    cdef bint*_null_mask
-    cdef unsigned char*_null_byte_search_table
-
-    # the char pointer used to store encoded data of output_stream
-    cdef char*_output_data
-    cdef size_t _output_buffer_size
-    cdef size_t _output_pos
-
-    # the tmp char pointer used to store encoded data of every row
-    cdef char*_tmp_output_data
-    cdef size_t _tmp_output_buffer_size
-    cdef size_t _tmp_output_pos
-
-    # the char pointer used to map the decoded data of input_stream
-    cdef char*_input_data
-    cdef size_t _input_pos
-    cdef size_t _input_buffer_size
-
-    # used to store the result of Python user-defined function
-    cdef list row
-
-    # the Python user-defined function
-    cdef object func
-
-    # initial attribute
-    cdef void _init_attribute(self)
-
-    # wrap input_stream
-    cdef InputStreamWrapper _wrap_input_stream(self, InputStream input_stream, 
size_t size)
-
-    cdef void _write_null_mask(self, value, libc.stdint.int32_t 
leading_complete_bytes_num,
-                               libc.stdint.int32_t remaining_bits_num)
-    cdef void _read_null_mask(self, bint*null_mask, libc.stdint.int32_t 
leading_complete_bytes_num,
-                              libc.stdint.int32_t remaining_bits_num)
-
-    cdef void _prepare_encode(self, InputStreamAndFunctionWrapper 
input_stream_and_function_wrapper,
-                              OutputStream out_stream)
-
-    cdef void _maybe_flush(self, OutputStream out_stream)
-    # Because output_buffer will be reallocated during encoding data, we need 
to remap output_buffer
-    # to the data pointer of output_stream
-    cdef void _map_output_data_to_output_stream(self, OutputStream out_stream)
-    cdef void _copy_to_output_buffer(self)
-
-    # encode data to output_stream
-    cdef void _encode_one_row(self, value)
-    cdef void _encode_field_simple(self, TypeName field_type, item)
-    cdef void _extend(self, size_t missing)
-    cdef void _encode_byte(self, unsigned char val)
-    cdef void _encode_smallint(self, libc.stdint.int16_t v)
-    cdef void _encode_int(self, libc.stdint.int32_t v)
-    cdef void _encode_bigint(self, libc.stdint.int64_t v)
-    cdef void _encode_float(self, float v)
-    cdef void _encode_double(self, double v)
-    cdef void _encode_bytes(self, char*b)
-
-    # decode data from input_stream
-    cdef void _decode_next_row(self)
-    cdef object _decode_field_simple(self, TypeName field_type)
-    cdef unsigned char _decode_byte(self) except? -1
-    cdef libc.stdint.int16_t _decode_smallint(self) except? -1
-    cdef libc.stdint.int32_t _decode_int(self) except? -1
-    cdef libc.stdint.int64_t _decode_bigint(self) except? -1
-    cdef float _decode_float(self) except? -1
-    cdef double _decode_double(self) except? -1
-    cdef bytes _decode_bytes(self)
-
-cdef class TableFunctionRowCoderImpl(FlattenRowCoderImpl):
-    cdef void _encode_end_message(self)
-
-cdef enum CoderType:
-    UNDEFINED = -1
-    SIMPLE = 0
-    COMPLEX = 1
-
-cdef enum TypeName:
-    NONE = -1
-    ROW = 0
-    TINYINT = 1
-    SMALLINT = 2
-    INT = 3
-    BIGINT = 4
-    DECIMAL = 5
-    FLOAT = 6
-    DOUBLE = 7
-    DATE = 8
-    TIME = 9
-    TIMESTAMP = 10
-    BOOLEAN = 11
-    BINARY = 12
-    CHAR = 13
-    ARRAY = 14
-    MAP = 15
-    LOCAL_ZONED_TIMESTAMP = 16
-
-cdef class BaseCoder:
-    cpdef CoderType coder_type(self)
-    cpdef TypeName type_name(self)
-
-cdef class TinyIntCoderImpl(BaseCoder):
-    pass
-
-cdef class SmallIntCoderImpl(BaseCoder):
-    pass
-
-cdef class IntCoderImpl(BaseCoder):
-    pass
-
-cdef class BigIntCoderImpl(BaseCoder):
-    pass
-
-cdef class BooleanCoderImpl(BaseCoder):
-    pass
-
-cdef class FloatCoderImpl(BaseCoder):
-    pass
-
-cdef class DoubleCoderImpl(BaseCoder):
-    pass
-
-cdef class BinaryCoderImpl(BaseCoder):
-    pass
-
-cdef class CharCoderImpl(BaseCoder):
-    pass
-
-cdef class DateCoderImpl(BaseCoder):
-    pass
-
-cdef class TimeCoderImpl(BaseCoder):
-    pass
diff --git a/flink-python/pyflink/fn_execution/fast_coder_impl.pyx b/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
deleted file mode 100644
index bfc08202..0000000
--- a/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
+++ /dev/null
@@ -1,553 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-# cython: language_level = 3
-# cython: infer_types = True
-# cython: profile=True
-# cython: boundscheck=False, wraparound=False, initializedcheck=False, 
cdivision=True
-
-cimport libc.stdlib
-from libc.string cimport strlen
-
-import datetime
-
-cdef class InputStreamAndFunctionWrapper:
-    def __cinit__(self, func, input_stream_wrapper):
-        self.func = func
-        self.input_stream_wrapper = input_stream_wrapper
-
-cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
-    def __cinit__(self, value_coder):
-        self._value_coder = value_coder
-
-    cpdef encode_to_stream(self, value, OutputStream out_stream, bint nested):
-        self._value_coder.encode_to_stream(value, out_stream, nested)
-
-    cpdef decode_from_stream(self, InputStream in_stream, bint nested):
-        return self._value_coder.decode_from_stream(in_stream, nested)
-
-    cpdef get_estimated_size_and_observables(self, value, bint nested=False):
-        return 0, []
-
-cdef class TableFunctionRowCoderImpl(FlattenRowCoderImpl):
-    def __init__(self, flatten_row_coder):
-        super(TableFunctionRowCoderImpl, 
self).__init__(flatten_row_coder._output_field_coders)
-
-    cpdef encode_to_stream(self, input_stream_and_function_wrapper, 
OutputStream out_stream,
-                           bint nested):
-        self._prepare_encode(input_stream_and_function_wrapper, out_stream)
-        while self._input_buffer_size > self._input_pos:
-            self._decode_next_row()
-            result = self.func(self.row)
-            if result:
-                for value in result:
-                    if self._output_field_count == 1:
-                        value = (value,)
-                    self._encode_one_row(value)
-                    self._maybe_flush(out_stream)
-            self._encode_end_message()
-
-        self._map_output_data_to_output_stream(out_stream)
-
-    # write 0x00 as end message
-    cdef void _encode_end_message(self):
-        if self._output_buffer_size < self._output_pos + 2:
-            self._extend(2)
-        self._output_data[self._output_pos] = 0x01
-        self._output_data[self._output_pos + 1] = 0x00
-        self._output_pos += 2
-
-cdef class FlattenRowCoderImpl(StreamCoderImpl):
-    def __init__(self, field_coders):
-        self._output_field_coders = field_coders
-        self._output_field_count = len(self._output_field_coders)
-        self._output_field_type = <TypeName*> libc.stdlib.malloc(
-            self._output_field_count * sizeof(TypeName))
-        self._output_coder_type = <CoderType*> libc.stdlib.malloc(
-            self._output_field_count * sizeof(CoderType))
-        self._output_leading_complete_bytes_num = self._output_field_count // 8
-        self._output_remaining_bits_num = self._output_field_count % 8
-        self._tmp_output_buffer_size = 1024
-        self._tmp_output_pos = 0
-        self._tmp_output_data = <char*> 
libc.stdlib.malloc(self._tmp_output_buffer_size)
-        self._null_byte_search_table = <unsigned char*> libc.stdlib.malloc(
-            8 * sizeof(unsigned char))
-        self._init_attribute()
-
-    cpdef decode_from_stream(self, InputStream in_stream, bint nested):
-        cdef InputStreamWrapper input_stream_wrapper
-        input_stream_wrapper = self._wrap_input_stream(in_stream, 
in_stream.size())
-        return input_stream_wrapper
-
-    cpdef encode_to_stream(self, input_stream_and_function_wrapper, 
OutputStream out_stream,
-                           bint nested):
-        cdef list result
-        self._prepare_encode(input_stream_and_function_wrapper, out_stream)
-        while self._input_buffer_size > self._input_pos:
-            self._decode_next_row()
-            result = self.func(self.row)
-            self._encode_one_row(result)
-            self._maybe_flush(out_stream)
-        self._map_output_data_to_output_stream(out_stream)
-
-    cdef void _init_attribute(self):
-        self._null_byte_search_table[0] = 0x80
-        self._null_byte_search_table[1] = 0x40
-        self._null_byte_search_table[2] = 0x20
-        self._null_byte_search_table[3] = 0x10
-        self._null_byte_search_table[4] = 0x08
-        self._null_byte_search_table[5] = 0x04
-        self._null_byte_search_table[6] = 0x02
-        self._null_byte_search_table[7] = 0x01
-        for i in range(self._output_field_count):
-            self._output_field_type[i] = 
self._output_field_coders[i].type_name()
-            self._output_coder_type[i] = 
self._output_field_coders[i].coder_type()
-
-    cdef InputStreamWrapper _wrap_input_stream(self, InputStream input_stream, 
size_t size):
-        # wrappers the input field coders and input_stream together
-        # so that it can be transposed to operations
-        cdef InputStreamWrapper input_stream_wrapper
-        input_stream_wrapper = InputStreamWrapper()
-        input_stream_wrapper.input_stream = input_stream
-        input_stream_wrapper.input_field_coders = self._output_field_coders
-        input_stream_wrapper.input_remaining_bits_num = 
self._output_remaining_bits_num
-        input_stream_wrapper.input_leading_complete_bytes_num = \
-            self._output_leading_complete_bytes_num
-        input_stream_wrapper.input_field_count = self._output_field_count
-        input_stream_wrapper.input_field_type = self._output_field_type
-        input_stream_wrapper.input_coder_type = self._output_coder_type
-        input_stream_wrapper.input_stream.pos = size
-        input_stream_wrapper.input_buffer_size = size
-        return input_stream_wrapper
-
-    cdef void _encode_one_row(self, value):
-        cdef libc.stdint.int32_t i
-        self._write_null_mask(value, self._output_leading_complete_bytes_num,
-                              self._output_remaining_bits_num)
-        for i in range(self._output_field_count):
-            item = value[i]
-            if item is not None:
-                if self._output_coder_type[i] == SIMPLE:
-                    self._encode_field_simple(self._output_field_type[i], item)
-
-        self._copy_to_output_buffer()
-
-    cdef void _read_null_mask(self, bint*null_mask,
-                              libc.stdint.int32_t 
input_leading_complete_bytes_num,
-                              libc.stdint.int32_t input_remaining_bits_num):
-        cdef libc.stdint.int32_t field_pos, i
-        cdef unsigned char b
-        field_pos = 0
-        for _ in range(input_leading_complete_bytes_num):
-            b = self._input_data[self._input_pos]
-            self._input_pos += 1
-            for i in range(8):
-                null_mask[field_pos] = (b & self._null_byte_search_table[i]) > 0
-                field_pos += 1
-
-        if input_remaining_bits_num:
-            b = self._input_data[self._input_pos]
-            self._input_pos += 1
-            for i in range(input_remaining_bits_num):
-                null_mask[field_pos] = (b & self._null_byte_search_table[i]) > 0
-                field_pos += 1
-
-    cdef void _decode_next_row(self):
-        cdef libc.stdint.int32_t i
-        # skip prefix variable int length
-        while self._input_data[self._input_pos] & 0x80:
-            self._input_pos += 1
-        self._input_pos += 1
-        self._read_null_mask(self._null_mask, 
self._input_leading_complete_bytes_num,
-                             self._input_remaining_bits_num)
-        for i in range(self._input_field_count):
-            if self._null_mask[i]:
-                self.row[i] = None
-            else:
-                if self._input_coder_type[i] == SIMPLE:
-                    self.row[i] = 
self._decode_field_simple(self._input_field_type[i])
-
-    cdef object _decode_field_simple(self, TypeName field_type):
-        cdef libc.stdint.int32_t value, minutes, seconds, hours
-        cdef libc.stdint.int64_t milliseconds
-        if field_type == TINYINT:
-            # tinyint
-            return self._decode_byte()
-        elif field_type == SMALLINT:
-            # smallint
-            return self._decode_smallint()
-        elif field_type == INT:
-            # int
-            return self._decode_int()
-        elif field_type == BIGINT:
-            # bigint
-            return self._decode_bigint()
-        elif field_type == BOOLEAN:
-            # boolean
-            return not not self._decode_byte()
-        elif field_type == FLOAT:
-            # float
-            return self._decode_float()
-        elif field_type == DOUBLE:
-            # double
-            return self._decode_double()
-        elif field_type == BINARY:
-            # bytes
-            return self._decode_bytes()
-        elif field_type == CHAR:
-            # str
-            return self._decode_bytes().decode("utf-8")
-        elif field_type == DATE:
-            # Date
-            # EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()
-            # The value of EPOCH_ORDINAL is 719163
-            return datetime.date.fromordinal(self._decode_int() + 719163)
-        elif field_type == TIME:
-            # Time
-            value = self._decode_int()
-            seconds = value // 1000
-            milliseconds = value % 1000
-            minutes = seconds // 60
-            seconds %= 60
-            hours = minutes // 60
-            minutes %= 60
-            return datetime.time(hours, minutes, seconds, milliseconds * 1000)
-
-    cdef unsigned char _decode_byte(self) except? -1:
-        self._input_pos += 1
-        return <unsigned char> self._input_data[self._input_pos - 1]
-
-    cdef libc.stdint.int16_t _decode_smallint(self) except? -1:
-        self._input_pos += 2
-        return (<unsigned char> self._input_data[self._input_pos - 1]
-                | <libc.stdint.uint32_t> <unsigned char> 
self._input_data[self._input_pos - 2] << 8)
-
-    cdef libc.stdint.int32_t _decode_int(self) except? -1:
-        self._input_pos += 4
-        return (<unsigned char> self._input_data[self._input_pos - 1]
-                | <libc.stdint.uint32_t> <unsigned char> 
self._input_data[self._input_pos - 2] << 8
-                | <libc.stdint.uint32_t> <unsigned char> 
self._input_data[self._input_pos - 3] << 16
-                | <libc.stdint.uint32_t> <unsigned char> self._input_data[
-                    self._input_pos - 4] << 24)
-
-    cdef libc.stdint.int64_t _decode_bigint(self) except? -1:
-        self._input_pos += 8
-        return (<unsigned char> self._input_data[self._input_pos - 1]
-                | <libc.stdint.uint64_t> <unsigned char> 
self._input_data[self._input_pos - 2] << 8
-                | <libc.stdint.uint64_t> <unsigned char> 
self._input_data[self._input_pos - 3] << 16
-                | <libc.stdint.uint64_t> <unsigned char> 
self._input_data[self._input_pos - 4] << 24
-                | <libc.stdint.uint64_t> <unsigned char> 
self._input_data[self._input_pos - 5] << 32
-                | <libc.stdint.uint64_t> <unsigned char> 
self._input_data[self._input_pos - 6] << 40
-                | <libc.stdint.uint64_t> <unsigned char> 
self._input_data[self._input_pos - 7] << 48
-                | <libc.stdint.uint64_t> <unsigned char> self._input_data[
-                    self._input_pos - 8] << 56)
-
-    cdef float _decode_float(self) except? -1:
-        cdef libc.stdint.int32_t as_long = self._decode_int()
-        return (<float*> <char*> &as_long)[0]
-
-    cdef double _decode_double(self) except? -1:
-        cdef libc.stdint.int64_t as_long = self._decode_bigint()
-        return (<double*> <char*> &as_long)[0]
-
-    cdef bytes _decode_bytes(self):
-        cdef libc.stdint.int32_t size = self._decode_int()
-        self._input_pos += size
-        return self._input_data[self._input_pos - size: self._input_pos]
-
-    cdef void _prepare_encode(self, InputStreamAndFunctionWrapper 
input_stream_and_function_wrapper,
-                              OutputStream out_stream):
-        cdef InputStreamWrapper input_stream_wrapper
-        # get the data pointer of output_stream
-        self._output_data = out_stream.data
-        self._output_pos = out_stream.pos
-        self._output_buffer_size = out_stream.buffer_size
-        self._tmp_output_pos = 0
-
-        input_stream_wrapper = 
input_stream_and_function_wrapper.input_stream_wrapper
-        # get the data pointer of input_stream
-        self._input_data = input_stream_wrapper.input_stream.allc
-        self._input_buffer_size = input_stream_wrapper.input_buffer_size
-
-        # get the infos of input coder which will be used to decode data from 
input_stream
-        self._input_field_count = input_stream_wrapper.input_field_count
-        self._input_leading_complete_bytes_num = 
input_stream_wrapper.input_leading_complete_bytes_num
-        self._input_remaining_bits_num = 
input_stream_wrapper.input_remaining_bits_num
-        self._input_field_type = input_stream_wrapper.input_field_type
-        self._input_coder_type = input_stream_wrapper.input_coder_type
-        self._input_field_coders = input_stream_wrapper.input_field_coders
-        self._null_mask = <bint*> libc.stdlib.malloc(self._input_field_count * 
sizeof(bint))
-        self._input_pos = 0
-
-        # initial the result row and get the Python user-defined function
-        self.row = [None for _ in range(self._input_field_count)]
-        self.func = input_stream_and_function_wrapper.func
-
-    cdef void _encode_field_simple(self, TypeName field_type, item):
-        cdef libc.stdint.int32_t hour, minute, seconds, microsecond, 
milliseconds
-        if field_type == TINYINT:
-            # tinyint
-            self._encode_byte(item)
-        elif field_type == SMALLINT:
-            # smallint
-            self._encode_smallint(item)
-        elif field_type == INT:
-            # int
-            self._encode_int(item)
-        elif field_type == BIGINT:
-            # bigint
-            self._encode_bigint(item)
-        elif field_type == BOOLEAN:
-            # boolean
-            self._encode_byte(item)
-        elif field_type == FLOAT:
-            # float
-            self._encode_float(item)
-        elif field_type == DOUBLE:
-            # double
-            self._encode_double(item)
-        elif field_type == BINARY:
-            # bytes
-            self._encode_bytes(item)
-        elif field_type == CHAR:
-            # str
-            self._encode_bytes(item.encode('utf-8'))
-        elif field_type == DATE:
-            # Date
-            # EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()
-            # The value of EPOCH_ORDINAL is 719163
-            self._encode_int(item.toordinal() - 719163)
-        elif field_type == TIME:
-            # Time
-            hour = item.hour
-            minute = item.minute
-            seconds = item.second
-            microsecond = item.microsecond
-            milliseconds = hour * 3600000 + minute * 60000 + seconds * 1000 + 
microsecond // 1000
-            self._encode_int(milliseconds)
-
-    cdef void _copy_to_output_buffer(self):
-        cdef size_t size
-        cdef size_t i
-        cdef bint is_realloc
-        cdef char bits
-        # the length of the variable prefix length will be less than 9 bytes
-        if self._output_buffer_size < self._output_pos + self._tmp_output_pos 
+ 9:
-            self._output_buffer_size += self._tmp_output_buffer_size + 9
-            self._output_data = <char*> libc.stdlib.realloc(self._output_data,
-                                                            
self._output_buffer_size)
-        size = self._tmp_output_pos
-        # write variable prefix length
-        while size:
-            bits = size & 0x7F
-            size >>= 7
-            if size:
-                bits |= 0x80
-            self._output_data[self._output_pos] = bits
-            self._output_pos += 1
-        if self._tmp_output_pos < 8:
-            # This is faster than memcpy when the string is short.
-            for i in range(self._tmp_output_pos):
-                self._output_data[self._output_pos + i] = 
self._tmp_output_data[i]
-        else:
-            libc.string.memcpy(self._output_data + self._output_pos, 
self._tmp_output_data,
-                               self._tmp_output_pos)
-        self._output_pos += self._tmp_output_pos
-        self._tmp_output_pos = 0
-
-    cdef void _maybe_flush(self, OutputStream out_stream):
-        # Currently, it will trigger flushing when the size of buffer reach to 
10_000_000
-        if self._output_pos > 10_000_000:
-            self._map_output_data_to_output_stream(out_stream)
-            out_stream.flush()
-            self._output_pos = 0
-
-    cdef void _map_output_data_to_output_stream(self, OutputStream out_stream):
-        out_stream.data = self._output_data
-        out_stream.pos = self._output_pos
-        out_stream.buffer_size = self._output_buffer_size
-
-    cdef void _extend(self, size_t missing):
-        while self._tmp_output_buffer_size < self._tmp_output_pos + missing:
-            self._tmp_output_buffer_size *= 2
-        self._tmp_output_data = <char*> 
libc.stdlib.realloc(self._tmp_output_data,
-                                                            
self._tmp_output_buffer_size)
-
-    cdef void _encode_byte(self, unsigned char val):
-        if self._tmp_output_buffer_size < self._tmp_output_pos + 1:
-            self._extend(1)
-        self._tmp_output_data[self._tmp_output_pos] = val
-        self._tmp_output_pos += 1
-
-    cdef void _encode_smallint(self, libc.stdint.int16_t v):
-        if self._tmp_output_buffer_size < self._tmp_output_pos + 2:
-            self._extend(2)
-        self._tmp_output_data[self._tmp_output_pos] = <unsigned char> (v >> 8)
-        self._tmp_output_data[self._tmp_output_pos + 1] = <unsigned char> v
-        self._tmp_output_pos += 2
-
-    cdef void _encode_int(self, libc.stdint.int32_t v):
-        if self._tmp_output_buffer_size < self._tmp_output_pos + 4:
-            self._extend(4)
-        self._tmp_output_data[self._tmp_output_pos] = <unsigned char> (v >> 24)
-        self._tmp_output_data[self._tmp_output_pos + 1] = <unsigned char> (v 
>> 16)
-        self._tmp_output_data[self._tmp_output_pos + 2] = <unsigned char> (v 
>> 8)
-        self._tmp_output_data[self._tmp_output_pos + 3] = <unsigned char> v
-        self._tmp_output_pos += 4
-
-    cdef void _encode_bigint(self, libc.stdint.int64_t v):
-        if self._tmp_output_buffer_size < self._tmp_output_pos + 8:
-            self._extend(8)
-        self._tmp_output_data[self._tmp_output_pos] = <unsigned char> (v >> 56)
-        self._tmp_output_data[self._tmp_output_pos + 1] = <unsigned char> (v 
>> 48)
-        self._tmp_output_data[self._tmp_output_pos + 2] = <unsigned char> (v 
>> 40)
-        self._tmp_output_data[self._tmp_output_pos + 3] = <unsigned char> (v 
>> 32)
-        self._tmp_output_data[self._tmp_output_pos + 4] = <unsigned char> (v 
>> 24)
-        self._tmp_output_data[self._tmp_output_pos + 5] = <unsigned char> (v 
>> 16)
-        self._tmp_output_data[self._tmp_output_pos + 6] = <unsigned char> (v 
>> 8)
-        self._tmp_output_data[self._tmp_output_pos + 7] = <unsigned char> v
-        self._tmp_output_pos += 8
-
-    cdef void _encode_float(self, float v):
-        self._encode_int((<libc.stdint.int32_t*> <char*> &v)[0])
-
-    cdef void _encode_double(self, double v):
-        self._encode_bigint((<libc.stdint.int64_t*> <char*> &v)[0])
-
-    cdef void _encode_bytes(self, char*b):
-        cdef libc.stdint.int32_t length = strlen(b)
-        self._encode_int(length)
-        if self._tmp_output_buffer_size < self._tmp_output_pos + length:
-            self._extend(length)
-        if length < 8:
-            # This is faster than memcpy when the string is short.
-            for i in range(length):
-                self._tmp_output_data[self._tmp_output_pos + i] = b[i]
-        else:
-            libc.string.memcpy(self._tmp_output_data + self._tmp_output_pos, 
b, length)
-        self._tmp_output_pos += length
-
-    cdef void _write_null_mask(self, value, libc.stdint.int32_t 
leading_complete_bytes_num,
-                               libc.stdint.int32_t remaining_bits_num):
-        cdef libc.stdint.int32_t field_pos, index
-        cdef unsigned char*null_byte_search_table
-        cdef unsigned char b, i
-        field_pos = 0
-        null_byte_search_table = self._null_byte_search_table
-        for _ in range(leading_complete_bytes_num):
-            b = 0x00
-            for i in range(8):
-                if value[field_pos + i] is None:
-                    b |= null_byte_search_table[i]
-            field_pos += 8
-            self._encode_byte(b)
-
-        if remaining_bits_num:
-            b = 0x00
-            for i in range(remaining_bits_num):
-                if value[field_pos + i] is None:
-                    b |= null_byte_search_table[i]
-            self._encode_byte(b)
-
-    def __dealloc__(self):
-        if self.null_mask:
-            libc.stdlib.free(self._null_mask)
-        if self.null_byte_search_table:
-            libc.stdlib.free(self._null_byte_search_table)
-        if self._tmp_output_data:
-            libc.stdlib.free(self._tmp_output_data)
-        if self._output_field_type:
-            libc.stdlib.free(self._output_field_type)
-        if self._output_coder_type:
-            libc.stdlib.free(self._output_coder_type)
-
-cdef class BaseCoder:
-    cpdef CoderType coder_type(self):
-        return UNDEFINED
-
-    cpdef TypeName type_name(self):
-        return NONE
-
-cdef class TinyIntCoderImpl(BaseCoder):
-    cpdef CoderType coder_type(self):
-        return SIMPLE
-
-    cpdef TypeName type_name(self):
-        return TINYINT
-
-cdef class SmallIntCoderImpl(BaseCoder):
-    cpdef CoderType coder_type(self):
-        return SIMPLE
-    cpdef TypeName type_name(self):
-        return SMALLINT
-
-cdef class IntCoderImpl(BaseCoder):
-    cpdef CoderType coder_type(self):
-        return SIMPLE
-
-    cpdef TypeName type_name(self):
-        return INT
-
-cdef class BigIntCoderImpl(BaseCoder):
-    cpdef CoderType coder_type(self):
-        return SIMPLE
-    cpdef TypeName type_name(self):
-        return BIGINT
-
-cdef class BooleanCoderImpl(BaseCoder):
-    cpdef CoderType coder_type(self):
-        return SIMPLE
-    cpdef TypeName type_name(self):
-        return BOOLEAN
-
-cdef class FloatCoderImpl(BaseCoder):
-    cpdef CoderType coder_type(self):
-        return SIMPLE
-    cpdef TypeName type_name(self):
-        return FLOAT
-
-cdef class DoubleCoderImpl(BaseCoder):
-    cpdef CoderType coder_type(self):
-        return SIMPLE
-    cpdef TypeName type_name(self):
-        return DOUBLE
-
-cdef class BinaryCoderImpl(BaseCoder):
-    cpdef CoderType coder_type(self):
-        return SIMPLE
-    cpdef TypeName type_name(self):
-        return BINARY
-
-cdef class CharCoderImpl(BaseCoder):
-    cpdef CoderType coder_type(self):
-        return SIMPLE
-    cpdef TypeName type_name(self):
-        return CHAR
-
-cdef class DateCoderImpl(BaseCoder):
-    cpdef CoderType coder_type(self):
-        return SIMPLE
-
-    cpdef TypeName type_name(self):
-        return DATE
-
-cdef class TimeCoderImpl(BaseCoder):
-    cpdef CoderType coder_type(self):
-        return SIMPLE
-
-    cpdef TypeName type_name(self):
-        return TIME
diff --git a/flink-python/pyflink/fn_execution/tests/test_coders.py b/flink-python/pyflink/fn_execution/tests/test_coders_common.py
similarity index 95%
rename from flink-python/pyflink/fn_execution/tests/test_coders.py
rename to flink-python/pyflink/fn_execution/tests/test_coders_common.py
index 3134e83..69e9961 100644
--- a/flink-python/pyflink/fn_execution/tests/test_coders.py
+++ b/flink-python/pyflink/fn_execution/tests/test_coders_common.py
@@ -25,16 +25,7 @@ from pyflink.fn_execution.coders import BigIntCoder, TinyIntCoder, BooleanCoder,
     TimeCoder, TimestampCoder, ArrayCoder, MapCoder, DecimalCoder, FlattenRowCoder, RowCoder, \
     LocalZonedTimestampCoder
 
-try:
-    from pyflink.fn_execution import fast_coder_impl  # noqa # pylint: disable=unused-import
 
-    have_cython = True
-except ImportError:
-    have_cython = False
-
-
-@unittest.skipIf(have_cython,
-                 "Found cython implementation, we don't need to test non-compiled implementation")
 class CodersTest(unittest.TestCase):
 
     def check_coder(self, coder, *values):
diff --git a/flink-python/pyflink/fn_execution/tests/test_fast_coders.py b/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
deleted file mode 100644
index 15a8ab0..0000000
--- a/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
+++ /dev/null
@@ -1,141 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-"""Tests common to all coder implementations."""
-import logging
-import unittest
-
-from pyflink.fn_execution import coder_impl
-
-try:
-    from pyflink.fn_execution import fast_coder_impl
-
-    have_cython = True
-except ImportError:
-    have_cython = False
-
-
-@unittest.skipUnless(have_cython, "Uncompiled Cython Coder")
-class CodersTest(unittest.TestCase):
-
-    def check_cython_coder(self, python_field_coders, cython_field_coders, data):
-        from apache_beam.coders.coder_impl import create_InputStream, create_OutputStream
-        from pyflink.fn_execution.fast_coder_impl import InputStreamAndFunctionWrapper
-        py_flatten_row_coder = coder_impl.FlattenRowCoderImpl(python_field_coders)
-        internal = py_flatten_row_coder.encode(data)
-        input_stream = create_InputStream(internal)
-        output_stream = create_OutputStream()
-        cy_flatten_row_coder = fast_coder_impl.FlattenRowCoderImpl(cython_field_coders)
-        value = cy_flatten_row_coder.decode_from_stream(input_stream, False)
-        wrapper_func_input_element = InputStreamAndFunctionWrapper(
-            lambda v: [v[i] for i in range(len(v))], value)
-        cy_flatten_row_coder.encode_to_stream(wrapper_func_input_element, output_stream, False)
-        generator_result = py_flatten_row_coder.decode_from_stream(create_InputStream(
-            output_stream.get()), False)
-        result = []
-        for item in generator_result:
-            result.append(item)
-        try:
-            self.assertEqual(result, data)
-        except AssertionError:
-            self.assertEqual(len(result), len(data))
-            self.assertEqual(len(result[0]), len(data[0]))
-            for i in range(len(data[0])):
-                if isinstance(data[0][i], float):
-                    from pyflink.table.tests.test_udf import float_equal
-                    assert float_equal(data[0][i], result[0][i], 1e-6)
-                else:
-                    self.assertEqual(data[0][i], result[0][i])
-
-    # decide whether two floats are equal
-    @staticmethod
-    def float_equal(a, b, rel_tol=1e-09, abs_tol=0.0):
-        return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
-
-    def test_cython_bigint_coder(self):
-        data = [1, 100, -100, -1000]
-        python_field_coders = [coder_impl.BigIntCoderImpl() for _ in range(len(data))]
-        cython_field_coders = [fast_coder_impl.BigIntCoderImpl() for _ in range(len(data))]
-        self.check_cython_coder(python_field_coders, cython_field_coders, [data])
-
-    def test_cython_tinyint_coder(self):
-        data = [1, 10, 127, -128]
-        python_field_coders = [coder_impl.TinyIntCoderImpl() for _ in range(len(data))]
-        cython_field_coders = [fast_coder_impl.TinyIntCoderImpl() for _ in range(len(data))]
-        self.check_cython_coder(python_field_coders, cython_field_coders, [data])
-
-    def test_cython_boolean_coder(self):
-        data = [True, False]
-        python_field_coders = [coder_impl.BooleanCoderImpl() for _ in range(len(data))]
-        cython_field_coders = [fast_coder_impl.BooleanCoderImpl() for _ in range(len(data))]
-        self.check_cython_coder(python_field_coders, cython_field_coders, [data])
-
-    def test_cython_smallint_coder(self):
-        data = [32767, -32768, 0]
-        python_field_coders = [coder_impl.SmallIntCoderImpl() for _ in range(len(data))]
-        cython_field_coders = [fast_coder_impl.SmallIntCoderImpl() for _ in range(len(data))]
-        self.check_cython_coder(python_field_coders, cython_field_coders, [data])
-
-    def test_cython_int_coder(self):
-        data = [-2147483648, 2147483647]
-        python_field_coders = [coder_impl.IntCoderImpl() for _ in range(len(data))]
-        cython_field_coders = [fast_coder_impl.IntCoderImpl() for _ in range(len(data))]
-        self.check_cython_coder(python_field_coders, cython_field_coders, [data])
-
-    def test_cython_float_coder(self):
-        data = [1.02, 1.32]
-        python_field_coders = [coder_impl.FloatCoderImpl() for _ in range(len(data))]
-        cython_field_coders = [fast_coder_impl.FloatCoderImpl() for _ in range(len(data))]
-        self.check_cython_coder(python_field_coders, cython_field_coders, [data])
-
-    def test_cython_double_coder(self):
-        data = [-12.02, 1.98932]
-        python_field_coders = [coder_impl.DoubleCoderImpl() for _ in range(len(data))]
-        cython_field_coders = [fast_coder_impl.DoubleCoderImpl() for _ in range(len(data))]
-        self.check_cython_coder(python_field_coders, cython_field_coders, [data])
-
-    def test_cython_binary_coder(self):
-        data = [b'pyflink']
-        python_field_coders = [coder_impl.BinaryCoderImpl() for _ in range(len(data))]
-        cython_field_coders = [fast_coder_impl.BinaryCoderImpl() for _ in range(len(data))]
-        self.check_cython_coder(python_field_coders, cython_field_coders, [data])
-
-    def test_cython_char_coder(self):
-        data = ['flink', '🐿']
-        python_field_coders = [coder_impl.CharCoderImpl() for _ in range(len(data))]
-        cython_field_coders = [fast_coder_impl.CharCoderImpl() for _ in range(len(data))]
-        self.check_cython_coder(python_field_coders, cython_field_coders, [data])
-
-    def test_cython_date_coder(self):
-        import datetime
-        data = [datetime.date(2019, 9, 10)]
-        python_field_coders = [coder_impl.DateCoderImpl() for _ in range(len(data))]
-        cython_field_coders = [fast_coder_impl.DateCoderImpl() for _ in range(len(data))]
-        self.check_cython_coder(python_field_coders, cython_field_coders, [data])
-
-    def test_cython_time_coder(self):
-        import datetime
-        data = [datetime.time(hour=11, minute=11, second=11, microsecond=123000)]
-        python_field_coders = [coder_impl.TimeCoderImpl() for _ in range(len(data))]
-        cython_field_coders = [fast_coder_impl.TimeCoderImpl() for _ in range(len(data))]
-        self.check_cython_coder(python_field_coders, cython_field_coders, [data])
-
-
-if __name__ == '__main__':
-    logging.getLogger().setLevel(logging.INFO)
-    unittest.main()
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 4fa842e..c74abeb 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -20,10 +20,9 @@ from __future__ import print_function
 import io
 import os
 import sys
-from distutils.command.build_ext import build_ext
 from shutil import copytree, copy, rmtree
 
-from setuptools import setup, Extension
+from setuptools import setup
 
 if sys.version_info < (3, 5):
     print("Python versions prior to 3.5 are not supported for PyFlink.",
@@ -40,26 +39,6 @@ def remove_if_exists(file_path):
             rmtree(file_path)
 
 
-try:
-    from Cython.Build import cythonize
-    extensions = cythonize([
-        Extension(
-            name="pyflink.fn_execution.fast_coder_impl",
-            sources=["pyflink/fn_execution/fast_coder_impl.pyx"],
-            include_dirs=["pyflink/fn_execution/"])
-    ])
-except ImportError:
-    if os.path.exists("pyflink/fn_execution/fast_coder_impl.c"):
-        extensions = ([
-            Extension(
-                name="pyflink.fn_execution.fast_coder_impl",
-                sources=["pyflink/fn_execution/fast_coder_impl.c"],
-                include_dirs=["pyflink/fn_execution/"])
-        ])
-    else:
-        extensions = ([])
-
-
 this_directory = os.path.abspath(os.path.dirname(__file__))
 version_file = os.path.join(this_directory, 'pyflink/version.py')
 
@@ -253,19 +232,16 @@ run sdist.
         install_requires=['py4j==0.10.8.1', 'python-dateutil==2.8.0', 'apache-beam==2.19.0',
                           'cloudpickle==1.2.2', 'avro-python3>=1.8.1,<=1.9.1', 'jsonpickle==1.2',
                           'pandas>=0.23.4,<=0.25.3', 'pyarrow>=0.15.1,<0.16.0', 'pytz>=2018.3'],
-        cmdclass={'build_ext': build_ext},
         tests_require=['pytest==4.4.1'],
         description='Apache Flink Python API',
         long_description=long_description,
         long_description_content_type='text/markdown',
-        zip_safe=False,
         classifiers=[
             'Development Status :: 5 - Production/Stable',
             'License :: OSI Approved :: Apache Software License',
             'Programming Language :: Python :: 3.5',
             'Programming Language :: Python :: 3.6',
-            'Programming Language :: Python :: 3.7'],
-        ext_modules=extensions
+            'Programming Language :: Python :: 3.7']
     )
 finally:
     if in_flink_source:
diff --git a/flink-python/tox.ini b/flink-python/tox.ini
index 2742ed2..b944857 100644
--- a/flink-python/tox.ini
+++ b/flink-python/tox.ini
@@ -21,23 +21,17 @@
 # in multiple virtualenvs. This configuration file will run the
 # test suite on all supported python versions.
 # new environments will be excluded by default unless explicitly added to envlist.
-envlist = {py35, py36, py37}-cython
+envlist = py35, py36, py37
 
 [testenv]
 whitelist_externals=
     /bin/bash
 deps =
     pytest
-    apache-beam==2.19.0
-    cython==0.28.1
     grpcio>=1.17.0,<=1.26.0
     grpcio-tools>=1.3.5,<=1.14.2
 commands =
     python --version
-    # python test
-    pytest --durations=0
-    python setup.py build_ext --inplace
-    # cython test
     pytest --durations=0
     bash ./dev/run_pip_test.sh
 # Replace the default installation command with a custom retry installation script, because on high-speed

Reply via email to