This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 80d7ba4 [FLINK-12767][python] Support user defined connectors/format 80d7ba4 is described below commit 80d7ba440bd7389e76536e3b91ca227f4f850a67 Author: Dian Fu <fudian...@alibaba-inc.com> AuthorDate: Mon Jun 10 12:20:14 2019 +0800 [FLINK-12767][python] Support user defined connectors/format This closes #8719 --- flink-python/pom.xml | 6 ++ flink-python/pyflink/java_gateway.py | 2 + flink-python/pyflink/table/descriptors.py | 112 ++++++++++++++++++++- .../pyflink/table/tests/test_descriptor.py | 35 ++++++- .../python/CustomConnectorDescriptor.java | 72 +++++++++++++ .../descriptors/python/CustomFormatDescriptor.java | 71 +++++++++++++ 6 files changed, 296 insertions(+), 2 deletions(-) diff --git a/flink-python/pom.xml b/flink-python/pom.xml index 276b890..ad4929f 100644 --- a/flink-python/pom.xml +++ b/flink-python/pom.xml @@ -56,6 +56,12 @@ under the License. <version>${project.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> <!-- Python API dependencies --> diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py index bf8ad76..9bb0b62 100644 --- a/flink-python/pyflink/java_gateway.py +++ b/flink-python/pyflink/java_gateway.py @@ -119,6 +119,8 @@ def import_flink_view(gateway): java_import(gateway.jvm, "org.apache.flink.table.api.dataview.*") java_import(gateway.jvm, "org.apache.flink.table.catalog.*") java_import(gateway.jvm, "org.apache.flink.table.descriptors.*") + java_import(gateway.jvm, "org.apache.flink.table.descriptors.python.*") + java_import(gateway.jvm, "org.apache.flink.table.sources.*") java_import(gateway.jvm, "org.apache.flink.table.sinks.*") java_import(gateway.jvm, "org.apache.flink.table.sources.*") java_import(gateway.jvm, 
"org.apache.flink.table.types.*") diff --git a/flink-python/pyflink/table/descriptors.py b/flink-python/pyflink/table/descriptors.py index cb8fc7b..457fc4f 100644 --- a/flink-python/pyflink/table/descriptors.py +++ b/flink-python/pyflink/table/descriptors.py @@ -24,6 +24,7 @@ from pyflink.table.types import _to_java_type from pyflink.java_gateway import get_gateway if sys.version >= '3': + long = int unicode = str __all__ = [ @@ -39,7 +40,9 @@ __all__ = [ 'FileSystem', 'ConnectTableDescriptor', 'StreamTableDescriptor', - 'BatchTableDescriptor' + 'BatchTableDescriptor', + 'CustomConnectorDescriptor', + 'CustomFormatDescriptor' ] @@ -611,6 +614,58 @@ class Json(FormatDescriptor): return self +class CustomFormatDescriptor(FormatDescriptor): + """ + Describes the custom format of data. + """ + + def __init__(self, type, version): + """ + Constructs a :class:`CustomFormatDescriptor`. + + :param type: String that identifies this format. + :param version: Property version for backwards compatibility. + """ + + if not isinstance(type, (str, unicode)): + raise TypeError("type must be of type str.") + if not isinstance(version, (int, long)): + raise TypeError("version must be of type int.") + gateway = get_gateway() + super(CustomFormatDescriptor, self).__init__( + gateway.jvm.CustomFormatDescriptor(type, version)) + + def property(self, key, value): + """ + Adds a configuration property for the format. + + :param key: The property key to be set. + :param value: The property value to be set. + :return: This object. + """ + + if not isinstance(key, (str, unicode)): + raise TypeError("key must be of type str.") + if not isinstance(value, (str, unicode)): + raise TypeError("value must be of type str.") + self._j_format_descriptor = self._j_format_descriptor.property(key, value) + return self + + def properties(self, property_dict): + """ + Adds a set of properties for the format. + + :param property_dict: The dict object contains configuration properties for the format. 
+ Both the keys and values should be strings. + :return: This object. + """ + + if not isinstance(property_dict, dict): + raise TypeError("property_dict must be of type dict.") + self._j_format_descriptor = self._j_format_descriptor.properties(property_dict) + return self + + class ConnectorDescriptor(Descriptor): """ Describes a connector to an other system. @@ -1126,6 +1181,61 @@ class Elasticsearch(ConnectorDescriptor): return self +class CustomConnectorDescriptor(ConnectorDescriptor): + """ + Describes a custom connector to an other system. + """ + + def __init__(self, type, version, format_needed): + """ + Constructs a :class:`CustomConnectorDescriptor`. + + :param type: String that identifies this connector. + :param version: Property version for backwards compatibility. + :param format_needed: Flag for basic validation of a needed format descriptor. + """ + + if not isinstance(type, (str, unicode)): + raise TypeError("type must be of type str.") + if not isinstance(version, (int, long)): + raise TypeError("version must be of type int.") + if not isinstance(format_needed, bool): + raise TypeError("format_needed must be of type bool.") + gateway = get_gateway() + super(CustomConnectorDescriptor, self).__init__( + gateway.jvm.CustomConnectorDescriptor(type, version, format_needed)) + + def property(self, key, value): + """ + Adds a configuration property for the connector. + + :param key: The property key to be set. + :param value: The property value to be set. + :return: This object. + """ + + if not isinstance(key, (str, unicode)): + raise TypeError("key must be of type str.") + if not isinstance(value, (str, unicode)): + raise TypeError("value must be of type str.") + self._j_connector_descriptor = self._j_connector_descriptor.property(key, value) + return self + + def properties(self, property_dict): + """ + Adds a set of properties for the connector. + + :param property_dict: The dict object contains configuration properties for the connector. 
+ Both the keys and values should be strings. + :return: This object. + """ + + if not isinstance(property_dict, dict): + raise TypeError("property_dict must be of type dict.") + self._j_connector_descriptor = self._j_connector_descriptor.properties(property_dict) + return self + + class ConnectTableDescriptor(Descriptor): """ Common class for table's created with :class:`pyflink.table.TableEnvironment.connect`. diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py index 1f313a7..1c0f36e 100644 --- a/flink-python/pyflink/table/tests/test_descriptor.py +++ b/flink-python/pyflink/table/tests/test_descriptor.py @@ -18,7 +18,8 @@ import os from pyflink.table.descriptors import (FileSystem, OldCsv, Rowtime, Schema, Kafka, - Elasticsearch, Csv, Avro, Json) + Elasticsearch, Csv, Avro, Json, CustomConnectorDescriptor, + CustomFormatDescriptor) from pyflink.table.table_schema import TableSchema from pyflink.table.types import DataTypes from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkStreamTableTestCase, @@ -354,6 +355,22 @@ class ElasticsearchDescriptorTest(PyFlinkTestCase): self.assertEqual(expected, properties) +class CustomConnectorDescriptorTests(PyFlinkTestCase): + + def test_custom_connector(self): + custom_connector = CustomConnectorDescriptor('kafka', 1, True) \ + .property('connector.topic', 'topic1')\ + .properties({'connector.version': '0.11', 'connector.startup-mode': 'earliest-offset'}) + + properties = custom_connector.to_properties() + expected = {'connector.type': 'kafka', + 'connector.property-version': '1', + 'connector.topic': 'topic1', + 'connector.version': '0.11', + 'connector.startup-mode': 'earliest-offset'} + self.assertEqual(expected, properties) + + class OldCsvDescriptorTests(PyFlinkTestCase): def test_field_delimiter(self): @@ -663,6 +680,22 @@ class JsonDescriptorTests(PyFlinkTestCase): self.assertEqual(expected, properties) +class 
CustomFormatDescriptorTests(PyFlinkTestCase): + + def test_custom_format_descriptor(self): + custom_format = CustomFormatDescriptor('json', 1) \ + .property('format.schema', 'ROW<a INT, b VARCHAR>') \ + .properties({'format.fail-on-missing-field': 'true'}) + + expected = {'format.fail-on-missing-field': 'true', + 'format.schema': 'ROW<a INT, b VARCHAR>', + 'format.property-version': '1', + 'format.type': 'json'} + + properties = custom_format.to_properties() + self.assertEqual(expected, properties) + + class RowTimeDescriptorTests(PyFlinkTestCase): def test_timestamps_from_field(self): diff --git a/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomConnectorDescriptor.java b/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomConnectorDescriptor.java new file mode 100644 index 0000000..34a3c71 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomConnectorDescriptor.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.descriptors.python; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.descriptors.ConnectorDescriptor; +import org.apache.flink.table.descriptors.DescriptorProperties; + +import java.util.Map; + +/** + * Describes a custom connector to an other system. + */ +@Internal +public class CustomConnectorDescriptor extends ConnectorDescriptor { + + private final DescriptorProperties properties; + + /** + * Constructs a {@link CustomConnectorDescriptor}. + * + * @param type String that identifies this connector. + * @param version Property version for backwards compatibility. + * @param formatNeeded Flag for basic validation of a needed format descriptor. + */ + public CustomConnectorDescriptor(String type, int version, boolean formatNeeded) { + super(type, version, formatNeeded); + properties = new DescriptorProperties(); + } + + /** + * Adds a configuration property for the connector. + * + * @param key The property key to be set. + * @param value The property value to be set. + */ + public CustomConnectorDescriptor property(String key, String value) { + properties.putString(key, value); + return this; + } + + /** + * Adds a set of properties for the connector. + * + * @param properties The properties to add. 
+ */ + public CustomConnectorDescriptor properties(Map<String, String> properties) { + this.properties.putProperties(properties); + return this; + } + + @Override + protected Map<String, String> toConnectorProperties() { + return properties.asMap(); + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomFormatDescriptor.java b/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomFormatDescriptor.java new file mode 100644 index 0000000..0ea17dc --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/descriptors/python/CustomFormatDescriptor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors.python; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.FormatDescriptor; + +import java.util.Map; + +/** + * Describes the custom format of data. + */ +@Internal +public class CustomFormatDescriptor extends FormatDescriptor { + + private final DescriptorProperties properties; + + /** + * Constructs a {@link CustomFormatDescriptor}. 
+ * + * @param type String that identifies this format. + * @param version Property version for backwards compatibility. + */ + public CustomFormatDescriptor(String type, int version) { + super(type, version); + properties = new DescriptorProperties(); + } + + /** + * Adds a configuration property for the format. + * + * @param key The property key to be set. + * @param value The property value to be set. + */ + public CustomFormatDescriptor property(String key, String value) { + properties.putString(key, value); + return this; + } + + /** + * Adds a set of properties for the format. + * + * @param properties The properties to add. + */ + public CustomFormatDescriptor properties(Map<String, String> properties) { + this.properties.putProperties(properties); + return this; + } + + @Override + protected Map<String, String> toFormatProperties() { + return properties.asMap(); + } +}