This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new cf3beff586a [FLINK-28876][python][format/orc] Support Orc format cf3beff586a is described below commit cf3beff586a088c20833d8a8fc72bbc37d87a2b1 Author: Juntao Hu <maybach...@gmail.com> AuthorDate: Mon Aug 8 23:23:44 2022 +0800 [FLINK-28876][python][format/orc] Support Orc format This closes #20505. --- .../docs/connectors/datastream/filesystem.md | 30 ++++ .../docs/connectors/datastream/filesystem.md | 30 ++++ flink-python/pom.xml | 8 ++ flink-python/pyflink/datastream/__init__.py | 3 + flink-python/pyflink/datastream/formats/orc.py | 100 +++++++++++++ flink-python/pyflink/datastream/formats/parquet.py | 4 +- .../pyflink/datastream/formats/tests/test_orc.py | 156 +++++++++++++++++++++ flink-python/pyflink/datastream/utils.py | 8 ++ 8 files changed, 337 insertions(+), 2 deletions(-) diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md b/docs/content.zh/docs/connectors/datastream/filesystem.md index f5468862a53..d48e84f52d8 100644 --- a/docs/content.zh/docs/connectors/datastream/filesystem.md +++ b/docs/content.zh/docs/connectors/datastream/filesystem.md @@ -811,6 +811,36 @@ class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) { {{< /tab >}} {{< /tabs >}} +PyFlink 用户可以使用 `OrcBulkWriters.for_row_type` 来创建将 `Row` 数据写入 Orc 文件的 `BulkWriterFactory` 。 +注意如果 sink 的前置算子的输出类型为 `RowData` ,例如 CSV source ,则需要先转换为 `Row` 类型。 + +{{< py_download_link "orc" >}} + +```python +row_type = DataTypes.ROW([ + DataTypes.FIELD('name', DataTypes.STRING()), + DataTypes.FIELD('age', DataTypes.INT()), +]) +row_type_info = Types.ROW_NAMED( + ['name', 'age'], + [Types.STRING(), Types.INT()] +) + +sink = FileSink.for_bulk_format( + OUTPUT_DIR, + OrcBulkWriters.for_row_type( + row_type=row_type, + writer_properties=Configuration(), + hadoop_config=Configuration(), + ) +).build() + +# 如果 ds 是产生 RowData 的数据源,可以使用一个 map 函数来指定其对应的 Row 类型。 +ds.map(lambda e: e, 
output_type=row_type_info).sink_to(sink) +# 否则 +ds.sink_to(sink) +``` + <a name="hadoop-sequencefile-format"></a> ##### Hadoop SequenceFile Format diff --git a/docs/content/docs/connectors/datastream/filesystem.md b/docs/content/docs/connectors/datastream/filesystem.md index f5a4517aa95..41eb054015e 100644 --- a/docs/content/docs/connectors/datastream/filesystem.md +++ b/docs/content/docs/connectors/datastream/filesystem.md @@ -816,6 +816,36 @@ class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) { {{< /tab >}} {{< /tabs >}} +For PyFlink users, `OrcBulkWriters.for_row_type` can be used to create a `BulkWriterFactory` that writes `Row` records to files in Orc format. +Note that if the operator preceding the sink produces `RowData` records (e.g. a CSV source), those records must be converted to `Row` records before being written to the sink. + +{{< py_download_link "orc" >}} + +```python +row_type = DataTypes.ROW([ + DataTypes.FIELD('name', DataTypes.STRING()), + DataTypes.FIELD('age', DataTypes.INT()), +]) +row_type_info = Types.ROW_NAMED( + ['name', 'age'], + [Types.STRING(), Types.INT()] +) + +sink = FileSink.for_bulk_format( + OUTPUT_DIR, + OrcBulkWriters.for_row_type( + row_type=row_type, + writer_properties=Configuration(), + hadoop_config=Configuration(), + ) +).build() + +# If ds is a source stream producing RowData records, add an identity map with an explicit output_type to convert them into Row records. +ds.map(lambda e: e, output_type=row_type_info).sink_to(sink) +# Otherwise +ds.sink_to(sink) +``` + ##### Hadoop SequenceFile format To use the `SequenceFile` bulk encoder in your application you need to add the following dependency: diff --git a/flink-python/pom.xml b/flink-python/pom.xml index f1a61a9eff6..5922e98a7e4 100644 --- a/flink-python/pom.xml +++ b/flink-python/pom.xml @@ -247,6 +247,14 @@ under the License.
<scope>test</scope> </dependency> + <dependency> + <!-- Indirectly accessed in pyflink_gateway_server --> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-orc</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> <!-- Indirectly accessed in pyflink_gateway_server --> <groupId>org.apache.flink</groupId> diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py index 6684a7120c0..436e8677666 100644 --- a/flink-python/pyflink/datastream/__init__.py +++ b/flink-python/pyflink/datastream/__init__.py @@ -236,6 +236,9 @@ Classes to define formats used together with source & sink: - :class:`formats.parquet.AvroParquetWriters`: Convenience builder to create ParquetWriterFactory instances for Avro types. Only GenericRecord is supported in PyFlink. + - :class:`formats.orc.OrcBulkWriters`: + Convenient builder to create a :class:`BulkWriterFactory` that writes Row records with a + defined :class:`RowType` into Orc files. Other important classes: diff --git a/flink-python/pyflink/datastream/formats/orc.py b/flink-python/pyflink/datastream/formats/orc.py new file mode 100644 index 00000000000..a34093d8fa7 --- /dev/null +++ b/flink-python/pyflink/datastream/formats/orc.py @@ -0,0 +1,100 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from typing import Optional + +from pyflink.common import Configuration +from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowDataBulkWriterFactory +from pyflink.datastream.utils import create_hadoop_configuration, create_java_properties +from pyflink.java_gateway import get_gateway +from pyflink.table.types import _to_java_data_type, RowType +from pyflink.util.java_utils import to_jarray + +__all__ = [ + 'OrcBulkWriters' +] + + +class OrcBulkWriters(object): + """ + Convenient builder to create a :class:`~connectors.file_system.BulkWriterFactory` that writes + Row records with a defined RowType into Orc files in a batch fashion. + + .. versionadded:: 1.16.0 + """ + + @staticmethod + def for_row_type(row_type: RowType, + writer_properties: Optional[Configuration] = None, + hadoop_config: Optional[Configuration] = None) \ + -> BulkWriterFactory: + """ + Create a RowDataBulkWriterFactory that writes Row records with a defined RowType into Orc + files in a batch fashion. + + Example: + :: + + >>> row_type = DataTypes.ROW([ + ... DataTypes.FIELD('string', DataTypes.STRING()), + ... DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT())) + ... ]) + >>> row_type_info = Types.ROW_NAMED( + ... ['string', 'int_array'], + ... [Types.STRING(), Types.LIST(Types.INT())] + ... ) + >>> sink = FileSink.for_bulk_format( + ... OUTPUT_DIR, OrcBulkWriters.for_row_type( + ... row_type=row_type, + ... writer_properties=Configuration(), + ... 
hadoop_config=Configuration(), + ... ) + ... ).build() + >>> ds.map(lambda e: e, output_type=row_type_info).sink_to(sink) + + Note that in the above example, an identity map to indicate its RowTypeInfo is necessary + before ``sink_to`` when ``ds`` is a source stream producing **RowData** records, + because RowDataBulkWriterFactory assumes the input record type is Row. + """ + if not isinstance(row_type, RowType): + raise TypeError('row_type must be an instance of RowType') + + j_data_type = _to_java_data_type(row_type) + jvm = get_gateway().jvm + j_row_type = j_data_type.getLogicalType() + orc_types = to_jarray( + jvm.org.apache.flink.table.types.logical.LogicalType, + [i for i in j_row_type.getChildren()] + ) + type_description = jvm.org.apache.flink.orc \ + .OrcSplitReaderUtil.logicalTypeToOrcType(j_row_type) + if writer_properties is None: + writer_properties = Configuration() + if hadoop_config is None: + hadoop_config = Configuration() + + return RowDataBulkWriterFactory( + jvm.org.apache.flink.orc.writer.OrcBulkWriterFactory( + jvm.org.apache.flink.orc.vector.RowDataVectorizer( + type_description.toString(), + orc_types + ), + create_java_properties(writer_properties), + create_hadoop_configuration(hadoop_config) + ), + row_type + ) diff --git a/flink-python/pyflink/datastream/formats/parquet.py b/flink-python/pyflink/datastream/formats/parquet.py index 687bc35aa4d..5d1164b82b8 100644 --- a/flink-python/pyflink/datastream/formats/parquet.py +++ b/flink-python/pyflink/datastream/formats/parquet.py @@ -74,8 +74,8 @@ class AvroParquetReaders(object): class AvroParquetWriters(object): """ - Convenient builder to create ParquetWriterFactory instances for Avro types. Only GenericRecord - is supported in PyFlink. + Convenient builder to create Parquet BulkWriterFactory instances for Avro types. + Only GenericRecord is supported at present. .. 
versionadded:: 1.16.0 """ diff --git a/flink-python/pyflink/datastream/formats/tests/test_orc.py b/flink-python/pyflink/datastream/formats/tests/test_orc.py new file mode 100644 index 00000000000..77ce3b085de --- /dev/null +++ b/flink-python/pyflink/datastream/formats/tests/test_orc.py @@ -0,0 +1,156 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +import glob +import os +import tempfile +import unittest +from datetime import date, datetime +from decimal import Decimal +from typing import List, Optional, Tuple + +import pandas as pd + +from pyflink.common import Row +from pyflink.common.typeinfo import RowTypeInfo, Types +from pyflink.datastream import DataStream +from pyflink.datastream.connectors.file_system import FileSink +from pyflink.datastream.formats.orc import OrcBulkWriters +from pyflink.datastream.formats.tests.test_parquet import _create_parquet_array_row_and_data, \ + _check_parquet_array_results, _create_parquet_map_row_and_data, _check_parquet_map_results +from pyflink.java_gateway import get_gateway +from pyflink.table.types import RowType, DataTypes +from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase, to_java_data_structure + + +@unittest.skipIf(os.environ.get('HADOOP_CLASSPATH') is None, + 'Some Hadoop lib is needed for Orc format tests') +class FileSinkOrcBulkWritersTests(PyFlinkStreamingTestCase): + + def setUp(self): + super().setUp() + self.env.set_parallelism(1) + self.orc_dir_name = tempfile.mkdtemp(dir=self.tempdir) + + def test_orc_basic_write(self): + row_type, row_type_info, data = _create_orc_basic_row_and_data() + self._build_orc_job(row_type, row_type_info, data) + self.env.execute('test_orc_basic_write') + results = self._read_orc_file() + _check_orc_basic_results(self, results) + + def test_orc_array_write(self): + ( + row_type, + row_type_info, + conversion_row_type_info, + data, + ) = _create_parquet_array_row_and_data() + self._build_orc_job(row_type, row_type_info, data, conversion_row_type_info) + self.env.execute() + results = self._read_orc_file() + _check_parquet_array_results(self, results) + + def test_orc_map_write(self): + row_type, row_type_info, data = _create_parquet_map_row_and_data() + self._build_orc_job(row_type, row_type_info, data) + self.env.execute() + 
results = self._read_orc_file() + _check_parquet_map_results(self, results) + + def _build_orc_job( + self, + row_type: RowType, + row_type_info: RowTypeInfo, + data: List[Row], + conversion_type_info: Optional[RowTypeInfo] = None, + ): + jvm = get_gateway().jvm + sink = FileSink.for_bulk_format( + self.orc_dir_name, OrcBulkWriters.for_row_type(row_type) + ).build() + j_list = jvm.java.util.ArrayList() + for d in data: + j_list.add(to_java_data_structure(d)) + ds = DataStream(self.env._j_stream_execution_environment.fromCollection( + j_list, + row_type_info.get_java_type_info() + )) + if conversion_type_info: + ds = ds.map(lambda e: e, output_type=conversion_type_info) + ds.sink_to(sink) + + def _read_orc_file(self): + records = [] + for file in glob.glob(os.path.join(os.path.join(self.orc_dir_name, '**/*'))): + df = pd.read_orc(file) + for i in range(df.shape[0]): + records.append(df.loc[i]) + return records + + +def _create_orc_basic_row_and_data() -> Tuple[RowType, RowTypeInfo, List[Row]]: + jvm = get_gateway().jvm + row_type = DataTypes.ROW([ + DataTypes.FIELD('char', DataTypes.CHAR(10)), + DataTypes.FIELD('varchar', DataTypes.VARCHAR(10)), + DataTypes.FIELD('bytes', DataTypes.BYTES()), + DataTypes.FIELD('boolean', DataTypes.BOOLEAN()), + DataTypes.FIELD('decimal', DataTypes.DECIMAL(2, 0)), + DataTypes.FIELD('int', DataTypes.INT()), + DataTypes.FIELD('bigint', DataTypes.BIGINT()), + DataTypes.FIELD('double', DataTypes.DOUBLE()), + DataTypes.FIELD('date', DataTypes.DATE()), + DataTypes.FIELD('timestamp', DataTypes.TIMESTAMP(3)), + ]) + row_type_info = Types.ROW_NAMED( + ['char', 'varchar', 'bytes', 'boolean', 'decimal', 'int', 'bigint', 'double', + 'date', 'timestamp'], + [Types.STRING(), Types.STRING(), Types.PRIMITIVE_ARRAY(Types.BYTE()), Types.BOOLEAN(), + Types.BIG_DEC(), Types.INT(), Types.LONG(), Types.DOUBLE(), + Types.JAVA(jvm.java.time.LocalTime), Types.JAVA(jvm.java.time.LocalDateTime)] + ) + data = [Row( + char='char', + varchar='varchar', + 
bytes=b'varbinary', + boolean=True, + decimal=Decimal(1.5), + int=2147483647, + bigint=-9223372036854775808, + double=2e-308, + date=date(1970, 1, 1), + timestamp=datetime(1970, 1, 2, 3, 4, 5, 600000), + )] + return row_type, row_type_info, data + + +def _check_orc_basic_results(test, results): + row = results[0] + test.assertEqual(row['char'], b'char ') + test.assertEqual(row['varchar'], 'varchar') + test.assertEqual(row['bytes'], b'varbinary') + test.assertEqual(row['boolean'], True) + test.assertAlmostEqual(row['decimal'], 2) + test.assertEqual(row['int'], 2147483647) + test.assertEqual(row['bigint'], -9223372036854775808) + test.assertAlmostEqual(row['double'], 2e-308, delta=1e-311) + test.assertEqual(row['date'], date(1970, 1, 1)) + test.assertEqual( + row['timestamp'].to_pydatetime(), + datetime(1970, 1, 2, 3, 4, 5, 600000), + ) diff --git a/flink-python/pyflink/datastream/utils.py b/flink-python/pyflink/datastream/utils.py index f1bdb186177..efbdd1b5da1 100644 --- a/flink-python/pyflink/datastream/utils.py +++ b/flink-python/pyflink/datastream/utils.py @@ -51,6 +51,14 @@ def create_hadoop_configuration(config: Configuration): return hadoop_config +def create_java_properties(config: Configuration): + jvm = get_gateway().jvm + properties = jvm.java.util.Properties() + for k, v in config.to_dict().items(): + properties.put(k, v) + return properties + + def convert_to_python_obj(data, type_info): if type_info == Types.PICKLED_BYTE_ARRAY(): return pickle.loads(data)