This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 836fdff [FLINK-12440][python] Add all connector support align Java Table API. 836fdff is described below commit 836fdfff0db64ff8241f38e8dd362dd50a9d1895 Author: Wei Zhong <weizhong0...@gmail.com> AuthorDate: Fri May 24 14:45:30 2019 +0800 [FLINK-12440][python] Add all connector support align Java Table API. This closes #8531 --- .../main/flink-bin/bin/pyflink-gateway-server.sh | 2 +- flink-python/pyflink/table/__init__.py | 4 +- flink-python/pyflink/table/table_descriptor.py | 485 ++++++++++++++++++++- .../pyflink/table/tests/test_descriptor.py | 407 ++++++++++++++++- tools/travis_controller.sh | 2 + 5 files changed, 883 insertions(+), 17 deletions(-) diff --git a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh b/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh index 026f813..9e41ad5 100644 --- a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh +++ b/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh @@ -63,7 +63,7 @@ if [[ -n "$FLINK_TESTING" ]]; then else FLINK_TEST_CLASSPATH="$FLINK_TEST_CLASSPATH":"$testJarFile" fi - done < <(find "$FLINK_SOURCE_ROOT_DIR" ! -type d -name 'flink-*-tests.jar' -print0 | sort -z) + done < <(find "$FLINK_SOURCE_ROOT_DIR" ! -type d \( -name 'flink-*-tests.jar' -o -path "${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-connector-elasticsearch-base/target/flink*.jar" -o -path "${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-connector-kafka-base/target/flink*.jar" \) -print0 | sort -z) fi exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -cp ${FLINK_CLASSPATH}:${TABLE_JAR_PATH}:${FLINK_TEST_CLASSPATH} ${DRIVER} ${ARGS[@]} diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py index 281647f..904264e 100644 --- a/flink-python/pyflink/table/__init__.py +++ b/flink-python/pyflink/table/__init__.py @@ -40,7 +40,7 @@ from pyflink.table.table_sink import TableSink, CsvTableSink from pyflink.table.table_source import TableSource, CsvTableSource from pyflink.table.types import DataTypes, UserDefinedType, Row from pyflink.table.window import Tumble, Session, Slide, Over -from pyflink.table.table_descriptor import Rowtime, Schema, OldCsv, FileSystem +from pyflink.table.table_descriptor import Rowtime, Schema, OldCsv, FileSystem, Kafka, Elasticsearch __all__ = [ 'TableEnvironment', @@ -63,4 +63,6 @@ __all__ = [ 'FileSystem', 'UserDefinedType', 'Row', + 'Kafka', + 'Elasticsearch' ] diff --git a/flink-python/pyflink/table/table_descriptor.py b/flink-python/pyflink/table/table_descriptor.py index 1dfbde3..65161b4 100644 --- a/flink-python/pyflink/table/table_descriptor.py +++ b/flink-python/pyflink/table/table_descriptor.py @@ -30,7 +30,9 @@ __all__ = [ 'Rowtime', 'Schema', 'OldCsv', - 'FileSystem' + 'FileSystem', + 'Kafka', + 'Elasticsearch' ] @@ -256,7 +258,7 @@ class OldCsv(FormatDescriptor): format in the dedicated `flink-formats/flink-csv` module instead when writing to Kafka. Use the old one for stream/batch filesystem operations for now. - .. note:: + ..note:: Deprecated: use the RFC-compliant `Csv` format instead when writing to Kafka. """ @@ -373,6 +375,485 @@ class FileSystem(ConnectorDescriptor): return self +class Kafka(ConnectorDescriptor): + """ + Connector descriptor for the Apache Kafka message queue. + """ + + def __init__(self): + gateway = get_gateway() + self._j_kafka = gateway.jvm.Kafka() + super(Kafka, self).__init__(self._j_kafka) + + def version(self, version): + """ + Sets the Kafka version to be used. 
+ + :param version: Kafka version. E.g., "0.8", "0.11", etc. + :return: This object. + """ + if not isinstance(version, (str, unicode)): + version = str(version) + self._j_kafka = self._j_kafka.version(version) + return self + + def topic(self, topic): + """ + Sets the topic from which the table is read. + + :param topic: The topic from which the table is read. + :return: This object. + """ + self._j_kafka = self._j_kafka.topic(topic) + return self + + def properties(self, property_dict): + """ + Sets the configuration properties for the Kafka consumer. Resets previously set properties. + + :param property_dict: The dict object contains configuration properties for the Kafka + consumer. Both the keys and values should be strings. + :return: This object. + """ + gateway = get_gateway() + properties = gateway.jvm.java.util.Properties() + for key in property_dict: + properties.setProperty(key, property_dict[key]) + self._j_kafka = self._j_kafka.properties(properties) + return self + + def property(self, key, value): + """ + Adds a configuration properties for the Kafka consumer. + + :param key: Property key string for the Kafka consumer. + :param value: Property value string for the Kafka consumer. + :return: This object. + """ + self._j_kafka = self._j_kafka.property(key, value) + return self + + def start_from_earliest(self): + """ + Specifies the consumer to start reading from the earliest offset for all partitions. + This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. + + This method does not affect where partitions are read from when the consumer is restored + from a checkpoint or savepoint. When the consumer is restored from a checkpoint or + savepoint, only the offsets in the restored state will be used. + + :return: This object. + """ + self._j_kafka = self._j_kafka.startFromEarliest() + return self + + def start_from_latest(self): + """ + Specifies the consumer to start reading from the latest offset for all partitions. + This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. + + This method does not affect where partitions are read from when the consumer is restored + from a checkpoint or savepoint. When the consumer is restored from a checkpoint or + savepoint, only the offsets in the restored state will be used. + + :return: This object. + """ + self._j_kafka = self._j_kafka.startFromLatest() + return self + + def start_from_group_offsets(self): + """ + Specifies the consumer to start reading from any committed group offsets found + in Zookeeper / Kafka brokers. The "group.id" property must be set in the configuration + properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset" + set in the configuration properties will be used for the partition. + + This method does not affect where partitions are read from when the consumer is restored + from a checkpoint or savepoint. When the consumer is restored from a checkpoint or + savepoint, only the offsets in the restored state will be used. + + :return: This object. + """ + self._j_kafka = self._j_kafka.startFromGroupOffsets() + return self + + def start_from_specific_offsets(self, specific_offsets_dict): + """ + Specifies the consumer to start reading partitions from specific offsets, set independently + for each partition. The specified offset should be the offset of the next record that will + be read from partitions. This lets the consumer ignore any committed group offsets in + Zookeeper / Kafka brokers. 
+ + If the provided map of offsets contains entries whose partition is not subscribed by the + consumer, the entry will be ignored. If the consumer subscribes to a partition that does + not exist in the provided map of offsets, the consumer will fallback to the default group + offset behaviour(see :func:`pyflink.table.table_descriptor.Kafka.start_from_group_offsets`) + for that particular partition. + + If the specified offset for a partition is invalid, or the behaviour for that partition is + defaulted to group offsets but still no group offset could be found for it, then the + "auto.offset.reset" behaviour set in the configuration properties will be used for the + partition. + + This method does not affect where partitions are read from when the consumer is restored + from a checkpoint or savepoint. When the consumer is restored from a checkpoint or + savepoint, only the offsets in the restored state will be used. + + :param specific_offsets_dict: Dict of specific_offsets that the key is int-type partition + id and value is int-type offset value. + :return: This object. + """ + for key in specific_offsets_dict: + self.start_from_specific_offset(key, specific_offsets_dict[key]) + return self + + def start_from_specific_offset(self, partition, specific_offset): + """ + Configures to start reading partitions from specific offsets and specifies the given offset + for the given partition. + + see :func:`pyflink.table.table_descriptor.Kafka.start_from_specific_offsets` + + :param partition: + :param specific_offset: + :return: This object. + """ + self._j_kafka = self._j_kafka.startFromSpecificOffset(int(partition), int(specific_offset)) + return self + + def sink_partitioner_fixed(self): + """ + Configures how to partition records from Flink's partitions into Kafka's partitions. + + This strategy ensures that each Flink partition ends up in one Kafka partition. + + ..note:: + One Kafka partition can contain multiple Flink partitions. Examples: + + More Flink partitions than Kafka partitions. Some (or all) Kafka partitions contain + the output of more than one flink partition: + + | Flink Sinks --------- Kafka Partitions + | 1 ----------------> 1 + | 2 --------------/ + | 3 -------------/ + | 4 ------------/ + + Fewer Flink partitions than Kafka partitions: + + | Flink Sinks --------- Kafka Partitions + | 1 ----------------> 1 + | 2 ----------------> 2 + | 3 + | 4 + | 5 + + :return: This object. + """ + self._j_kafka = self._j_kafka.sinkPartitionerFixed() + return self + + def sink_partitioner_round_robin(self): + """ + Configures how to partition records from Flink's partitions into Kafka's partitions. + + This strategy ensures that records will be distributed to Kafka partitions in a + round-robin fashion. + + ..note:: + This strategy is useful to avoid an unbalanced partitioning. However, it will cause a + lot of network connections between all the Flink instances and all the Kafka brokers. + + :return: This object. + """ + self._j_kafka = self._j_kafka.sinkPartitionerRoundRobin() + return self + + def sink_partitioner_custom(self, partitioner_class_name): + """ + Configures how to partition records from Flink's partitions into Kafka's partitions. + + This strategy allows for a custom partitioner by providing an implementation + of ``FlinkKafkaPartitioner``. + + :param partitioner_class_name: The java canonical class name of the FlinkKafkaPartitioner. + The FlinkKafkaPartitioner must have a public no-argument + constructor and can be founded by in current Java + classloader. 
+ :return: This object. + """ + gateway = get_gateway() + self._j_kafka = self._j_kafka.sinkPartitionerCustom( + gateway.jvm.Thread.currentThread().getContextClassLoader() + .loadClass(partitioner_class_name)) + return self + + +class Elasticsearch(ConnectorDescriptor): + """ + Connector descriptor for the Elasticsearch search engine. + """ + + def __init__(self): + gateway = get_gateway() + self._j_elasticsearch = gateway.jvm.Elasticsearch() + super(Elasticsearch, self).__init__(self._j_elasticsearch) + + def version(self, version): + """ + Sets the Elasticsearch version to be used. Required. + + :param version: Elasticsearch version. E.g., "6". + :return: This object. + """ + if not isinstance(version, (str, unicode)): + version = str(version) + self._j_elasticsearch = self._j_elasticsearch.version(version) + return self + + def host(self, hostname, port, protocol): + """ + Adds an Elasticsearch host to connect to. Required. + + Multiple hosts can be declared by calling this method multiple times. + + :param hostname: Connection hostname. + :param port: Connection port. + :param protocol: Connection protocol; e.g. "http". + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.host(hostname, int(port), protocol) + return self + + def index(self, index): + """ + Declares the Elasticsearch index for every record. Required. + + :param index: Elasticsearch index. + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.index(index) + return self + + def document_type(self, document_type): + """ + Declares the Elasticsearch document type for every record. Required. + + :param document_type: Elasticsearch document type. + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.documentType(document_type) + return self + + def key_delimiter(self, key_delimiter): + """ + Sets a custom key delimiter in case the Elasticsearch ID needs to be constructed from + multiple fields. Optional. + + :param key_delimiter: Key delimiter; e.g., "$" would result in IDs "KEY1$KEY2$KEY3". + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.keyDelimiter(key_delimiter) + return self + + def key_null_literal(self, key_null_literal): + """ + Sets a custom representation for null fields in keys. Optional. + + :param key_null_literal: key null literal string; e.g. "N/A" would result in IDs + "KEY1_N/A_KEY3". + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.keyNullLiteral(key_null_literal) + return self + + def failure_handler_fail(self): + """ + Configures a failure handling strategy in case a request to Elasticsearch fails. + + This strategy throws an exception if a request fails and thus causes a job failure. + + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.failureHandlerFail() + return self + + def failure_handler_ignore(self): + """ + Configures a failure handling strategy in case a request to Elasticsearch fails. + + This strategy ignores failures and drops the request. + + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.failureHandlerIgnore() + return self + + def failure_handler_retry_rejected(self): + """ + Configures a failure handling strategy in case a request to Elasticsearch fails. + + This strategy re-adds requests that have failed due to queue capacity saturation. + + :return: This object. 
+ """ + self._j_elasticsearch = self._j_elasticsearch.failureHandlerRetryRejected() + return self + + def failure_handler_custom(self, failure_handler_class_name): + """ + Configures a failure handling strategy in case a request to Elasticsearch fails. + + This strategy allows for custom failure handling using a ``ActionRequestFailureHandler``. + + :param failure_handler_class_name: + :return: This object. + """ + gateway = get_gateway() + self._j_elasticsearch = self._j_elasticsearch.failureHandlerCustom( + gateway.jvm.Thread.currentThread().getContextClassLoader() + .loadClass(failure_handler_class_name)) + return self + + def disable_flush_on_checkpoint(self): + """ + Disables flushing on checkpoint. When disabled, a sink will not wait for all pending action + requests to be acknowledged by Elasticsearch on checkpoints. + + ..note:: + If flushing on checkpoint is disabled, a Elasticsearch sink does NOT + provide any strong guarantees for at-least-once delivery of action requests. + + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.disableFlushOnCheckpoint() + return self + + def bulk_flush_max_actions(self, max_actions_num): + """ + Configures how to buffer elements before sending them in bulk to the cluster for + efficiency. + + Sets the maximum number of actions to buffer for each bulk request. + + :param max_actions_num: the maximum number of actions to buffer per bulk request. + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.bulkFlushMaxActions(int(max_actions_num)) + return self + + def bulk_flush_max_size(self, max_size): + """ + Configures how to buffer elements before sending them in bulk to the cluster for + efficiency. + + Sets the maximum size of buffered actions per bulk request (using the syntax of + MemorySize). + + :param max_size: The maximum size. E.g. "42 mb". only MB granularity is supported. + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.bulkFlushMaxSize(max_size) + return self + + def bulk_flush_interval(self, interval): + """ + Configures how to buffer elements before sending them in bulk to the cluster for + efficiency. + + Sets the bulk flush interval (in milliseconds). + + :param interval: Bulk flush interval (in milliseconds). + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.bulkFlushInterval(int(interval)) + return self + + def bulk_flush_backoff_constant(self): + """ + Configures how to buffer elements before sending them in bulk to the cluster for + efficiency. + + Sets a constant backoff type to use when flushing bulk requests. + + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffConstant() + return self + + def bulk_flush_backoff_exponential(self): + """ + Configures how to buffer elements before sending them in bulk to the cluster for + efficiency. + + Sets an exponential backoff type to use when flushing bulk requests. + + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffExponential() + return self + + def bulk_flush_backoff_max_retries(self, max_retries): + """ + Configures how to buffer elements before sending them in bulk to the cluster for + efficiency. + + Sets the maximum number of retries for a backoff attempt when flushing bulk requests. 
+ + Make sure to enable backoff by selecting a strategy ( + :func:`pyflink.table.table_descriptor.Elasticsearch.bulk_flush_backoff_constant` or + :func:`pyflink.table.table_descriptor.Elasticsearch.bulk_flush_backoff_exponential`). + + :param max_retries: The maximum number of retries. + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffMaxRetries(int(max_retries)) + return self + + def bulk_flush_backoff_delay(self, delay): + """ + Configures how to buffer elements before sending them in bulk to the cluster for + efficiency. + + Sets the amount of delay between each backoff attempt when flushing bulk requests + (in milliseconds). + + Make sure to enable backoff by selecting a strategy ( + :func:`pyflink.table.table_descriptor.Elasticsearch.bulk_flush_backoff_constant` or + :func:`pyflink.table.table_descriptor.Elasticsearch.bulk_flush_backoff_exponential`). + + :param delay: Delay between each backoff attempt (in milliseconds). + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.bulkFlushBackoffDelay(int(delay)) + return self + + def connection_max_retry_timeout(self, max_retry_timeout): + """ + Sets connection properties to be used during REST communication to Elasticsearch. + + Sets the maximum timeout (in milliseconds) in case of multiple retries of the same request. + + :param max_retry_timeout: Maximum timeout (in milliseconds). + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.connectionMaxRetryTimeout( + int(max_retry_timeout)) + return self + + def connection_path_prefix(self, path_prefix): + """ + Sets connection properties to be used during REST communication to Elasticsearch. + + Adds a path prefix to every REST communication. + + :param path_prefix: Prefix string to be added to every REST communication. + :return: This object. + """ + self._j_elasticsearch = self._j_elasticsearch.connectionPathPrefix(path_prefix) + return self + + class ConnectTableDescriptor(Descriptor): """ Common class for table's created with :class:`pyflink.table.TableEnvironment.connect`. 
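For reference, a minimal usage sketch of the new Kafka descriptor added above (the topic name and broker addresses are placeholder values, and a working pyflink gateway is assumed); to_properties() exposes the flat connector properties that the new tests below assert against:

from pyflink.table import Kafka

# Every setter returns the descriptor, so the calls chain fluently,
# mirroring the Java descriptor builder.
kafka = Kafka() \
    .version("0.11") \
    .topic("user_events") \
    .properties({"zookeeper.connect": "localhost:2181",
                 "bootstrap.servers": "localhost:9092"}) \
    .start_from_earliest()

# Yields entries such as 'connector.type': 'kafka' and
# 'connector.startup-mode': 'earliest-offset'.
print(kafka.to_properties())
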
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py index 4fcc355..c9370fa 100644 --- a/flink-python/pyflink/table/tests/test_descriptor.py +++ b/flink-python/pyflink/table/tests/test_descriptor.py @@ -17,7 +17,8 @@ ################################################################################ import os -from pyflink.table.table_descriptor import (FileSystem, OldCsv, Rowtime, Schema) +from pyflink.table.table_descriptor import (FileSystem, OldCsv, Rowtime, Schema, Kafka, + Elasticsearch) from pyflink.table.table_sink import CsvTableSink from pyflink.table.types import DataTypes from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkStreamTableTestCase, @@ -29,7 +30,7 @@ class FileSystemDescriptorTests(PyFlinkTestCase): def test_path(self): file_system = FileSystem() - file_system.path("/test.csv") + file_system = file_system.path("/test.csv") properties = file_system.to_properties() expected = {'connector.property-version': '1', @@ -38,12 +39,392 @@ class FileSystemDescriptorTests(PyFlinkTestCase): assert properties == expected +class KafkaDescriptorTests(PyFlinkTestCase): + + def test_version(self): + kafka = Kafka() + + kafka = kafka.version("0.11") + + properties = kafka.to_properties() + expected = {'connector.version': '0.11', + 'connector.type': 'kafka', + 'connector.property-version': '1'} + assert properties == expected + + def test_topic(self): + kafka = Kafka() + + kafka = kafka.topic("topic1") + + properties = kafka.to_properties() + expected = {'connector.type': 'kafka', + 'connector.topic': 'topic1', + 'connector.property-version': '1'} + assert properties == expected + + def test_properties(self): + kafka = Kafka() + + kafka = kafka.properties({"zookeeper.connect": "localhost:2181", + "bootstrap.servers": "localhost:9092"}) + + properties = kafka.to_properties() + expected = {'connector.type': 'kafka', + 'connector.properties.0.key': 'zookeeper.connect', + 'connector.properties.0.value': 'localhost:2181', + 'connector.properties.1.key': 'bootstrap.servers', + 'connector.properties.1.value': 'localhost:9092', + 'connector.property-version': '1'} + assert properties == expected + + def test_property(self): + kafka = Kafka() + + kafka = kafka.property("group.id", "testGroup") + + properties = kafka.to_properties() + expected = {'connector.type': 'kafka', + 'connector.properties.0.key': 'group.id', + 'connector.properties.0.value': 'testGroup', + 'connector.property-version': '1'} + assert properties == expected + + def test_start_from_earliest(self): + kafka = Kafka() + + kafka = kafka.start_from_earliest() + + properties = kafka.to_properties() + expected = {'connector.type': 'kafka', + 'connector.startup-mode': 'earliest-offset', + 'connector.property-version': '1'} + assert properties == expected + + def test_start_from_latest(self): + kafka = Kafka() + + kafka = kafka.start_from_latest() + + properties = kafka.to_properties() + expected = {'connector.type': 'kafka', + 'connector.startup-mode': 'latest-offset', + 'connector.property-version': '1'} + assert properties == expected + + def test_start_from_group_offsets(self): + kafka = Kafka() + + kafka = kafka.start_from_group_offsets() + + properties = kafka.to_properties() + expected = {'connector.type': 'kafka', + 'connector.startup-mode': 'group-offsets', + 'connector.property-version': '1'} + assert properties == expected + + def test_start_from_specific_offsets(self): + kafka = Kafka() + + kafka = kafka.start_from_specific_offsets({1: 220, 
3: 400}) + + properties = kafka.to_properties() + expected = {'connector.startup-mode': 'specific-offsets', + 'connector.specific-offsets.0.partition': '1', + 'connector.specific-offsets.0.offset': '220', + 'connector.specific-offsets.1.partition': '3', + 'connector.specific-offsets.1.offset': '400', + 'connector.type': 'kafka', + 'connector.property-version': '1'} + assert properties == expected + + def test_start_from_specific_offset(self): + kafka = Kafka() + + kafka = kafka.start_from_specific_offset(3, 300) + + properties = kafka.to_properties() + expected = {'connector.startup-mode': 'specific-offsets', + 'connector.specific-offsets.0.partition': '3', + 'connector.specific-offsets.0.offset': '300', + 'connector.type': 'kafka', + 'connector.property-version': '1'} + assert properties == expected + + def test_sink_partitioner_fixed(self): + kafka = Kafka() + + kafka = kafka.sink_partitioner_fixed() + + properties = kafka.to_properties() + expected = {'connector.sink-partitioner': 'fixed', + 'connector.type': 'kafka', + 'connector.property-version': '1'} + assert properties == expected + + def test_sink_partitioner_custom(self): + kafka = Kafka() + + kafka = kafka.sink_partitioner_custom( + "org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner") + + properties = kafka.to_properties() + expected = {'connector.sink-partitioner': 'custom', + 'connector.sink-partitioner-class': + 'org.apache.flink.streaming.connectors.kafka.partitioner.' + 'FlinkFixedPartitioner', + 'connector.type': 'kafka', + 'connector.property-version': '1'} + assert properties == expected + + def test_sink_partitioner_round_robin(self): + kafka = Kafka() + + kafka = kafka.sink_partitioner_round_robin() + + properties = kafka.to_properties() + expected = {'connector.sink-partitioner': 'round-robin', + 'connector.type': 'kafka', + 'connector.property-version': '1'} + assert properties == expected + + +class ElasticsearchDescriptorTest(PyFlinkTestCase): + + def test_version(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.version("6") + + properties = elasticsearch.to_properties() + expected = {'connector.type': 'elasticsearch', + 'connector.version': '6', + 'connector.property-version': '1'} + assert properties == expected + + def test_host(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.host("localhost", 9200, "http") + + properties = elasticsearch.to_properties() + expected = {'connector.hosts.0.hostname': 'localhost', + 'connector.hosts.0.port': '9200', + 'connector.hosts.0.protocol': 'http', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_index(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.index("MyUsers") + + properties = elasticsearch.to_properties() + expected = {'connector.index': 'MyUsers', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_document_type(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.document_type("user") + + properties = elasticsearch.to_properties() + expected = {'connector.document-type': 'user', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_key_delimiter(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.key_delimiter("$") + + properties = elasticsearch.to_properties() + expected = {'connector.key-delimiter': '$', + 
'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_key_null_literal(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.key_null_literal("n/a") + + properties = elasticsearch.to_properties() + expected = {'connector.key-null-literal': 'n/a', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_failure_handler_fail(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.failure_handler_fail() + + properties = elasticsearch.to_properties() + expected = {'connector.failure-handler': 'fail', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_failure_handler_ignore(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.failure_handler_ignore() + + properties = elasticsearch.to_properties() + expected = {'connector.failure-handler': 'ignore', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_failure_handler_retry_rejected(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.failure_handler_retry_rejected() + + properties = elasticsearch.to_properties() + expected = {'connector.failure-handler': 'retry-rejected', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_failure_handler_custom(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.failure_handler_custom( + "org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler") + + properties = elasticsearch.to_properties() + expected = {'connector.failure-handler': 'custom', + 'connector.failure-handler-class': + 'org.apache.flink.streaming.connectors.elasticsearch.util.' 
+ 'IgnoringFailureHandler', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_disable_flush_on_checkpoint(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.disable_flush_on_checkpoint() + + properties = elasticsearch.to_properties() + expected = {'connector.flush-on-checkpoint': 'false', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_bulk_flush_max_actions(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.bulk_flush_max_actions(42) + + properties = elasticsearch.to_properties() + expected = {'connector.bulk-flush.max-actions': '42', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_bulk_flush_max_size(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.bulk_flush_max_size("42 mb") + + properties = elasticsearch.to_properties() + expected = {'connector.bulk-flush.max-size': '44040192 bytes', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + + assert properties == expected + + def test_bulk_flush_interval(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.bulk_flush_interval(2000) + + properties = elasticsearch.to_properties() + expected = {'connector.bulk-flush.interval': '2000', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_bulk_flush_backoff_exponential(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.bulk_flush_backoff_exponential() + + properties = elasticsearch.to_properties() + expected = {'connector.bulk-flush.backoff.type': 'exponential', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_bulk_flush_backoff_constant(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.bulk_flush_backoff_constant() + + properties = elasticsearch.to_properties() + expected = {'connector.bulk-flush.backoff.type': 'constant', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_bulk_flush_backoff_max_retries(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.bulk_flush_backoff_max_retries(3) + + properties = elasticsearch.to_properties() + expected = {'connector.bulk-flush.backoff.max-retries': '3', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_bulk_flush_backoff_delay(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.bulk_flush_backoff_delay(30000) + + properties = elasticsearch.to_properties() + expected = {'connector.bulk-flush.backoff.delay': '30000', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_connection_max_retry_timeout(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.connection_max_retry_timeout(3000) + + properties = elasticsearch.to_properties() + expected = {'connector.connection-max-retry-timeout': '3000', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + def test_connection_path_prefix(self): + elasticsearch = Elasticsearch() + + elasticsearch = elasticsearch.connection_path_prefix("/v1") + + properties = elasticsearch.to_properties() + 
expected = {'connector.connection-path-prefix': '/v1', + 'connector.type': 'elasticsearch', + 'connector.property-version': '1'} + assert properties == expected + + class OldCsvDescriptorTests(PyFlinkTestCase): def test_field_delimiter(self): csv = OldCsv() - csv.field_delimiter("|") + csv = csv.field_delimiter("|") properties = csv.to_properties() expected = {'format.field-delimiter': '|', @@ -54,7 +435,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase): def test_line_delimiter(self): csv = OldCsv() - csv.line_delimiter(";") + csv = csv.line_delimiter(";") expected = {'format.type': 'csv', 'format.property-version': '1', @@ -66,7 +447,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase): def test_ignore_parse_errors(self): csv = OldCsv() - csv.ignore_parse_errors() + csv = csv.ignore_parse_errors() properties = csv.to_properties() expected = {'format.ignore-parse-errors': 'true', @@ -77,7 +458,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase): def test_quote_character(self): csv = OldCsv() - csv.quote_character("*") + csv = csv.quote_character("*") properties = csv.to_properties() expected = {'format.quote-character': '*', @@ -88,7 +469,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase): def test_comment_prefix(self): csv = OldCsv() - csv.comment_prefix("#") + csv = csv.comment_prefix("#") properties = csv.to_properties() expected = {'format.comment-prefix': '#', @@ -99,7 +480,7 @@ class OldCsvDescriptorTests(PyFlinkTestCase): def test_ignore_first_line(self): csv = OldCsv() - csv.ignore_first_line() + csv = csv.ignore_first_line() properties = csv.to_properties() expected = {'format.ignore-first-line': 'true', @@ -363,7 +744,7 @@ class AbstractTableDescriptorTests(object): def test_with_format(self): descriptor = self.t_env.connect(FileSystem()) - descriptor.with_format(OldCsv().field("a", "INT")) + descriptor = descriptor.with_format(OldCsv().field("a", "INT")) properties = descriptor.to_properties() @@ -378,7 +759,7 @@ class AbstractTableDescriptorTests(object): def test_with_schema(self): descriptor = self.t_env.connect(FileSystem()) - descriptor.with_format(OldCsv()).with_schema(Schema().field("a", "INT")) + descriptor = descriptor.with_format(OldCsv()).with_schema(Schema().field("a", "INT")) properties = descriptor.to_properties() expected = {'schema.0.name': 'a', @@ -505,7 +886,7 @@ class StreamTableDescriptorTests(PyFlinkStreamTableTestCase, AbstractTableDescri def test_in_append_mode(self): descriptor = self.t_env.connect(FileSystem()) - descriptor\ + descriptor = descriptor\ .with_format(OldCsv())\ .in_append_mode() @@ -520,7 +901,7 @@ class StreamTableDescriptorTests(PyFlinkStreamTableTestCase, AbstractTableDescri def test_in_retract_mode(self): descriptor = self.t_env.connect(FileSystem()) - descriptor \ + descriptor = descriptor \ .with_format(OldCsv()) \ .in_retract_mode() @@ -535,7 +916,7 @@ class StreamTableDescriptorTests(PyFlinkStreamTableTestCase, AbstractTableDescri def test_in_upsert_mode(self): descriptor = self.t_env.connect(FileSystem()) - descriptor \ + descriptor = descriptor \ .with_format(OldCsv()) \ .in_upsert_mode() diff --git a/tools/travis_controller.sh b/tools/travis_controller.sh index 6741d6a..f19fd8a 100755 --- a/tools/travis_controller.sh +++ b/tools/travis_controller.sh @@ -161,6 +161,8 @@ if [ $STAGE == "$STAGE_COMPILE" ]; then find "$CACHE_FLINK_DIR" -maxdepth 8 -type f -name '*.jar' \ ! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/lib/flink-dist*.jar" \ ! 
-path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/opt/flink-table*.jar" \ + ! -path "$CACHE_FLINK_DIR/flink-connectors/flink-connector-elasticsearch-base/target/flink-*.jar" \ + ! -path "$CACHE_FLINK_DIR/flink-connectors/flink-connector-kafka-base/target/flink-*.jar" \ ! -path "$CACHE_FLINK_DIR/flink-table/flink-table-planner/target/flink-table-planner*tests.jar" | xargs rm -rf # .git directory