This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new e16fa9f  [FLINK-12439][python] Add FileSystem Connector with CSV format support in Python Table API
e16fa9f is described below

commit e16fa9fe1506ec725f5c5abb2b24afa246e17dae
Author: Wei Zhong <weizhong0...@gmail.com>
AuthorDate: Tue May 21 16:25:16 2019 +0800

    [FLINK-12439][python] Add FileSystem Connector with CSV format support in Python Table API

    Brief change log:
     - Add all of the existing descriptor interfaces, aligned with the Java Table API.
     - Add FileSystem connector and OldCsv format support.
     - The `schema(..)` of OldCsv will be added in FLINK-12588.

    This closes #8488
---
 flink-python/pyflink/java_gateway.py               |   1 +
 flink-python/pyflink/table/__init__.py             |   5 +
 flink-python/pyflink/table/table_descriptor.py     | 510 +++++++++++++++++
 flink-python/pyflink/table/table_environment.py    |  91 +++
 .../pyflink/table/tests/test_descriptor.py         | 618 +++++++++++++++++++++
 .../table/tests/test_environment_completeness.py   |   2 +-
 6 files changed, 1226 insertions(+), 1 deletion(-)

diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py
index ed1dc89..e5c8330 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -101,6 +101,7 @@ def launch_gateway():
     java_import(gateway.jvm, "org.apache.flink.table.api.*")
     java_import(gateway.jvm, "org.apache.flink.table.api.java.*")
     java_import(gateway.jvm, "org.apache.flink.table.api.dataview.*")
+    java_import(gateway.jvm, "org.apache.flink.table.descriptors.*")
     java_import(gateway.jvm, "org.apache.flink.table.sources.*")
     java_import(gateway.jvm, "org.apache.flink.table.sinks.*")
     java_import(gateway.jvm, "org.apache.flink.api.common.typeinfo.TypeInformation")
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index dcbc0ab..4ea3b3f 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -40,6 +40,7 @@ from pyflink.table.table_sink import TableSink, CsvTableSink
 from pyflink.table.table_source import TableSource, CsvTableSource
 from pyflink.table.types import DataTypes
 from pyflink.table.window import Tumble, Session, Slide, Over
+from pyflink.table.table_descriptor import Rowtime, Schema, OldCsv, FileSystem

 __all__ = [
     'TableEnvironment',
@@ -56,4 +57,8 @@ __all__ = [
     'Session',
     'Slide',
     'Over',
+    'Rowtime',
+    'Schema',
+    'OldCsv',
+    'FileSystem',
 ]
diff --git a/flink-python/pyflink/table/table_descriptor.py b/flink-python/pyflink/table/table_descriptor.py
new file mode 100644
index 0000000..ed9a157
--- /dev/null
+++ b/flink-python/pyflink/table/table_descriptor.py
@@ -0,0 +1,510 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from abc import ABCMeta
+
+from py4j.java_gateway import get_method
+
+from pyflink.java_gateway import get_gateway
+from pyflink.util.type_utils import to_java_type
+
+if sys.version >= '3':
+    unicode = str
+
+__all__ = [
+    'Rowtime',
+    'Schema',
+    'OldCsv',
+    'FileSystem'
+]
+
+
+class Descriptor(object):
+    """
+    Base class of the descriptors that adds a set of string-based, normalized properties for
+    describing DDL information.
+
+    Typical characteristics of a descriptor are:
+    - descriptors have a default constructor
+    - descriptors themselves contain very little logic
+    - corresponding validators validate the correctness (goal: have a single point of validation)
+
+    A descriptor is similar to a builder in a builder pattern, thus, mutable for building
+    properties.
+    """
+
+    __metaclass__ = ABCMeta
+
+    def __init__(self, j_descriptor):
+        self._j_descriptor = j_descriptor
+
+    def to_properties(self):
+        """
+        Converts this descriptor into a dict of properties.
+
+        :return: A dict containing all of the current properties.
+        """
+        return dict(self._j_descriptor.toProperties())
+
+
+class Rowtime(Descriptor):
+    """
+    Rowtime descriptor for describing an event time attribute in the schema.
+    """
+
+    def __init__(self):
+        gateway = get_gateway()
+        self._j_rowtime = gateway.jvm.Rowtime()
+        super(Rowtime, self).__init__(self._j_rowtime)
+
+    def timestamps_from_field(self, field_name):
+        """
+        Sets a built-in timestamp extractor that converts an existing LONG or TIMESTAMP field
+        into the rowtime attribute.
+
+        :param field_name: The field to convert into a rowtime attribute.
+        :return: This rowtime descriptor.
+        """
+        self._j_rowtime = self._j_rowtime.timestampsFromField(field_name)
+        return self
+
+    def timestamps_from_source(self):
+        """
+        Sets a built-in timestamp extractor that converts the assigned timestamps from a
+        DataStream API record into the rowtime attribute and thus preserves the assigned
+        timestamps from the source.
+
+        .. note::
+            This extractor only works in streaming environments.
+
+        :return: This rowtime descriptor.
+        """
+        self._j_rowtime = self._j_rowtime.timestampsFromSource()
+        return self
+
+    def timestamps_from_extractor(self, extractor):
+        """
+        Sets a custom timestamp extractor to be used for the rowtime attribute.
+
+        :param extractor: The Java canonical class name of the TimestampExtractor used to
+                          extract the rowtime attribute from the physical type. The
+                          TimestampExtractor must have a public no-argument constructor and
+                          must be loadable from the current Java classloader.
+        :return: This rowtime descriptor.
+        """
+        gateway = get_gateway()
+        self._j_rowtime = self._j_rowtime.timestampsFromExtractor(
+            gateway.jvm.Thread.currentThread().getContextClassLoader().loadClass(extractor)
+            .newInstance())
+        return self
+
+    def watermarks_periodic_ascending(self):
+        """
+        Sets a built-in watermark strategy for ascending rowtime attributes.
+
+        Emits a watermark of the maximum observed timestamp so far minus 1. Rows that have a
+        timestamp equal to the max timestamp are not late.
+
+        :return: This rowtime descriptor.
+        """
+        self._j_rowtime = self._j_rowtime.watermarksPeriodicAscending()
+        return self
+
+    def watermarks_periodic_bounded(self, delay):
+        """
+        Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a
+        bounded time interval.
+
+        Emits watermarks which are the maximum observed timestamp minus the specified delay.
+
+        :param delay: Delay in milliseconds.
+        :return: This rowtime descriptor.
+        """
+        self._j_rowtime = self._j_rowtime.watermarksPeriodicBounded(delay)
+        return self
+
+    def watermarks_from_source(self):
+        """
+        Sets a built-in watermark strategy which indicates the watermarks should be preserved
+        from the underlying DataStream API and thus preserves the assigned watermarks from the
+        source.
+
+        :return: This rowtime descriptor.
+        """
+        self._j_rowtime = self._j_rowtime.watermarksFromSource()
+        return self
+
+    def watermarks_from_strategy(self, strategy):
+        """
+        Sets a custom watermark strategy to be used for the rowtime attribute.
+
+        :param strategy: The Java canonical class name of the WatermarkStrategy. The
+                         WatermarkStrategy must have a public no-argument constructor and
+                         must be loadable from the current Java classloader.
+        :return: This rowtime descriptor.
+        """
+        gateway = get_gateway()
+        self._j_rowtime = self._j_rowtime.watermarksFromStrategy(
+            gateway.jvm.Thread.currentThread().getContextClassLoader().loadClass(strategy)
+            .newInstance())
+        return self
+
+
+class Schema(Descriptor):
+    """
+    Describes a schema of a table.
+
+    .. note::
+        Field names are matched by the exact name by default (case sensitive).
+    """
+
+    def __init__(self):
+        gateway = get_gateway()
+        self._j_schema = gateway.jvm.Schema()
+        super(Schema, self).__init__(self._j_schema)
+
+    def field(self, field_name, field_type):
+        """
+        Adds a field with the field name and the data type or type string. Required.
+        This method can be called multiple times. The call order of this method also defines
+        the order of the fields in a row. The type strings are documented at:
+        https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#type-strings
+
+        :param field_name: The field name.
+        :param field_type: The data type or type string of the field.
+        :return: This schema object.
+        """
+        if isinstance(field_type, (str, unicode)):
+            self._j_schema = self._j_schema.field(field_name, field_type)
+        else:
+            self._j_schema = self._j_schema.field(field_name, to_java_type(field_type))
+        return self
+
+    def from_origin_field(self, origin_field_name):
+        """
+        Specifies the origin of the previously defined field. The origin field is defined by a
+        connector or format.
+
+        E.g. field("myString", Types.STRING).from_origin_field("CSV_MY_STRING")
+
+        .. note::
+            Field names are matched by the exact name by default (case sensitive).
+
+        :param origin_field_name: The origin field name.
+        :return: This schema object.
+        """
+        self._j_schema = get_method(self._j_schema, "from")(origin_field_name)
+        return self
+
+    def proctime(self):
+        """
+        Specifies the previously defined field as a processing-time attribute.
+
+        E.g. field("proctime", Types.SQL_TIMESTAMP).proctime()
+
+        :return: This schema object.
+        """
+        self._j_schema = self._j_schema.proctime()
+        return self
+
+    def rowtime(self, rowtime):
+        """
+        Specifies the previously defined field as an event-time attribute.
+
+        E.g. field("rowtime", Types.SQL_TIMESTAMP).rowtime(...)
+
+        :param rowtime: A :class:`Rowtime` descriptor.
+        :return: This schema object.
+        """
+        self._j_schema = self._j_schema.rowtime(rowtime._j_rowtime)
+        return self
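For orientation, a minimal sketch of how :class:`Rowtime` and :class:`Schema` compose (illustrative only, not part of the patch; field names are hypothetical):

    >>> schema = Schema()\
    ...     .field("id", DataTypes.LONG)\
    ...     .field("rtime", DataTypes.LONG)\
    ...     .rowtime(Rowtime()
    ...              .timestamps_from_field("rtime")
    ...              .watermarks_periodic_bounded(60000))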
+ """ + + __metaclass__ = ABCMeta + + def __init__(self, j_format_descriptor): + self._j_format_descriptor = j_format_descriptor + super(FormatDescriptor, self).__init__(self._j_format_descriptor) + + +class OldCsv(FormatDescriptor): + """ + Format descriptor for comma-separated values (CSV). + + ..note:: + This descriptor describes Flink's non-standard CSV table source/sink. In the future, the + descriptor will be replaced by a proper RFC-compliant version. Use the RFC-compliant `Csv` + format in the dedicated `flink-formats/flink-csv` module instead when writing to Kafka. Use + the old one for stream/batch filesystem operations for now. + + .. note:: + Deprecated: use the RFC-compliant `Csv` format instead when writing to Kafka. + """ + + def __init__(self): + gateway = get_gateway() + self._j_csv = gateway.jvm.OldCsv() + super(OldCsv, self).__init__(self._j_csv) + + def field_delimiter(self, delimiter): + """ + Sets the field delimiter, "," by default. + + :param delimiter: The field delimiter. + :return: This :class:`OldCsv` object. + """ + self._j_csv = self._j_csv.fieldDelimiter(delimiter) + return self + + def line_delimiter(self, delimiter): + """ + Sets the line delimiter, "\n" by default. + + :param delimiter: The line delimiter. + :return: This :class:`OldCsv` object. + """ + self._j_csv = self._j_csv.lineDelimiter(delimiter) + return self + + def field(self, field_name, field_type): + """ + Adds a format field with the field name and the data type or type string. Required. + This method can be called multiple times. The call order of this method defines + also the order of the fields in the format. + + :param field_name: The field name. + :param field_type: The data type or type string of the field. + :return: This :class:`OldCsv` object. + """ + if isinstance(field_type, (str, unicode)): + self._j_csv = self._j_csv.field(field_name, field_type) + else: + self._j_csv = self._j_csv.field(field_name, to_java_type(field_type)) + return self + + def quote_character(self, quote_character): + """ + Sets a quote character for String values, null by default. + + :param quote_character: The quote character. + :return: This :class:`OldCsv` object. + """ + self._j_csv = self._j_csv.quoteCharacter(quote_character) + return self + + def comment_prefix(self, prefix): + """ + Sets a prefix to indicate comments, null by default. + + :param prefix: The prefix to indicate comments. + :return: This :class:`OldCsv` object. + """ + self._j_csv = self._j_csv.commentPrefix(prefix) + return self + + def ignore_parse_errors(self): + """ + Skip records with parse error instead to fail. Throw an exception by default. + + :return: This :class:`OldCsv` object. + """ + self._j_csv = self._j_csv.ignoreParseErrors() + return self + + def ignore_first_line(self): + """ + Ignore the first line. Not skip the first line by default. + + :return: This :class:`OldCsv` object. + """ + self._j_csv = self._j_csv.ignoreFirstLine() + return self + + +class ConnectorDescriptor(Descriptor): + """ + Describes a connector to an other system. + """ + + __metaclass__ = ABCMeta + + def __init__(self, j_connector_descriptor): + self._j_connector_descriptor = j_connector_descriptor + super(ConnectorDescriptor, self).__init__(self._j_connector_descriptor) + + +class FileSystem(ConnectorDescriptor): + """ + Connector descriptor for a file system. 
+ """ + + def __init__(self): + gateway = get_gateway() + self._j_file_system = gateway.jvm.FileSystem() + super(FileSystem, self).__init__(self._j_file_system) + + def path(self, path_str): + """ + Sets the path to a file or directory in a file system. + + :param path_str: The path of a file or directory. + :return: This :class:`FileSystem` object. + """ + self._j_file_system = self._j_file_system.path(path_str) + return self + + +class ConnectTableDescriptor(Descriptor): + """ + Common class for table's created with :class:`pyflink.table.TableEnvironment.connect`. + """ + + __metaclass__ = ABCMeta + + def __init__(self, j_table_descriptor): + self._j_table_descriptor = j_table_descriptor + super(ConnectTableDescriptor, self).__init__(self._j_table_descriptor) + + def with_format(self, format_descriptor): + """ + Specifies the format that defines how to read data from a connector. + + :type format_descriptor: The :class:`FormatDescriptor` for the resulting table, + e.g. :class:`OldCsv`. + :return: This object. + """ + self._j_table_descriptor = \ + self._j_table_descriptor.withFormat(format_descriptor._j_format_descriptor) + return self + + def with_schema(self, schema): + """ + Specifies the resulting table schema. + + :type schema: The :class:`Schema` object for the resulting table. + :return: This object. + """ + self._j_table_descriptor = self._j_table_descriptor.withSchema(schema._j_schema) + return self + + def register_table_sink(self, name): + """ + Searches for the specified table sink, configures it accordingly, and registers it as + a table under the given name. + + :param name: Table name to be registered in the table environment. + :return: This object. + """ + self._j_table_descriptor = self._j_table_descriptor.registerTableSink(name) + return self + + def register_table_source(self, name): + """ + Searches for the specified table source, configures it accordingly, and registers it as + a table under the given name. + + :param name: Table name to be registered in the table environment. + :return: This object. + """ + self._j_table_descriptor = self._j_table_descriptor.registerTableSource(name) + return self + + def register_table_source_and_sink(self, name): + """ + Searches for the specified table source and sink, configures them accordingly, and + registers them as a table under the given name. + + :param name: Table name to be registered in the table environment. + :return: This object. + """ + self._j_table_descriptor = self._j_table_descriptor.registerTableSourceAndSink(name) + return self + + +class StreamTableDescriptor(ConnectTableDescriptor): + """ + Descriptor for specifying a table source and/or sink in a streaming environment. + """ + + def __init__(self, j_stream_table_descriptor): + self._j_stream_table_descriptor = j_stream_table_descriptor + super(StreamTableDescriptor, self).__init__(self._j_stream_table_descriptor) + + def in_append_mode(self): + """ + Declares how to perform the conversion between a dynamic table and an external connector. + + In append mode, a dynamic table and an external connector only exchange INSERT messages. + + :return: This object. + """ + self._j_stream_table_descriptor = self._j_stream_table_descriptor.inAppendMode() + return self + + def in_retract_mode(self): + """ + Declares how to perform the conversion between a dynamic table and an external connector. + + In retract mode, a dynamic table and an external connector exchange ADD and RETRACT + messages. 
+
+        An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message,
+        and an UPDATE change as a RETRACT message for the updated (previous) row and an ADD
+        message for the updating (new) row.
+
+        Unlike in upsert mode, a key must not be defined in this mode. However, every update
+        consists of two messages, which is less efficient.
+
+        :return: This object.
+        """
+        self._j_stream_table_descriptor = self._j_stream_table_descriptor.inRetractMode()
+        return self
+
+    def in_upsert_mode(self):
+        """
+        Declares how to perform the conversion between a dynamic table and an external
+        connector.
+
+        In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE
+        messages.
+
+        This mode requires a (possibly composite) unique key by which updates can be
+        propagated. The external connector needs to be aware of the unique key attribute in
+        order to apply messages correctly. INSERT and UPDATE changes are encoded as UPSERT
+        messages; DELETE changes are encoded as DELETE messages.
+
+        The main difference compared to a retract stream is that UPDATE changes are encoded
+        with a single message and are therefore more efficient.
+
+        :return: This object.
+        """
+        self._j_stream_table_descriptor = self._j_stream_table_descriptor.inUpsertMode()
+        return self
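Putting the descriptors above together, a minimal sketch of registering a CSV-backed source, mirroring the tests in this patch (assumes an existing table environment `t_env`; the path is a placeholder):

    >>> t_env.connect(FileSystem().path("/tmp/input.csv"))\
    ...     .with_format(OldCsv()
    ...                  .field_delimiter(',')
    ...                  .field("a", DataTypes.INT)
    ...                  .field("b", DataTypes.STRING))\
    ...     .with_schema(Schema()
    ...                  .field("a", DataTypes.INT)
    ...                  .field("b", DataTypes.STRING))\
    ...     .register_table_source("MyCsvTable")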
+ """ + pass + @classmethod def create(cls, table_config): """ @@ -261,6 +292,36 @@ class StreamTableEnvironment(TableEnvironment): """ return StreamQueryConfig(self._j_tenv.queryConfig()) + def connect(self, connector_descriptor): + """ + Creates a table source and/or table sink from a descriptor. + + Descriptors allow for declaring the communication to external systems in an + implementation-agnostic way. The classpath is scanned for suitable table factories that + match the desired configuration. + + The following example shows how to read from a connector using a JSON format and + registering a table source as "MyTable": + :: + >>> table_env\ + ... .connect(ExternalSystemXYZ() + ... .version("0.11"))\ + ... .with_format(Json() + ... .json_schema("{...}") + ... .fail_on_missing_field(False))\ + ... .with_schema(Schema() + ... .field("user-name", "VARCHAR") + ... .from_origin_field("u_name") + ... .field("count", "DECIMAL"))\ + ... .register_table_source("MyTable") + + :param connector_descriptor: Connector descriptor describing the external system. + :return: A :class:`StreamTableDescriptor` used to build the table source/sink. + """ + # type: (ConnectorDescriptor) -> StreamTableDescriptor + return StreamTableDescriptor( + self._j_tenv.connect(connector_descriptor._j_connector_descriptor)) + class BatchTableEnvironment(TableEnvironment): @@ -288,3 +349,33 @@ class BatchTableEnvironment(TableEnvironment): :return: A new :class:`BatchQueryConfig`. """ return BatchQueryConfig(self._j_tenv.queryConfig()) + + def connect(self, connector_descriptor): + """ + Creates a table source and/or table sink from a descriptor. + + Descriptors allow for declaring the communication to external systems in an + implementation-agnostic way. The classpath is scanned for suitable table factories that + match the desired configuration. + + The following example shows how to read from a connector using a JSON format and + registering a table source as "MyTable": + :: + >>> table_env\ + ... .connect(ExternalSystemXYZ() + ... .version("0.11"))\ + ... .with_format(Json() + ... .json_schema("{...}") + ... .fail_on_missing_field(False))\ + ... .with_schema(Schema() + ... .field("user-name", "VARCHAR") + ... .from_origin_field("u_name") + ... .field("count", "DECIMAL"))\ + ... .register_table_source("MyTable") + + :param connector_descriptor: Connector descriptor describing the external system. + :return: A :class:`BatchTableDescriptor` used to build the table source/sink. + """ + # type: (ConnectorDescriptor) -> BatchTableDescriptor + return BatchTableDescriptor( + self._j_tenv.connect(connector_descriptor._j_connector_descriptor)) diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py new file mode 100644 index 0000000..f2c2976 --- /dev/null +++ b/flink-python/pyflink/table/tests/test_descriptor.py @@ -0,0 +1,618 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import os + +from pyflink.table.table_descriptor import (FileSystem, OldCsv, Rowtime, Schema) +from pyflink.table.table_sink import CsvTableSink +from pyflink.table.types import DataTypes +from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkStreamTableTestCase, + PyFlinkBatchTableTestCase) + + +class FileSystemDescriptorTests(PyFlinkTestCase): + + def test_path(self): + file_system = FileSystem() + + file_system.path("/test.csv") + + properties = file_system.to_properties() + expected = {'connector.property-version': '1', + 'connector.type': 'filesystem', + 'connector.path': '/test.csv'} + assert properties == expected + + +class OldCsvDescriptorTests(PyFlinkTestCase): + + def test_field_delimiter(self): + csv = OldCsv() + + csv.field_delimiter("|") + + properties = csv.to_properties() + expected = {'format.field-delimiter': '|', + 'format.type': 'csv', + 'format.property-version': '1'} + assert properties == expected + + def test_line_delimiter(self): + csv = OldCsv() + + csv.line_delimiter(";") + + expected = {'format.type': 'csv', + 'format.property-version': '1', + 'format.line-delimiter': ';'} + + properties = csv.to_properties() + assert properties == expected + + def test_ignore_parse_errors(self): + csv = OldCsv() + + csv.ignore_parse_errors() + + properties = csv.to_properties() + expected = {'format.ignore-parse-errors': 'true', + 'format.type': 'csv', + 'format.property-version': '1'} + assert properties == expected + + def test_quote_character(self): + csv = OldCsv() + + csv.quote_character("*") + + properties = csv.to_properties() + expected = {'format.quote-character': '*', + 'format.type': 'csv', + 'format.property-version': '1'} + assert properties == expected + + def test_comment_prefix(self): + csv = OldCsv() + + csv.comment_prefix("#") + + properties = csv.to_properties() + expected = {'format.comment-prefix': '#', + 'format.type': 'csv', + 'format.property-version': '1'} + assert properties == expected + + def test_ignore_first_line(self): + csv = OldCsv() + + csv.ignore_first_line() + + properties = csv.to_properties() + expected = {'format.ignore-first-line': 'true', + 'format.type': 'csv', + 'format.property-version': '1'} + assert properties == expected + + def test_field(self): + csv = OldCsv() + + csv.field("a", DataTypes.LONG) + csv.field("b", DataTypes.STRING) + csv.field("c", "SQL_TIMESTAMP") + + properties = csv.to_properties() + expected = {'format.fields.0.name': 'a', + 'format.fields.0.type': 'BIGINT', + 'format.fields.1.name': 'b', + 'format.fields.1.type': 'VARCHAR', + 'format.fields.2.name': 'c', + 'format.fields.2.type': 'SQL_TIMESTAMP', + 'format.type': 'csv', + 'format.property-version': '1'} + assert properties == expected + + +class RowTimeDescriptorTests(PyFlinkTestCase): + + def test_timestamps_from_field(self): + rowtime = Rowtime() + + rowtime = rowtime.timestamps_from_field("rtime") + + properties = rowtime.to_properties() + expect = {'rowtime.timestamps.type': 'from-field', 'rowtime.timestamps.from': 'rtime'} + assert properties == expect + 
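The pattern in these tests: every descriptor can be inspected through to_properties(). An illustrative session (dict ordering aside; not part of the patch):

    >>> Rowtime().timestamps_from_field("rtime").to_properties()
    {'rowtime.timestamps.type': 'from-field', 'rowtime.timestamps.from': 'rtime'}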
+ def test_timestamps_from_source(self): + rowtime = Rowtime() + + rowtime = rowtime.timestamps_from_source() + + properties = rowtime.to_properties() + expect = {'rowtime.timestamps.type': 'from-source'} + assert properties == expect + + def test_timestamps_from_extractor(self): + rowtime = Rowtime() + + rowtime = rowtime.timestamps_from_extractor( + "org.apache.flink.table.descriptors.RowtimeTest$CustomExtractor") + + properties = rowtime.to_properties() + expect = {'rowtime.timestamps.type': 'custom', + 'rowtime.timestamps.class': + 'org.apache.flink.table.descriptors.RowtimeTest$CustomExtractor', + 'rowtime.timestamps.serialized': + 'rO0ABXNyAD5vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmRlc2NyaXB0b3JzLlJvd3RpbWVUZXN0JEN1c3R' + 'vbUV4dHJhY3RvcoaChjMg55xwAgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLm' + 'FwYWNoZS5mbGluay50YWJsZS5zb3VyY2VzLnRzZXh0cmFjdG9ycy5UaW1lc3RhbXBFeHRyYWN0b3Jf1' + 'Y6piFNsGAIAAHhwdAACdHM'} + assert properties == expect + + def test_watermarks_periodic_ascending(self): + rowtime = Rowtime() + + rowtime = rowtime.watermarks_periodic_ascending() + + properties = rowtime.to_properties() + expect = {'rowtime.watermarks.type': 'periodic-ascending'} + assert properties == expect + + def test_watermarks_periodic_bounded(self): + rowtime = Rowtime() + + rowtime = rowtime.watermarks_periodic_bounded(1000) + + properties = rowtime.to_properties() + expect = {'rowtime.watermarks.type': 'periodic-bounded', + 'rowtime.watermarks.delay': '1000'} + assert properties == expect + + def test_watermarks_from_source(self): + rowtime = Rowtime() + + rowtime = rowtime.watermarks_from_source() + + properties = rowtime.to_properties() + expect = {'rowtime.watermarks.type': 'from-source'} + assert properties == expect + + def test_watermarks_from_strategy(self): + rowtime = Rowtime() + + rowtime = rowtime.watermarks_from_strategy( + "org.apache.flink.table.descriptors.RowtimeTest$CustomAssigner") + + properties = rowtime.to_properties() + expect = {'rowtime.watermarks.type': 'custom', + 'rowtime.watermarks.class': + 'org.apache.flink.table.descriptors.RowtimeTest$CustomAssigner', + 'rowtime.watermarks.serialized': + 'rO0ABXNyAD1vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmRlc2NyaXB0b3JzLlJvd3RpbWVUZXN0JEN1c3R' + 'vbUFzc2lnbmVyeDcuDvfbu0kCAAB4cgBHb3JnLmFwYWNoZS5mbGluay50YWJsZS5zb3VyY2VzLndtc3' + 'RyYXRlZ2llcy5QdW5jdHVhdGVkV2F0ZXJtYXJrQXNzaWduZXKBUc57oaWu9AIAAHhyAD1vcmcuYXBhY' + '2hlLmZsaW5rLnRhYmxlLnNvdXJjZXMud21zdHJhdGVnaWVzLldhdGVybWFya1N0cmF0ZWd53nt-g2OW' + 'aT4CAAB4cA'} + assert properties == expect + + +class SchemaDescriptorTests(PyFlinkTestCase): + + def test_field(self): + schema = Schema() + + schema = schema\ + .field("int_field", DataTypes.INT)\ + .field("long_field", DataTypes.LONG)\ + .field("string_field", DataTypes.STRING)\ + .field("timestamp_field", DataTypes.TIMESTAMP)\ + .field("time_field", DataTypes.TIME)\ + .field("date_field", DataTypes.DATE)\ + .field("double_field", DataTypes.DOUBLE)\ + .field("float_field", DataTypes.FLOAT)\ + .field("byte_field", DataTypes.BYTE)\ + .field("short_field", DataTypes.SHORT)\ + .field("boolean_field", DataTypes.BOOLEAN) + + properties = schema.to_properties() + expected = {'schema.0.name': 'int_field', + 'schema.0.type': 'INT', + 'schema.1.name': 'long_field', + 'schema.1.type': 'BIGINT', + 'schema.2.name': 'string_field', + 'schema.2.type': 'VARCHAR', + 'schema.3.name': 'timestamp_field', + 'schema.3.type': 'TIMESTAMP', + 'schema.4.name': 'time_field', + 'schema.4.type': 'TIME', + 'schema.5.name': 'date_field', + 'schema.5.type': 'DATE', + 
'schema.6.name': 'double_field', + 'schema.6.type': 'DOUBLE', + 'schema.7.name': 'float_field', + 'schema.7.type': 'FLOAT', + 'schema.8.name': 'byte_field', + 'schema.8.type': 'TINYINT', + 'schema.9.name': 'short_field', + 'schema.9.type': 'SMALLINT', + 'schema.10.name': 'boolean_field', + 'schema.10.type': 'BOOLEAN'} + assert properties == expected + + def test_field_in_string(self): + schema = Schema() + + schema = schema\ + .field("int_field", 'INT')\ + .field("long_field", 'BIGINT')\ + .field("string_field", 'VARCHAR')\ + .field("timestamp_field", 'SQL_TIMESTAMP')\ + .field("time_field", 'SQL_TIME')\ + .field("date_field", 'SQL_DATE')\ + .field("double_field", 'DOUBLE')\ + .field("float_field", 'FLOAT')\ + .field("byte_field", 'TINYINT')\ + .field("short_field", 'SMALLINT')\ + .field("boolean_field", 'BOOLEAN') + + properties = schema.to_properties() + expected = {'schema.0.name': 'int_field', + 'schema.0.type': 'INT', + 'schema.1.name': 'long_field', + 'schema.1.type': 'BIGINT', + 'schema.2.name': 'string_field', + 'schema.2.type': 'VARCHAR', + 'schema.3.name': 'timestamp_field', + 'schema.3.type': 'SQL_TIMESTAMP', + 'schema.4.name': 'time_field', + 'schema.4.type': 'SQL_TIME', + 'schema.5.name': 'date_field', + 'schema.5.type': 'SQL_DATE', + 'schema.6.name': 'double_field', + 'schema.6.type': 'DOUBLE', + 'schema.7.name': 'float_field', + 'schema.7.type': 'FLOAT', + 'schema.8.name': 'byte_field', + 'schema.8.type': 'TINYINT', + 'schema.9.name': 'short_field', + 'schema.9.type': 'SMALLINT', + 'schema.10.name': 'boolean_field', + 'schema.10.type': 'BOOLEAN'} + assert properties == expected + + def test_from_origin_field(self): + schema = Schema() + + schema = schema\ + .field("int_field", DataTypes.INT)\ + .field("long_field", DataTypes.LONG).from_origin_field("origin_field_a")\ + .field("string_field", DataTypes.STRING) + + properties = schema.to_properties() + expected = {'schema.0.name': 'int_field', + 'schema.0.type': 'INT', + 'schema.1.name': 'long_field', + 'schema.1.type': 'BIGINT', + 'schema.1.from': 'origin_field_a', + 'schema.2.name': 'string_field', + 'schema.2.type': 'VARCHAR'} + assert properties == expected + + def test_proctime(self): + schema = Schema() + + schema = schema\ + .field("int_field", DataTypes.INT)\ + .field("ptime", DataTypes.LONG).proctime()\ + .field("string_field", DataTypes.STRING) + + properties = schema.to_properties() + expected = {'schema.0.name': 'int_field', + 'schema.0.type': 'INT', + 'schema.1.name': 'ptime', + 'schema.1.type': 'BIGINT', + 'schema.1.proctime': 'true', + 'schema.2.name': 'string_field', + 'schema.2.type': 'VARCHAR'} + assert properties == expected + + def test_rowtime(self): + schema = Schema() + + schema = schema\ + .field("int_field", DataTypes.INT)\ + .field("long_field", DataTypes.LONG)\ + .field("rtime", DataTypes.LONG)\ + .rowtime( + Rowtime().timestamps_from_field("long_field").watermarks_periodic_bounded(5000))\ + .field("string_field", DataTypes.STRING) + + properties = schema.to_properties() + print(properties) + expected = {'schema.0.name': 'int_field', + 'schema.0.type': 'INT', + 'schema.1.name': 'long_field', + 'schema.1.type': 'BIGINT', + 'schema.2.name': 'rtime', + 'schema.2.type': 'BIGINT', + 'schema.2.rowtime.timestamps.type': 'from-field', + 'schema.2.rowtime.timestamps.from': 'long_field', + 'schema.2.rowtime.watermarks.type': 'periodic-bounded', + 'schema.2.rowtime.watermarks.delay': '5000', + 'schema.3.name': 'string_field', + 'schema.3.type': 'VARCHAR'} + assert properties == expected + + +class 
AbstractTableDescriptorTests(object): + + def test_with_format(self): + descriptor = self.t_env.connect(FileSystem()) + + descriptor.with_format(OldCsv().field("a", "INT")) + + properties = descriptor.to_properties() + + expected = {'format.type': 'csv', + 'format.property-version': '1', + 'format.fields.0.name': 'a', + 'format.fields.0.type': 'INT', + 'connector.property-version': '1', + 'connector.type': 'filesystem'} + assert properties == expected + + def test_with_schema(self): + descriptor = self.t_env.connect(FileSystem()) + + descriptor.with_format(OldCsv()).with_schema(Schema().field("a", "INT")) + + properties = descriptor.to_properties() + expected = {'schema.0.name': 'a', + 'schema.0.type': 'INT', + 'format.type': 'csv', + 'format.property-version': '1', + 'connector.type': 'filesystem', + 'connector.property-version': '1'} + assert properties == expected + + def test_register_table_sink(self): + source_path = os.path.join(self.tempdir + '/streaming.csv') + field_names = ["a", "b", "c"] + field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING] + data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")] + csv_source = self.prepare_csv_source(source_path, data, field_types, field_names) + t_env = self.t_env + t_env.register_table_source("source", csv_source) + # connect sink + sink_path = os.path.join(self.tempdir + '/streaming2.csv') + if os.path.isfile(sink_path): + os.remove(sink_path) + + t_env.connect(FileSystem().path(sink_path))\ + .with_format(OldCsv() + .field_delimiter(',') + .field("a", DataTypes.INT) + .field("b", DataTypes.STRING) + .field("c", DataTypes.STRING))\ + .with_schema(Schema() + .field("a", DataTypes.INT) + .field("b", DataTypes.STRING) + .field("c", DataTypes.STRING))\ + .register_table_sink("sink") + t_env.scan("source") \ + .select("a + 1, b, c") \ + .insert_into("sink") + t_env.execute() + + with open(sink_path, 'r') as f: + lines = f.read() + assert lines == '2,Hi,Hello\n' + "3,Hello,Hello\n" + + def test_register_table_source(self): + source_path = os.path.join(self.tempdir + '/streaming.csv') + field_names = ["a", "b", "c"] + field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING] + data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")] + self.prepare_csv_source(source_path, data, field_types, field_names) + t_env = self.t_env + sink_path = os.path.join(self.tempdir + '/streaming2.csv') + if os.path.isfile(sink_path): + os.remove(sink_path) + t_env.register_table_sink( + "sink", + field_names, field_types, CsvTableSink(sink_path)) + + # connect source + t_env.connect(FileSystem().path(source_path))\ + .with_format(OldCsv() + .field_delimiter(',') + .field("a", DataTypes.INT) + .field("b", DataTypes.STRING) + .field("c", DataTypes.STRING))\ + .with_schema(Schema() + .field("a", DataTypes.INT) + .field("b", DataTypes.STRING) + .field("c", DataTypes.STRING))\ + .register_table_source("source") + t_env.scan("source") \ + .select("a + 1, b, c") \ + .insert_into("sink") + t_env.execute() + + with open(sink_path, 'r') as f: + lines = f.read() + assert lines == '2,Hi,Hello\n' + '3,Hello,Hello\n' + + def test_register_table_source_and_sink(self): + source_path = os.path.join(self.tempdir + '/streaming.csv') + field_names = ["a", "b", "c"] + field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING] + data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")] + self.prepare_csv_source(source_path, data, field_types, field_names) + sink_path = os.path.join(self.tempdir + '/streaming2.csv') + if os.path.isfile(sink_path): + os.remove(sink_path) + t_env 
= self.t_env + + t_env.connect(FileSystem().path(source_path))\ + .with_format(OldCsv() + .field_delimiter(',') + .field("a", DataTypes.INT) + .field("b", DataTypes.STRING) + .field("c", DataTypes.STRING))\ + .with_schema(Schema() + .field("a", DataTypes.INT) + .field("b", DataTypes.STRING) + .field("c", DataTypes.STRING))\ + .register_table_source_and_sink("source") + t_env.connect(FileSystem().path(sink_path))\ + .with_format(OldCsv() + .field_delimiter(',') + .field("a", DataTypes.INT) + .field("b", DataTypes.STRING) + .field("c", DataTypes.STRING))\ + .with_schema(Schema() + .field("a", DataTypes.INT) + .field("b", DataTypes.STRING) + .field("c", DataTypes.STRING))\ + .register_table_source_and_sink("sink") + t_env.scan("source") \ + .select("a + 1, b, c") \ + .insert_into("sink") + t_env.execute() + + with open(sink_path, 'r') as f: + lines = f.read() + assert lines == '2,Hi,Hello\n' + "3,Hello,Hello\n" + + +class StreamTableDescriptorTests(PyFlinkStreamTableTestCase, AbstractTableDescriptorTests): + + def test_in_append_mode(self): + descriptor = self.t_env.connect(FileSystem()) + + descriptor\ + .with_format(OldCsv())\ + .in_append_mode() + + properties = descriptor.to_properties() + expected = {'update-mode': 'append', + 'format.type': 'csv', + 'format.property-version': '1', + 'connector.property-version': '1', + 'connector.type': 'filesystem'} + assert properties == expected + + def test_in_retract_mode(self): + descriptor = self.t_env.connect(FileSystem()) + + descriptor \ + .with_format(OldCsv()) \ + .in_retract_mode() + + properties = descriptor.to_properties() + expected = {'update-mode': 'retract', + 'format.type': 'csv', + 'format.property-version': '1', + 'connector.property-version': '1', + 'connector.type': 'filesystem'} + assert properties == expected + + def test_in_upsert_mode(self): + descriptor = self.t_env.connect(FileSystem()) + + descriptor \ + .with_format(OldCsv()) \ + .in_upsert_mode() + + properties = descriptor.to_properties() + expected = {'update-mode': 'upsert', + 'format.type': 'csv', + 'format.property-version': '1', + 'connector.property-version': '1', + 'connector.type': 'filesystem'} + assert properties == expected + + +class BatchTableDescriptorTests(PyFlinkBatchTableTestCase, AbstractTableDescriptorTests): + pass + + +class StreamDescriptorEndToEndTests(PyFlinkStreamTableTestCase): + + def test_end_to_end(self): + source_path = os.path.join(self.tempdir + '/streaming.csv') + with open(source_path, 'w') as f: + lines = 'a,b,c\n' + \ + '1,hi,hello\n' + \ + '#comments\n' + \ + "error line\n" + \ + '2,"hi,world!",hello\n' + f.write(lines) + f.close() + sink_path = os.path.join(self.tempdir + '/streaming2.csv') + t_env = self.t_env + # connect source + t_env.connect(FileSystem().path(source_path))\ + .with_format(OldCsv() + .field_delimiter(',') + .line_delimiter("\n") + .ignore_parse_errors() + .quote_character('"') + .comment_prefix("#") + .ignore_first_line() + .field("a", "INT") + .field("b", "VARCHAR") + .field("c", "VARCHAR"))\ + .with_schema(Schema() + .field("a", "INT") + .field("b", "VARCHAR") + .field("c", "VARCHAR"))\ + .in_append_mode()\ + .register_table_source("source") + # connect sink + t_env.connect(FileSystem().path(sink_path))\ + .with_format(OldCsv() + .field_delimiter(',') + .field("a", DataTypes.INT) + .field("b", DataTypes.STRING) + .field("c", DataTypes.STRING))\ + .with_schema(Schema() + .field("a", DataTypes.INT) + .field("b", DataTypes.STRING) + .field("c", DataTypes.STRING))\ + .register_table_sink("sink") + + 
t_env.scan("source") \ + .select("a + 1, b, c") \ + .insert_into("sink") + t_env.execute() + + with open(sink_path, 'r') as f: + lines = f.read() + assert lines == '2,hi,hello\n' + '3,hi,world!,hello\n' + + +if __name__ == '__main__': + import unittest + + try: + import xmlrunner + testRunner = xmlrunner.XMLTestRunner(output='target/test-reports') + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py index d0bfaf8..c88b985 100644 --- a/flink-python/pyflink/table/tests/test_environment_completeness.py +++ b/flink-python/pyflink/table/tests/test_environment_completeness.py @@ -41,7 +41,7 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, unittest.Te # registerExternalCatalog, getRegisteredExternalCatalog and listTables # should be supported when catalog supported in python. # getCompletionHints has been deprecated. It will be removed in the next release. - return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'connect', + return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'registerFunction', 'listUserDefinedFunctions', 'listTables', 'getCompletionHints'}
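For completeness, a compact end-to-end sketch of the API added by this commit, mirroring the tests above (paths and table names are placeholders; assumes an existing `t_env`):

    >>> t_env.connect(FileSystem().path("/tmp/in.csv"))\
    ...     .with_format(OldCsv().field("line", DataTypes.STRING))\
    ...     .with_schema(Schema().field("line", DataTypes.STRING))\
    ...     .register_table_source("src")
    >>> t_env.connect(FileSystem().path("/tmp/out.csv"))\
    ...     .with_format(OldCsv().field("line", DataTypes.STRING))\
    ...     .with_schema(Schema().field("line", DataTypes.STRING))\
    ...     .register_table_sink("snk")
    >>> t_env.scan("src").insert_into("snk")
    >>> t_env.execute()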