This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 9f113e4 Revert "[FLINK-17118][python] Add Cython support for primitive data types (#11718)" 9f113e4 is described below commit 9f113e4706f4110142c8aad74912e21bb995e497 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Fri Apr 17 16:25:06 2020 +0200 Revert "[FLINK-17118][python] Add Cython support for primitive data types (#11718)" This reverts commit 1a5b35b1e1ea79a233cacc88e4574f446aba52ae. --- .gitignore | 2 - flink-python/MANIFEST.in | 2 - flink-python/pyflink/fn_execution/coder_impl.py | 2 +- flink-python/pyflink/fn_execution/coders.py | 2 +- .../pyflink/fn_execution/fast_coder_impl.pxd | 192 ------- .../pyflink/fn_execution/fast_coder_impl.pyx | 553 --------------------- .../{test_coders.py => test_coders_common.py} | 9 - .../pyflink/fn_execution/tests/test_fast_coders.py | 141 ------ flink-python/setup.py | 28 +- flink-python/tox.ini | 8 +- 10 files changed, 5 insertions(+), 934 deletions(-) diff --git a/.gitignore b/.gitignore index 886d61b..e89f622 100644 --- a/.gitignore +++ b/.gitignore @@ -36,8 +36,6 @@ flink-python/dev/.conda/ flink-python/dev/log/ flink-python/dev/.stage.txt flink-python/.eggs/ -flink-python/**/*.c -flink-python/**/*.so atlassian-ide-plugin.xml out/ /docs/api diff --git a/flink-python/MANIFEST.in b/flink-python/MANIFEST.in index edef99d..a8e2d61 100644 --- a/flink-python/MANIFEST.in +++ b/flink-python/MANIFEST.in @@ -30,5 +30,3 @@ include pyflink/LICENSE include pyflink/NOTICE include pyflink/README.txt recursive-exclude deps/opt/python * -recursive-include pyflink/fn_execution *.pxd -recursive-include pyflink/fn_execution *.pyx diff --git a/flink-python/pyflink/fn_execution/coder_impl.py b/flink-python/pyflink/fn_execution/coder_impl.py index d4aac4d..743ce7b 100644 --- a/flink-python/pyflink/fn_execution/coder_impl.py +++ b/flink-python/pyflink/fn_execution/coder_impl.py @@ -237,7 +237,7 @@ class TinyIntCoderImpl(StreamCoderImpl): return struct.unpack('b', in_stream.read(1))[0] -class SmallIntCoderImpl(StreamCoderImpl): +class SmallIntImpl(StreamCoderImpl): def encode_to_stream(self, value, out_stream, nested): out_stream.write(struct.pack('>h', value)) diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py index 4d9bfda..d2be4fd 100644 --- a/flink-python/pyflink/fn_execution/coders.py +++ b/flink-python/pyflink/fn_execution/coders.py @@ -262,7 +262,7 @@ class SmallIntCoder(DeterministicCoder): """ def _create_impl(self): - return coder_impl.SmallIntCoderImpl() + return coder_impl.SmallIntImpl() def to_type_hint(self): return int diff --git a/flink-python/pyflink/fn_execution/fast_coder_impl.pxd b/flink-python/pyflink/fn_execution/fast_coder_impl.pxd deleted file mode 100644 index 2156247..0000000 --- a/flink-python/pyflink/fn_execution/fast_coder_impl.pxd +++ /dev/null @@ -1,192 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -# cython: language_level=3 - -cimport libc.stdint - -from apache_beam.coders.coder_impl cimport StreamCoderImpl, OutputStream, InputStream - -# InputStreamAndFunctionWrapper wraps the user-defined function -# and input_stream_wrapper in operations -cdef class InputStreamAndFunctionWrapper: - # user-defined function - cdef readonly object func - cdef InputStreamWrapper input_stream_wrapper - -# InputStreamWrapper wraps input_stream and related infos used to decode data -cdef class InputStreamWrapper: - cdef InputStream input_stream - cdef list input_field_coders - cdef TypeName*input_field_type - cdef CoderType*input_coder_type - cdef libc.stdint.int32_t input_field_count - cdef libc.stdint.int32_t input_leading_complete_bytes_num - cdef libc.stdint.int32_t input_remaining_bits_num - cdef size_t input_buffer_size - -cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl): - cdef readonly StreamCoderImpl _value_coder - -cdef class FlattenRowCoderImpl(StreamCoderImpl): - # the input field coders and related args used to decode input_stream data - cdef list _input_field_coders - cdef TypeName*_input_field_type - cdef CoderType*_input_coder_type - cdef libc.stdint.int32_t _input_field_count - cdef libc.stdint.int32_t _input_leading_complete_bytes_num - cdef libc.stdint.int32_t _input_remaining_bits_num - - # the output field coders and related args used to encode data to output_stream - cdef readonly list _output_field_coders - cdef TypeName*_output_field_type - cdef CoderType*_output_coder_type - cdef libc.stdint.int32_t _output_field_count - cdef libc.stdint.int32_t _output_leading_complete_bytes_num - cdef libc.stdint.int32_t _output_remaining_bits_num - - cdef bint*_null_mask - cdef unsigned char*_null_byte_search_table - - # the char pointer used to store encoded data of output_stream - cdef char*_output_data - cdef size_t _output_buffer_size - cdef size_t _output_pos - - # the tmp char pointer used to store encoded data of every row - cdef char*_tmp_output_data - cdef size_t _tmp_output_buffer_size - cdef size_t _tmp_output_pos - - # the char pointer used to map the decoded data of input_stream - cdef char*_input_data - cdef size_t _input_pos - cdef size_t _input_buffer_size - - # used to store the result of Python user-defined function - cdef list row - - # the Python user-defined function - cdef object func - - # initial attribute - cdef void _init_attribute(self) - - # wrap input_stream - cdef InputStreamWrapper _wrap_input_stream(self, InputStream input_stream, size_t size) - - cdef void _write_null_mask(self, value, libc.stdint.int32_t leading_complete_bytes_num, - libc.stdint.int32_t remaining_bits_num) - cdef void _read_null_mask(self, bint*null_mask, libc.stdint.int32_t leading_complete_bytes_num, - libc.stdint.int32_t remaining_bits_num) - - cdef void _prepare_encode(self, InputStreamAndFunctionWrapper input_stream_and_function_wrapper, - OutputStream out_stream) - - cdef void _maybe_flush(self, OutputStream out_stream) - # Because output_buffer will be reallocated during encoding data, we need to remap output_buffer - # to the data pointer of output_stream - cdef void _map_output_data_to_output_stream(self, OutputStream out_stream) - cdef void _copy_to_output_buffer(self) - - # encode data to output_stream - cdef void _encode_one_row(self, value) - cdef void _encode_field_simple(self, TypeName field_type, item) - cdef void _extend(self, size_t missing) - cdef void _encode_byte(self, unsigned char val) - cdef void _encode_smallint(self, libc.stdint.int16_t v) - cdef void _encode_int(self, libc.stdint.int32_t v) - cdef void _encode_bigint(self, libc.stdint.int64_t v) - cdef void _encode_float(self, float v) - cdef void _encode_double(self, double v) - cdef void _encode_bytes(self, char*b) - - # decode data from input_stream - cdef void _decode_next_row(self) - cdef object _decode_field_simple(self, TypeName field_type) - cdef unsigned char _decode_byte(self) except? -1 - cdef libc.stdint.int16_t _decode_smallint(self) except? -1 - cdef libc.stdint.int32_t _decode_int(self) except? -1 - cdef libc.stdint.int64_t _decode_bigint(self) except? -1 - cdef float _decode_float(self) except? -1 - cdef double _decode_double(self) except? -1 - cdef bytes _decode_bytes(self) - -cdef class TableFunctionRowCoderImpl(FlattenRowCoderImpl): - cdef void _encode_end_message(self) - -cdef enum CoderType: - UNDEFINED = -1 - SIMPLE = 0 - COMPLEX = 1 - -cdef enum TypeName: - NONE = -1 - ROW = 0 - TINYINT = 1 - SMALLINT = 2 - INT = 3 - BIGINT = 4 - DECIMAL = 5 - FLOAT = 6 - DOUBLE = 7 - DATE = 8 - TIME = 9 - TIMESTAMP = 10 - BOOLEAN = 11 - BINARY = 12 - CHAR = 13 - ARRAY = 14 - MAP = 15 - LOCAL_ZONED_TIMESTAMP = 16 - -cdef class BaseCoder: - cpdef CoderType coder_type(self) - cpdef TypeName type_name(self) - -cdef class TinyIntCoderImpl(BaseCoder): - pass - -cdef class SmallIntCoderImpl(BaseCoder): - pass - -cdef class IntCoderImpl(BaseCoder): - pass - -cdef class BigIntCoderImpl(BaseCoder): - pass - -cdef class BooleanCoderImpl(BaseCoder): - pass - -cdef class FloatCoderImpl(BaseCoder): - pass - -cdef class DoubleCoderImpl(BaseCoder): - pass - -cdef class BinaryCoderImpl(BaseCoder): - pass - -cdef class CharCoderImpl(BaseCoder): - pass - -cdef class DateCoderImpl(BaseCoder): - pass - -cdef class TimeCoderImpl(BaseCoder): - pass diff --git a/flink-python/pyflink/fn_execution/fast_coder_impl.pyx b/flink-python/pyflink/fn_execution/fast_coder_impl.pyx deleted file mode 100644 index bfc08202..0000000 --- a/flink-python/pyflink/fn_execution/fast_coder_impl.pyx +++ /dev/null @@ -1,553 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -# cython: language_level = 3 -# cython: infer_types = True -# cython: profile=True -# cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True - -cimport libc.stdlib -from libc.string cimport strlen - -import datetime - -cdef class InputStreamAndFunctionWrapper: - def __cinit__(self, func, input_stream_wrapper): - self.func = func - self.input_stream_wrapper = input_stream_wrapper - -cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl): - def __cinit__(self, value_coder): - self._value_coder = value_coder - - cpdef encode_to_stream(self, value, OutputStream out_stream, bint nested): - self._value_coder.encode_to_stream(value, out_stream, nested) - - cpdef decode_from_stream(self, InputStream in_stream, bint nested): - return self._value_coder.decode_from_stream(in_stream, nested) - - cpdef get_estimated_size_and_observables(self, value, bint nested=False): - return 0, [] - -cdef class TableFunctionRowCoderImpl(FlattenRowCoderImpl): - def __init__(self, flatten_row_coder): - super(TableFunctionRowCoderImpl, self).__init__(flatten_row_coder._output_field_coders) - - cpdef encode_to_stream(self, input_stream_and_function_wrapper, OutputStream out_stream, - bint nested): - self._prepare_encode(input_stream_and_function_wrapper, out_stream) - while self._input_buffer_size > self._input_pos: - self._decode_next_row() - result = self.func(self.row) - if result: - for value in result: - if self._output_field_count == 1: - value = (value,) - self._encode_one_row(value) - self._maybe_flush(out_stream) - self._encode_end_message() - - self._map_output_data_to_output_stream(out_stream) - - # write 0x00 as end message - cdef void _encode_end_message(self): - if self._output_buffer_size < self._output_pos + 2: - self._extend(2) - self._output_data[self._output_pos] = 0x01 - self._output_data[self._output_pos + 1] = 0x00 - self._output_pos += 2 - -cdef class FlattenRowCoderImpl(StreamCoderImpl): - def __init__(self, field_coders): - self._output_field_coders = field_coders - self._output_field_count = len(self._output_field_coders) - self._output_field_type = <TypeName*> libc.stdlib.malloc( - self._output_field_count * sizeof(TypeName)) - self._output_coder_type = <CoderType*> libc.stdlib.malloc( - self._output_field_count * sizeof(CoderType)) - self._output_leading_complete_bytes_num = self._output_field_count // 8 - self._output_remaining_bits_num = self._output_field_count % 8 - self._tmp_output_buffer_size = 1024 - self._tmp_output_pos = 0 - self._tmp_output_data = <char*> libc.stdlib.malloc(self._tmp_output_buffer_size) - self._null_byte_search_table = <unsigned char*> libc.stdlib.malloc( - 8 * sizeof(unsigned char)) - self._init_attribute() - - cpdef decode_from_stream(self, InputStream in_stream, bint nested): - cdef InputStreamWrapper input_stream_wrapper - input_stream_wrapper = self._wrap_input_stream(in_stream, in_stream.size()) - return input_stream_wrapper - - cpdef encode_to_stream(self, input_stream_and_function_wrapper, OutputStream out_stream, - bint nested): - cdef list result - self._prepare_encode(input_stream_and_function_wrapper, out_stream) - while self._input_buffer_size > self._input_pos: - self._decode_next_row() - result = self.func(self.row) - self._encode_one_row(result) - self._maybe_flush(out_stream) - self._map_output_data_to_output_stream(out_stream) - - cdef void _init_attribute(self): - self._null_byte_search_table[0] = 0x80 - self._null_byte_search_table[1] = 0x40 - self._null_byte_search_table[2] = 0x20 - self._null_byte_search_table[3] = 0x10 - self._null_byte_search_table[4] = 0x08 - self._null_byte_search_table[5] = 0x04 - self._null_byte_search_table[6] = 0x02 - self._null_byte_search_table[7] = 0x01 - for i in range(self._output_field_count): - self._output_field_type[i] = self._output_field_coders[i].type_name() - self._output_coder_type[i] = self._output_field_coders[i].coder_type() - - cdef InputStreamWrapper _wrap_input_stream(self, InputStream input_stream, size_t size): - # wrappers the input field coders and input_stream together - # so that it can be transposed to operations - cdef InputStreamWrapper input_stream_wrapper - input_stream_wrapper = InputStreamWrapper() - input_stream_wrapper.input_stream = input_stream - input_stream_wrapper.input_field_coders = self._output_field_coders - input_stream_wrapper.input_remaining_bits_num = self._output_remaining_bits_num - input_stream_wrapper.input_leading_complete_bytes_num = \ - self._output_leading_complete_bytes_num - input_stream_wrapper.input_field_count = self._output_field_count - input_stream_wrapper.input_field_type = self._output_field_type - input_stream_wrapper.input_coder_type = self._output_coder_type - input_stream_wrapper.input_stream.pos = size - input_stream_wrapper.input_buffer_size = size - return input_stream_wrapper - - cdef void _encode_one_row(self, value): - cdef libc.stdint.int32_t i - self._write_null_mask(value, self._output_leading_complete_bytes_num, - self._output_remaining_bits_num) - for i in range(self._output_field_count): - item = value[i] - if item is not None: - if self._output_coder_type[i] == SIMPLE: - self._encode_field_simple(self._output_field_type[i], item) - - self._copy_to_output_buffer() - - cdef void _read_null_mask(self, bint*null_mask, - libc.stdint.int32_t input_leading_complete_bytes_num, - libc.stdint.int32_t input_remaining_bits_num): - cdef libc.stdint.int32_t field_pos, i - cdef unsigned char b - field_pos = 0 - for _ in range(input_leading_complete_bytes_num): - b = self._input_data[self._input_pos] - self._input_pos += 1 - for i in range(8): - null_mask[field_pos] = (b & self._null_byte_search_table[i]) > 0 - field_pos += 1 - - if input_remaining_bits_num: - b = self._input_data[self._input_pos] - self._input_pos += 1 - for i in range(input_remaining_bits_num): - null_mask[field_pos] = (b & self._null_byte_search_table[i]) > 0 - field_pos += 1 - - cdef void _decode_next_row(self): - cdef libc.stdint.int32_t i - # skip prefix variable int length - while self._input_data[self._input_pos] & 0x80: - self._input_pos += 1 - self._input_pos += 1 - self._read_null_mask(self._null_mask, self._input_leading_complete_bytes_num, - self._input_remaining_bits_num) - for i in range(self._input_field_count): - if self._null_mask[i]: - self.row[i] = None - else: - if self._input_coder_type[i] == SIMPLE: - self.row[i] = self._decode_field_simple(self._input_field_type[i]) - - cdef object _decode_field_simple(self, TypeName field_type): - cdef libc.stdint.int32_t value, minutes, seconds, hours - cdef libc.stdint.int64_t milliseconds - if field_type == TINYINT: - # tinyint - return self._decode_byte() - elif field_type == SMALLINT: - # smallint - return self._decode_smallint() - elif field_type == INT: - # int - return self._decode_int() - elif field_type == BIGINT: - # bigint - return self._decode_bigint() - elif field_type == BOOLEAN: - # boolean - return not not self._decode_byte() - elif field_type == FLOAT: - # float - return self._decode_float() - elif field_type == DOUBLE: - # double - return self._decode_double() - elif field_type == BINARY: - # bytes - return self._decode_bytes() - elif field_type == CHAR: - # str - return self._decode_bytes().decode("utf-8") - elif field_type == DATE: - # Date - # EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal() - # The value of EPOCH_ORDINAL is 719163 - return datetime.date.fromordinal(self._decode_int() + 719163) - elif field_type == TIME: - # Time - value = self._decode_int() - seconds = value // 1000 - milliseconds = value % 1000 - minutes = seconds // 60 - seconds %= 60 - hours = minutes // 60 - minutes %= 60 - return datetime.time(hours, minutes, seconds, milliseconds * 1000) - - cdef unsigned char _decode_byte(self) except? -1: - self._input_pos += 1 - return <unsigned char> self._input_data[self._input_pos - 1] - - cdef libc.stdint.int16_t _decode_smallint(self) except? -1: - self._input_pos += 2 - return (<unsigned char> self._input_data[self._input_pos - 1] - | <libc.stdint.uint32_t> <unsigned char> self._input_data[self._input_pos - 2] << 8) - - cdef libc.stdint.int32_t _decode_int(self) except? -1: - self._input_pos += 4 - return (<unsigned char> self._input_data[self._input_pos - 1] - | <libc.stdint.uint32_t> <unsigned char> self._input_data[self._input_pos - 2] << 8 - | <libc.stdint.uint32_t> <unsigned char> self._input_data[self._input_pos - 3] << 16 - | <libc.stdint.uint32_t> <unsigned char> self._input_data[ - self._input_pos - 4] << 24) - - cdef libc.stdint.int64_t _decode_bigint(self) except? -1: - self._input_pos += 8 - return (<unsigned char> self._input_data[self._input_pos - 1] - | <libc.stdint.uint64_t> <unsigned char> self._input_data[self._input_pos - 2] << 8 - | <libc.stdint.uint64_t> <unsigned char> self._input_data[self._input_pos - 3] << 16 - | <libc.stdint.uint64_t> <unsigned char> self._input_data[self._input_pos - 4] << 24 - | <libc.stdint.uint64_t> <unsigned char> self._input_data[self._input_pos - 5] << 32 - | <libc.stdint.uint64_t> <unsigned char> self._input_data[self._input_pos - 6] << 40 - | <libc.stdint.uint64_t> <unsigned char> self._input_data[self._input_pos - 7] << 48 - | <libc.stdint.uint64_t> <unsigned char> self._input_data[ - self._input_pos - 8] << 56) - - cdef float _decode_float(self) except? -1: - cdef libc.stdint.int32_t as_long = self._decode_int() - return (<float*> <char*> &as_long)[0] - - cdef double _decode_double(self) except? -1: - cdef libc.stdint.int64_t as_long = self._decode_bigint() - return (<double*> <char*> &as_long)[0] - - cdef bytes _decode_bytes(self): - cdef libc.stdint.int32_t size = self._decode_int() - self._input_pos += size - return self._input_data[self._input_pos - size: self._input_pos] - - cdef void _prepare_encode(self, InputStreamAndFunctionWrapper input_stream_and_function_wrapper, - OutputStream out_stream): - cdef InputStreamWrapper input_stream_wrapper - # get the data pointer of output_stream - self._output_data = out_stream.data - self._output_pos = out_stream.pos - self._output_buffer_size = out_stream.buffer_size - self._tmp_output_pos = 0 - - input_stream_wrapper = input_stream_and_function_wrapper.input_stream_wrapper - # get the data pointer of input_stream - self._input_data = input_stream_wrapper.input_stream.allc - self._input_buffer_size = input_stream_wrapper.input_buffer_size - - # get the infos of input coder which will be used to decode data from input_stream - self._input_field_count = input_stream_wrapper.input_field_count - self._input_leading_complete_bytes_num = input_stream_wrapper.input_leading_complete_bytes_num - self._input_remaining_bits_num = input_stream_wrapper.input_remaining_bits_num - self._input_field_type = input_stream_wrapper.input_field_type - self._input_coder_type = input_stream_wrapper.input_coder_type - self._input_field_coders = input_stream_wrapper.input_field_coders - self._null_mask = <bint*> libc.stdlib.malloc(self._input_field_count * sizeof(bint)) - self._input_pos = 0 - - # initial the result row and get the Python user-defined function - self.row = [None for _ in range(self._input_field_count)] - self.func = input_stream_and_function_wrapper.func - - cdef void _encode_field_simple(self, TypeName field_type, item): - cdef libc.stdint.int32_t hour, minute, seconds, microsecond, milliseconds - if field_type == TINYINT: - # tinyint - self._encode_byte(item) - elif field_type == SMALLINT: - # smallint - self._encode_smallint(item) - elif field_type == INT: - # int - self._encode_int(item) - elif field_type == BIGINT: - # bigint - self._encode_bigint(item) - elif field_type == BOOLEAN: - # boolean - self._encode_byte(item) - elif field_type == FLOAT: - # float - self._encode_float(item) - elif field_type == DOUBLE: - # double - self._encode_double(item) - elif field_type == BINARY: - # bytes - self._encode_bytes(item) - elif field_type == CHAR: - # str - self._encode_bytes(item.encode('utf-8')) - elif field_type == DATE: - # Date - # EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal() - # The value of EPOCH_ORDINAL is 719163 - self._encode_int(item.toordinal() - 719163) - elif field_type == TIME: - # Time - hour = item.hour - minute = item.minute - seconds = item.second - microsecond = item.microsecond - milliseconds = hour * 3600000 + minute * 60000 + seconds * 1000 + microsecond // 1000 - self._encode_int(milliseconds) - - cdef void _copy_to_output_buffer(self): - cdef size_t size - cdef size_t i - cdef bint is_realloc - cdef char bits - # the length of the variable prefix length will be less than 9 bytes - if self._output_buffer_size < self._output_pos + self._tmp_output_pos + 9: - self._output_buffer_size += self._tmp_output_buffer_size + 9 - self._output_data = <char*> libc.stdlib.realloc(self._output_data, - self._output_buffer_size) - size = self._tmp_output_pos - # write variable prefix length - while size: - bits = size & 0x7F - size >>= 7 - if size: - bits |= 0x80 - self._output_data[self._output_pos] = bits - self._output_pos += 1 - if self._tmp_output_pos < 8: - # This is faster than memcpy when the string is short. - for i in range(self._tmp_output_pos): - self._output_data[self._output_pos + i] = self._tmp_output_data[i] - else: - libc.string.memcpy(self._output_data + self._output_pos, self._tmp_output_data, - self._tmp_output_pos) - self._output_pos += self._tmp_output_pos - self._tmp_output_pos = 0 - - cdef void _maybe_flush(self, OutputStream out_stream): - # Currently, it will trigger flushing when the size of buffer reach to 10_000_000 - if self._output_pos > 10_000_000: - self._map_output_data_to_output_stream(out_stream) - out_stream.flush() - self._output_pos = 0 - - cdef void _map_output_data_to_output_stream(self, OutputStream out_stream): - out_stream.data = self._output_data - out_stream.pos = self._output_pos - out_stream.buffer_size = self._output_buffer_size - - cdef void _extend(self, size_t missing): - while self._tmp_output_buffer_size < self._tmp_output_pos + missing: - self._tmp_output_buffer_size *= 2 - self._tmp_output_data = <char*> libc.stdlib.realloc(self._tmp_output_data, - self._tmp_output_buffer_size) - - cdef void _encode_byte(self, unsigned char val): - if self._tmp_output_buffer_size < self._tmp_output_pos + 1: - self._extend(1) - self._tmp_output_data[self._tmp_output_pos] = val - self._tmp_output_pos += 1 - - cdef void _encode_smallint(self, libc.stdint.int16_t v): - if self._tmp_output_buffer_size < self._tmp_output_pos + 2: - self._extend(2) - self._tmp_output_data[self._tmp_output_pos] = <unsigned char> (v >> 8) - self._tmp_output_data[self._tmp_output_pos + 1] = <unsigned char> v - self._tmp_output_pos += 2 - - cdef void _encode_int(self, libc.stdint.int32_t v): - if self._tmp_output_buffer_size < self._tmp_output_pos + 4: - self._extend(4) - self._tmp_output_data[self._tmp_output_pos] = <unsigned char> (v >> 24) - self._tmp_output_data[self._tmp_output_pos + 1] = <unsigned char> (v >> 16) - self._tmp_output_data[self._tmp_output_pos + 2] = <unsigned char> (v >> 8) - self._tmp_output_data[self._tmp_output_pos + 3] = <unsigned char> v - self._tmp_output_pos += 4 - - cdef void _encode_bigint(self, libc.stdint.int64_t v): - if self._tmp_output_buffer_size < self._tmp_output_pos + 8: - self._extend(8) - self._tmp_output_data[self._tmp_output_pos] = <unsigned char> (v >> 56) - self._tmp_output_data[self._tmp_output_pos + 1] = <unsigned char> (v >> 48) - self._tmp_output_data[self._tmp_output_pos + 2] = <unsigned char> (v >> 40) - self._tmp_output_data[self._tmp_output_pos + 3] = <unsigned char> (v >> 32) - self._tmp_output_data[self._tmp_output_pos + 4] = <unsigned char> (v >> 24) - self._tmp_output_data[self._tmp_output_pos + 5] = <unsigned char> (v >> 16) - self._tmp_output_data[self._tmp_output_pos + 6] = <unsigned char> (v >> 8) - self._tmp_output_data[self._tmp_output_pos + 7] = <unsigned char> v - self._tmp_output_pos += 8 - - cdef void _encode_float(self, float v): - self._encode_int((<libc.stdint.int32_t*> <char*> &v)[0]) - - cdef void _encode_double(self, double v): - self._encode_bigint((<libc.stdint.int64_t*> <char*> &v)[0]) - - cdef void _encode_bytes(self, char*b): - cdef libc.stdint.int32_t length = strlen(b) - self._encode_int(length) - if self._tmp_output_buffer_size < self._tmp_output_pos + length: - self._extend(length) - if length < 8: - # This is faster than memcpy when the string is short. - for i in range(length): - self._tmp_output_data[self._tmp_output_pos + i] = b[i] - else: - libc.string.memcpy(self._tmp_output_data + self._tmp_output_pos, b, length) - self._tmp_output_pos += length - - cdef void _write_null_mask(self, value, libc.stdint.int32_t leading_complete_bytes_num, - libc.stdint.int32_t remaining_bits_num): - cdef libc.stdint.int32_t field_pos, index - cdef unsigned char*null_byte_search_table - cdef unsigned char b, i - field_pos = 0 - null_byte_search_table = self._null_byte_search_table - for _ in range(leading_complete_bytes_num): - b = 0x00 - for i in range(8): - if value[field_pos + i] is None: - b |= null_byte_search_table[i] - field_pos += 8 - self._encode_byte(b) - - if remaining_bits_num: - b = 0x00 - for i in range(remaining_bits_num): - if value[field_pos + i] is None: - b |= null_byte_search_table[i] - self._encode_byte(b) - - def __dealloc__(self): - if self.null_mask: - libc.stdlib.free(self._null_mask) - if self.null_byte_search_table: - libc.stdlib.free(self._null_byte_search_table) - if self._tmp_output_data: - libc.stdlib.free(self._tmp_output_data) - if self._output_field_type: - libc.stdlib.free(self._output_field_type) - if self._output_coder_type: - libc.stdlib.free(self._output_coder_type) - -cdef class BaseCoder: - cpdef CoderType coder_type(self): - return UNDEFINED - - cpdef TypeName type_name(self): - return NONE - -cdef class TinyIntCoderImpl(BaseCoder): - cpdef CoderType coder_type(self): - return SIMPLE - - cpdef TypeName type_name(self): - return TINYINT - -cdef class SmallIntCoderImpl(BaseCoder): - cpdef CoderType coder_type(self): - return SIMPLE - cpdef TypeName type_name(self): - return SMALLINT - -cdef class IntCoderImpl(BaseCoder): - cpdef CoderType coder_type(self): - return SIMPLE - - cpdef TypeName type_name(self): - return INT - -cdef class BigIntCoderImpl(BaseCoder): - cpdef CoderType coder_type(self): - return SIMPLE - cpdef TypeName type_name(self): - return BIGINT - -cdef class BooleanCoderImpl(BaseCoder): - cpdef CoderType coder_type(self): - return SIMPLE - cpdef TypeName type_name(self): - return BOOLEAN - -cdef class FloatCoderImpl(BaseCoder): - cpdef CoderType coder_type(self): - return SIMPLE - cpdef TypeName type_name(self): - return FLOAT - -cdef class DoubleCoderImpl(BaseCoder): - cpdef CoderType coder_type(self): - return SIMPLE - cpdef TypeName type_name(self): - return DOUBLE - -cdef class BinaryCoderImpl(BaseCoder): - cpdef CoderType coder_type(self): - return SIMPLE - cpdef TypeName type_name(self): - return BINARY - -cdef class CharCoderImpl(BaseCoder): - cpdef CoderType coder_type(self): - return SIMPLE - cpdef TypeName type_name(self): - return CHAR - -cdef class DateCoderImpl(BaseCoder): - cpdef CoderType coder_type(self): - return SIMPLE - - cpdef TypeName type_name(self): - return DATE - -cdef class TimeCoderImpl(BaseCoder): - cpdef CoderType coder_type(self): - return SIMPLE - - cpdef TypeName type_name(self): - return TIME diff --git a/flink-python/pyflink/fn_execution/tests/test_coders.py b/flink-python/pyflink/fn_execution/tests/test_coders_common.py similarity index 95% rename from flink-python/pyflink/fn_execution/tests/test_coders.py rename to flink-python/pyflink/fn_execution/tests/test_coders_common.py index 3134e83..69e9961 100644 --- a/flink-python/pyflink/fn_execution/tests/test_coders.py +++ b/flink-python/pyflink/fn_execution/tests/test_coders_common.py @@ -25,16 +25,7 @@ from pyflink.fn_execution.coders import BigIntCoder, TinyIntCoder, BooleanCoder, TimeCoder, TimestampCoder, ArrayCoder, MapCoder, DecimalCoder, FlattenRowCoder, RowCoder, \ LocalZonedTimestampCoder -try: - from pyflink.fn_execution import fast_coder_impl # noqa # pylint: disable=unused-import - have_cython = True -except ImportError: - have_cython = False - - -@unittest.skipIf(have_cython, - "Found cython implementation, we don't need to test non-compiled implementation") class CodersTest(unittest.TestCase): def check_coder(self, coder, *values): diff --git a/flink-python/pyflink/fn_execution/tests/test_fast_coders.py b/flink-python/pyflink/fn_execution/tests/test_fast_coders.py deleted file mode 100644 index 15a8ab0..0000000 --- a/flink-python/pyflink/fn_execution/tests/test_fast_coders.py +++ /dev/null @@ -1,141 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -"""Tests common to all coder implementations.""" -import logging -import unittest - -from pyflink.fn_execution import coder_impl - -try: - from pyflink.fn_execution import fast_coder_impl - - have_cython = True -except ImportError: - have_cython = False - - -@unittest.skipUnless(have_cython, "Uncompiled Cython Coder") -class CodersTest(unittest.TestCase): - - def check_cython_coder(self, python_field_coders, cython_field_coders, data): - from apache_beam.coders.coder_impl import create_InputStream, create_OutputStream - from pyflink.fn_execution.fast_coder_impl import InputStreamAndFunctionWrapper - py_flatten_row_coder = coder_impl.FlattenRowCoderImpl(python_field_coders) - internal = py_flatten_row_coder.encode(data) - input_stream = create_InputStream(internal) - output_stream = create_OutputStream() - cy_flatten_row_coder = fast_coder_impl.FlattenRowCoderImpl(cython_field_coders) - value = cy_flatten_row_coder.decode_from_stream(input_stream, False) - wrapper_func_input_element = InputStreamAndFunctionWrapper( - lambda v: [v[i] for i in range(len(v))], value) - cy_flatten_row_coder.encode_to_stream(wrapper_func_input_element, output_stream, False) - generator_result = py_flatten_row_coder.decode_from_stream(create_InputStream( - output_stream.get()), False) - result = [] - for item in generator_result: - result.append(item) - try: - self.assertEqual(result, data) - except AssertionError: - self.assertEqual(len(result), len(data)) - self.assertEqual(len(result[0]), len(data[0])) - for i in range(len(data[0])): - if isinstance(data[0][i], float): - from pyflink.table.tests.test_udf import float_equal - assert float_equal(data[0][i], result[0][i], 1e-6) - else: - self.assertEqual(data[0][i], result[0][i]) - - # decide whether two floats are equal - @staticmethod - def float_equal(a, b, rel_tol=1e-09, abs_tol=0.0): - return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol) - - def test_cython_bigint_coder(self): - data = [1, 100, -100, -1000] - python_field_coders = [coder_impl.BigIntCoderImpl() for _ in range(len(data))] - cython_field_coders = [fast_coder_impl.BigIntCoderImpl() for _ in range(len(data))] - self.check_cython_coder(python_field_coders, cython_field_coders, [data]) - - def test_cython_tinyint_coder(self): - data = [1, 10, 127, -128] - python_field_coders = [coder_impl.TinyIntCoderImpl() for _ in range(len(data))] - cython_field_coders = [fast_coder_impl.TinyIntCoderImpl() for _ in range(len(data))] - self.check_cython_coder(python_field_coders, cython_field_coders, [data]) - - def test_cython_boolean_coder(self): - data = [True, False] - python_field_coders = [coder_impl.BooleanCoderImpl() for _ in range(len(data))] - cython_field_coders = [fast_coder_impl.BooleanCoderImpl() for _ in range(len(data))] - self.check_cython_coder(python_field_coders, cython_field_coders, [data]) - - def test_cython_smallint_coder(self): - data = [32767, -32768, 0] - python_field_coders = [coder_impl.SmallIntCoderImpl() for _ in range(len(data))] - cython_field_coders = [fast_coder_impl.SmallIntCoderImpl() for _ in range(len(data))] - self.check_cython_coder(python_field_coders, cython_field_coders, [data]) - - def test_cython_int_coder(self): - data = [-2147483648, 2147483647] - python_field_coders = [coder_impl.IntCoderImpl() for _ in range(len(data))] - cython_field_coders = [fast_coder_impl.IntCoderImpl() for _ in range(len(data))] - self.check_cython_coder(python_field_coders, cython_field_coders, [data]) - - def test_cython_float_coder(self): - data = [1.02, 1.32] - python_field_coders = [coder_impl.FloatCoderImpl() for _ in range(len(data))] - cython_field_coders = [fast_coder_impl.FloatCoderImpl() for _ in range(len(data))] - self.check_cython_coder(python_field_coders, cython_field_coders, [data]) - - def test_cython_double_coder(self): - data = [-12.02, 1.98932] - python_field_coders = [coder_impl.DoubleCoderImpl() for _ in range(len(data))] - cython_field_coders = [fast_coder_impl.DoubleCoderImpl() for _ in range(len(data))] - self.check_cython_coder(python_field_coders, cython_field_coders, [data]) - - def test_cython_binary_coder(self): - data = [b'pyflink'] - python_field_coders = [coder_impl.BinaryCoderImpl() for _ in range(len(data))] - cython_field_coders = [fast_coder_impl.BinaryCoderImpl() for _ in range(len(data))] - self.check_cython_coder(python_field_coders, cython_field_coders, [data]) - - def test_cython_char_coder(self): - data = ['flink', '🐿'] - python_field_coders = [coder_impl.CharCoderImpl() for _ in range(len(data))] - cython_field_coders = [fast_coder_impl.CharCoderImpl() for _ in range(len(data))] - self.check_cython_coder(python_field_coders, cython_field_coders, [data]) - - def test_cython_date_coder(self): - import datetime - data = [datetime.date(2019, 9, 10)] - python_field_coders = [coder_impl.DateCoderImpl() for _ in range(len(data))] - cython_field_coders = [fast_coder_impl.DateCoderImpl() for _ in range(len(data))] - self.check_cython_coder(python_field_coders, cython_field_coders, [data]) - - def test_cython_time_coder(self): - import datetime - data = [datetime.time(hour=11, minute=11, second=11, microsecond=123000)] - python_field_coders = [coder_impl.TimeCoderImpl() for _ in range(len(data))] - cython_field_coders = [fast_coder_impl.TimeCoderImpl() for _ in range(len(data))] - self.check_cython_coder(python_field_coders, cython_field_coders, [data]) - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() diff --git a/flink-python/setup.py b/flink-python/setup.py index 4fa842e..c74abeb 100644 --- a/flink-python/setup.py +++ b/flink-python/setup.py @@ -20,10 +20,9 @@ from __future__ import print_function import io import os import sys -from distutils.command.build_ext import build_ext from shutil import copytree, copy, rmtree -from setuptools import setup, Extension +from setuptools import setup if sys.version_info < (3, 5): print("Python versions prior to 3.5 are not supported for PyFlink.", @@ -40,26 +39,6 @@ def remove_if_exists(file_path): rmtree(file_path) -try: - from Cython.Build import cythonize - extensions = cythonize([ - Extension( - name="pyflink.fn_execution.fast_coder_impl", - sources=["pyflink/fn_execution/fast_coder_impl.pyx"], - include_dirs=["pyflink/fn_execution/"]) - ]) -except ImportError: - if os.path.exists("pyflink/fn_execution/fast_coder_impl.c"): - extensions = ([ - Extension( - name="pyflink.fn_execution.fast_coder_impl", - sources=["pyflink/fn_execution/fast_coder_impl.c"], - include_dirs=["pyflink/fn_execution/"]) - ]) - else: - extensions = ([]) - - this_directory = os.path.abspath(os.path.dirname(__file__)) version_file = os.path.join(this_directory, 'pyflink/version.py') @@ -253,19 +232,16 @@ run sdist. install_requires=['py4j==0.10.8.1', 'python-dateutil==2.8.0', 'apache-beam==2.19.0', 'cloudpickle==1.2.2', 'avro-python3>=1.8.1,<=1.9.1', 'jsonpickle==1.2', 'pandas>=0.23.4,<=0.25.3', 'pyarrow>=0.15.1,<0.16.0', 'pytz>=2018.3'], - cmdclass={'build_ext': build_ext}, tests_require=['pytest==4.4.1'], description='Apache Flink Python API', long_description=long_description, long_description_content_type='text/markdown', - zip_safe=False, classifiers=[ 'Development Status :: 5 - Production/Stable', 'License :: OSI Approved :: Apache Software License', 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7'], - ext_modules=extensions + 'Programming Language :: Python :: 3.7'] ) finally: if in_flink_source: diff --git a/flink-python/tox.ini b/flink-python/tox.ini index 2742ed2..b944857 100644 --- a/flink-python/tox.ini +++ b/flink-python/tox.ini @@ -21,23 +21,17 @@ # in multiple virtualenvs. This configuration file will run the # test suite on all supported python versions. # new environments will be excluded by default unless explicitly added to envlist. -envlist = {py35, py36, py37}-cython +envlist = py35, py36, py37 [testenv] whitelist_externals= /bin/bash deps = pytest - apache-beam==2.19.0 - cython==0.28.1 grpcio>=1.17.0,<=1.26.0 grpcio-tools>=1.3.5,<=1.14.2 commands = python --version - # python test - pytest --durations=0 - python setup.py build_ext --inplace - # cython test pytest --durations=0 bash ./dev/run_pip_test.sh # Replace the default installation command with a custom retry installation script, because on high-speed