This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cf3beff586a [FLINK-28876][python][format/orc] Support Orc format
cf3beff586a is described below

commit cf3beff586a088c20833d8a8fc72bbc37d87a2b1
Author: Juntao Hu <maybach...@gmail.com>
AuthorDate: Mon Aug 8 23:23:44 2022 +0800

    [FLINK-28876][python][format/orc] Support Orc format
    
    This closes #20505.
---
 .../docs/connectors/datastream/filesystem.md       |  30 ++++
 .../docs/connectors/datastream/filesystem.md       |  30 ++++
 flink-python/pom.xml                               |   8 ++
 flink-python/pyflink/datastream/__init__.py        |   3 +
 flink-python/pyflink/datastream/formats/orc.py     | 100 +++++++++++++
 flink-python/pyflink/datastream/formats/parquet.py |   4 +-
 .../pyflink/datastream/formats/tests/test_orc.py   | 156 +++++++++++++++++++++
 flink-python/pyflink/datastream/utils.py           |   8 ++
 8 files changed, 337 insertions(+), 2 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md b/docs/content.zh/docs/connectors/datastream/filesystem.md
index f5468862a53..d48e84f52d8 100644
--- a/docs/content.zh/docs/connectors/datastream/filesystem.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -811,6 +811,36 @@ class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
 {{< /tab >}}
 {{< /tabs >}}
 
+PyFlink users can use `OrcBulkWriters.for_row_type` to create a `BulkWriterFactory` that writes `Row` records into Orc files.
+Note that if the operator preceding the sink outputs `RowData` records, e.g. a CSV source, the records need to be converted to `Row` first.
+
+{{< py_download_link "orc" >}}
+
+```python
+row_type = DataTypes.ROW([
+    DataTypes.FIELD('name', DataTypes.STRING()),
+    DataTypes.FIELD('age', DataTypes.INT()),
+])
+row_type_info = Types.ROW_NAMED(
+    ['name', 'age'],
+    [Types.STRING(), Types.INT()]
+)
+
+sink = FileSink.for_bulk_format(
+    OUTPUT_DIR,
+    OrcBulkWriters.for_row_type(
+        row_type=row_type,
+        writer_properties=Configuration(),
+        hadoop_config=Configuration(),
+    )
+).build()
+
+# If ds is a source producing RowData records, a map function can be used to specify the corresponding Row type.
+ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
+# Otherwise
+ds.sink_to(sink)
+```
+
 <a name="hadoop-sequencefile-format"></a>
 
 ##### Hadoop SequenceFile Format
diff --git a/docs/content/docs/connectors/datastream/filesystem.md b/docs/content/docs/connectors/datastream/filesystem.md
index f5a4517aa95..41eb054015e 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -816,6 +816,36 @@ class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
 {{< /tab >}}
 {{< /tabs >}}
 
+For PyFlink users, `OrcBulkWriters.for_row_type` can be used to create a `BulkWriterFactory` that writes `Row` records to files in Orc format.
+Note that if the operator preceding the sink produces `RowData` records, e.g. a CSV source, the records need to be converted to `Row` before being written to the sink.
+
+{{< py_download_link "orc" >}}
+
+```python
+row_type = DataTypes.ROW([
+    DataTypes.FIELD('name', DataTypes.STRING()),
+    DataTypes.FIELD('age', DataTypes.INT()),
+])
+row_type_info = Types.ROW_NAMED(
+    ['name', 'age'],
+    [Types.STRING(), Types.INT()]
+)
+
+sink = FileSink.for_bulk_format(
+    OUTPUT_DIR,
+    OrcBulkWriters.for_row_type(
+        row_type=row_type,
+        writer_properties=Configuration(),
+        hadoop_config=Configuration(),
+    )
+).build()
+
+# If ds is a source stream producing RowData records, a map can be added to help convert RowData records into Row records.
+ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
+# Otherwise
+ds.sink_to(sink)
+```
+
 ##### Hadoop SequenceFile format
 
 To use the `SequenceFile` bulk encoder in your application you need to add the following dependency:
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index f1a61a9eff6..5922e98a7e4 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -247,6 +247,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <!-- Indirectly accessed in pyflink_gateway_server -->
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-sql-orc</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
                <dependency>
                        <!-- Indirectly accessed in pyflink_gateway_server -->
                        <groupId>org.apache.flink</groupId>
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index 6684a7120c0..436e8677666 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -236,6 +236,9 @@ Classes to define formats used together with source & sink:
     - :class:`formats.parquet.AvroParquetWriters`:
      Convenience builder to create ParquetWriterFactory instances for Avro types. Only
       GenericRecord is supported in PyFlink.
+    - :class:`formats.orc.OrcBulkWriters`:
+      Convenient builder to create a :class:`BulkWriterFactory` that writes 
Row records with a
+      defined :class:`RowType` into Orc files.
 
 Other important classes:
 
diff --git a/flink-python/pyflink/datastream/formats/orc.py b/flink-python/pyflink/datastream/formats/orc.py
new file mode 100644
index 00000000000..a34093d8fa7
--- /dev/null
+++ b/flink-python/pyflink/datastream/formats/orc.py
@@ -0,0 +1,100 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Optional
+
+from pyflink.common import Configuration
+from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowDataBulkWriterFactory
+from pyflink.datastream.utils import create_hadoop_configuration, create_java_properties
+from pyflink.java_gateway import get_gateway
+from pyflink.table.types import _to_java_data_type, RowType
+from pyflink.util.java_utils import to_jarray
+
+__all__ = [
+    'OrcBulkWriters'
+]
+
+
+class OrcBulkWriters(object):
+    """
+    Convenient builder to create a :class:`~connectors.file_system.BulkWriterFactory` that writes
+    Row records with a defined RowType into Orc files in a batch fashion.
+
+    .. versionadded:: 1.16.0
+    """
+
+    @staticmethod
+    def for_row_type(row_type: RowType,
+                     writer_properties: Optional[Configuration] = None,
+                     hadoop_config: Optional[Configuration] = None) \
+            -> BulkWriterFactory:
+        """
+        Create a RowDataBulkWriterFactory that writes Row records with a defined RowType into Orc
+        files in a batch fashion.
+
+        Example:
+        ::
+
+            >>> row_type = DataTypes.ROW([
+            ...     DataTypes.FIELD('string', DataTypes.STRING()),
+            ...     DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
+            ... ])
+            >>> row_type_info = Types.ROW_NAMED(
+            ...     ['string', 'int_array'],
+            ...     [Types.STRING(), Types.LIST(Types.INT())]
+            ... )
+            >>> sink = FileSink.for_bulk_format(
+            ...     OUTPUT_DIR, OrcBulkWriters.for_row_type(
+            ...         row_type=row_type,
+            ...         writer_properties=Configuration(),
+            ...         hadoop_config=Configuration(),
+            ...     )
+            ... ).build()
+            >>> ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
+
+        Note that in the above example, an identity map to indicate its RowTypeInfo is necessary
+        before ``sink_to`` when ``ds`` is a source stream producing **RowData** records,
+        because RowDataBulkWriterFactory assumes the input record type is Row.
+        """
+        if not isinstance(row_type, RowType):
+            raise TypeError('row_type must be an instance of RowType')
+
+        j_data_type = _to_java_data_type(row_type)
+        jvm = get_gateway().jvm
+        j_row_type = j_data_type.getLogicalType()
+        orc_types = to_jarray(
+            jvm.org.apache.flink.table.types.logical.LogicalType,
+            [i for i in j_row_type.getChildren()]
+        )
+        type_description = jvm.org.apache.flink.orc \
+            .OrcSplitReaderUtil.logicalTypeToOrcType(j_row_type)
+        if writer_properties is None:
+            writer_properties = Configuration()
+        if hadoop_config is None:
+            hadoop_config = Configuration()
+
+        return RowDataBulkWriterFactory(
+            jvm.org.apache.flink.orc.writer.OrcBulkWriterFactory(
+                jvm.org.apache.flink.orc.vector.RowDataVectorizer(
+                    type_description.toString(),
+                    orc_types
+                ),
+                create_java_properties(writer_properties),
+                create_hadoop_configuration(hadoop_config)
+            ),
+            row_type
+        )
diff --git a/flink-python/pyflink/datastream/formats/parquet.py b/flink-python/pyflink/datastream/formats/parquet.py
index 687bc35aa4d..5d1164b82b8 100644
--- a/flink-python/pyflink/datastream/formats/parquet.py
+++ b/flink-python/pyflink/datastream/formats/parquet.py
@@ -74,8 +74,8 @@ class AvroParquetReaders(object):
 
 class AvroParquetWriters(object):
     """
-    Convenient builder to create ParquetWriterFactory instances for Avro types. Only GenericRecord
-    is supported in PyFlink.
+    Convenient builder to create Parquet BulkWriterFactory instances for Avro types.
+    Only GenericRecord is supported at present.
 
     .. versionadded:: 1.16.0
     """
diff --git a/flink-python/pyflink/datastream/formats/tests/test_orc.py b/flink-python/pyflink/datastream/formats/tests/test_orc.py
new file mode 100644
index 00000000000..77ce3b085de
--- /dev/null
+++ b/flink-python/pyflink/datastream/formats/tests/test_orc.py
@@ -0,0 +1,156 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import glob
+import os
+import tempfile
+import unittest
+from datetime import date, datetime
+from decimal import Decimal
+from typing import List, Optional, Tuple
+
+import pandas as pd
+
+from pyflink.common import Row
+from pyflink.common.typeinfo import RowTypeInfo, Types
+from pyflink.datastream import DataStream
+from pyflink.datastream.connectors.file_system import FileSink
+from pyflink.datastream.formats.orc import OrcBulkWriters
+from pyflink.datastream.formats.tests.test_parquet import _create_parquet_array_row_and_data, \
+    _check_parquet_array_results, _create_parquet_map_row_and_data, _check_parquet_map_results
+from pyflink.java_gateway import get_gateway
+from pyflink.table.types import RowType, DataTypes
+from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase, to_java_data_structure
+
+
+@unittest.skipIf(os.environ.get('HADOOP_CLASSPATH') is None,
+                 'Some Hadoop lib is needed for Orc format tests')
+class FileSinkOrcBulkWritersTests(PyFlinkStreamingTestCase):
+
+    def setUp(self):
+        super().setUp()
+        self.env.set_parallelism(1)
+        self.orc_dir_name = tempfile.mkdtemp(dir=self.tempdir)
+
+    def test_orc_basic_write(self):
+        row_type, row_type_info, data = _create_orc_basic_row_and_data()
+        self._build_orc_job(row_type, row_type_info, data)
+        self.env.execute('test_orc_basic_write')
+        results = self._read_orc_file()
+        _check_orc_basic_results(self, results)
+
+    def test_orc_array_write(self):
+        (
+            row_type,
+            row_type_info,
+            conversion_row_type_info,
+            data,
+        ) = _create_parquet_array_row_and_data()
+        self._build_orc_job(row_type, row_type_info, data, conversion_row_type_info)
+        self.env.execute()
+        results = self._read_orc_file()
+        _check_parquet_array_results(self, results)
+
+    def test_orc_map_write(self):
+        row_type, row_type_info, data = _create_parquet_map_row_and_data()
+        self._build_orc_job(row_type, row_type_info, data)
+        self.env.execute()
+        results = self._read_orc_file()
+        _check_parquet_map_results(self, results)
+
+    def _build_orc_job(
+        self,
+        row_type: RowType,
+        row_type_info: RowTypeInfo,
+        data: List[Row],
+        conversion_type_info: Optional[RowTypeInfo] = None,
+    ):
+        jvm = get_gateway().jvm
+        sink = FileSink.for_bulk_format(
+            self.orc_dir_name, OrcBulkWriters.for_row_type(row_type)
+        ).build()
+        j_list = jvm.java.util.ArrayList()
+        for d in data:
+            j_list.add(to_java_data_structure(d))
+        ds = DataStream(self.env._j_stream_execution_environment.fromCollection(
+            j_list,
+            row_type_info.get_java_type_info()
+        ))
+        if conversion_type_info:
+            ds = ds.map(lambda e: e, output_type=conversion_type_info)
+        ds.sink_to(sink)
+
+    def _read_orc_file(self):
+        records = []
+        for file in glob.glob(os.path.join(os.path.join(self.orc_dir_name, '**/*'))):
+            df = pd.read_orc(file)
+            for i in range(df.shape[0]):
+                records.append(df.loc[i])
+        return records
+
+
+def _create_orc_basic_row_and_data() -> Tuple[RowType, RowTypeInfo, List[Row]]:
+    jvm = get_gateway().jvm
+    row_type = DataTypes.ROW([
+        DataTypes.FIELD('char', DataTypes.CHAR(10)),
+        DataTypes.FIELD('varchar', DataTypes.VARCHAR(10)),
+        DataTypes.FIELD('bytes', DataTypes.BYTES()),
+        DataTypes.FIELD('boolean', DataTypes.BOOLEAN()),
+        DataTypes.FIELD('decimal', DataTypes.DECIMAL(2, 0)),
+        DataTypes.FIELD('int', DataTypes.INT()),
+        DataTypes.FIELD('bigint', DataTypes.BIGINT()),
+        DataTypes.FIELD('double', DataTypes.DOUBLE()),
+        DataTypes.FIELD('date', DataTypes.DATE()),
+        DataTypes.FIELD('timestamp', DataTypes.TIMESTAMP(3)),
+    ])
+    row_type_info = Types.ROW_NAMED(
+        ['char', 'varchar', 'bytes', 'boolean', 'decimal', 'int', 'bigint', 'double',
+         'date', 'timestamp'],
+        [Types.STRING(), Types.STRING(), Types.PRIMITIVE_ARRAY(Types.BYTE()), Types.BOOLEAN(),
+         Types.BIG_DEC(), Types.INT(), Types.LONG(), Types.DOUBLE(),
+         Types.JAVA(jvm.java.time.LocalDate), Types.JAVA(jvm.java.time.LocalDateTime)]
+    )
+    data = [Row(
+        char='char',
+        varchar='varchar',
+        bytes=b'varbinary',
+        boolean=True,
+        decimal=Decimal(1.5),
+        int=2147483647,
+        bigint=-9223372036854775808,
+        double=2e-308,
+        date=date(1970, 1, 1),
+        timestamp=datetime(1970, 1, 2, 3, 4, 5, 600000),
+    )]
+    return row_type, row_type_info, data
+
+
+def _check_orc_basic_results(test, results):
+    row = results[0]
+    test.assertEqual(row['char'], b'char      ')
+    test.assertEqual(row['varchar'], 'varchar')
+    test.assertEqual(row['bytes'], b'varbinary')
+    test.assertEqual(row['boolean'], True)
+    test.assertAlmostEqual(row['decimal'], 2)
+    test.assertEqual(row['int'], 2147483647)
+    test.assertEqual(row['bigint'], -9223372036854775808)
+    test.assertAlmostEqual(row['double'], 2e-308, delta=1e-311)
+    test.assertEqual(row['date'], date(1970, 1, 1))
+    test.assertEqual(
+        row['timestamp'].to_pydatetime(),
+        datetime(1970, 1, 2, 3, 4, 5, 600000),
+    )
diff --git a/flink-python/pyflink/datastream/utils.py b/flink-python/pyflink/datastream/utils.py
index f1bdb186177..efbdd1b5da1 100644
--- a/flink-python/pyflink/datastream/utils.py
+++ b/flink-python/pyflink/datastream/utils.py
@@ -51,6 +51,14 @@ def create_hadoop_configuration(config: Configuration):
     return hadoop_config
 
 
+def create_java_properties(config: Configuration):
+    jvm = get_gateway().jvm
+    properties = jvm.java.util.Properties()
+    for k, v in config.to_dict().items():
+        properties.put(k, v)
+    return properties
+
+
 def convert_to_python_obj(data, type_info):
     if type_info == Types.PICKLED_BYTE_ARRAY():
         return pickle.loads(data)
