This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ad7e81a [FLINK-10843] [connectors] Change Kafka table factory version '2.0' to 'universal' ad7e81a is described below commit ad7e81ae3642f7d1f695a6f2474c59a81bb83949 Author: Timo Walther <twal...@apache.org> AuthorDate: Fri Nov 16 09:23:28 2018 +0100 [FLINK-10843] [connectors] Change Kafka table factory version '2.0' to 'universal' This closes #7087. --- docs/dev/table/connect.md | 27 +++++++++++++--------- .../flink/table/descriptors/KafkaValidator.java | 17 ++------------ .../kafka/KafkaTableSourceSinkFactory.java | 2 +- .../kafka/KafkaTableSourceSinkFactoryTest.java | 2 +- .../test-scripts/kafka_sql_common.sh | 3 ++- .../test-scripts/test_sql_client.sh | 8 ++++++- .../test-scripts/test_sql_client_kafka.sh | 2 +- .../test-scripts/test_sql_client_kafka010.sh | 2 +- .../test-scripts/test_sql_client_kafka_common.sh | 10 ++++++-- 9 files changed, 39 insertions(+), 34 deletions(-) diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md index 5f11127..effd913 100644 --- a/docs/dev/table/connect.md +++ b/docs/dev/table/connect.md @@ -40,14 +40,15 @@ The following table list all available connectors and formats. Their mutual comp ### Connectors -| Name | Version | Maven dependency | SQL Client JAR | -| :---------------- | :------------ | :--------------------------- | :----------------------| -| Filesystem | | Built-in | Built-in | -| Elasticsearch | 6 | `flink-connector-elasticsearch6` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | -| Apache Kafka | 0.8 | `flink-connector-kafka-0.8` | Not available | -| Apache Kafka | 0.9 | `flink-connector-kafka-0.9` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | -| Apache Kafka | 0.10 | `flink-connector-kafka-0.10` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | -| Apache Kafka | 0.11 | `flink-connector-kafka-0.11` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | +| Name | Version | Maven dependency | SQL Client JAR | +| :---------------- | :------------------ | :--------------------------- | :----------------------| +| Filesystem | | Built-in | Built-in | +| Elasticsearch | 6 | `flink-connector-elasticsearch6` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | +| Apache Kafka | 0.8 | `flink-connector-kafka-0.8` | Not available | +| Apache Kafka | 0.9 | `flink-connector-kafka-0.9` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | +| Apache Kafka | 0.10 | `flink-connector-kafka-0.10` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | +| Apache Kafka | 0.11 | `flink-connector-kafka-0.11` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | +| Apache Kafka | 0.11+ (`universal`) | `flink-connector-kafka` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | ### Formats @@ -524,7 +525,8 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t {% highlight java %} .connect( new Kafka() - .version("0.11") // required: valid connector versions are "0.8", "0.9", "0.10", and "0.11" + .version("0.11") // required: valid connector versions are + // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("...") // required: topic name from which the table is read // optional: connector specific properties @@ -549,7 +551,8 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t {% highlight yaml %} connector: type: kafka - version: "0.11" # required: valid connector versions are "0.8", "0.9", "0.10", and "0.11" + version: "0.11" # required: valid connector versions are + # "0.8", "0.9", "0.10", "0.11", and "universal" topic: ... # required: topic name from which the table is read properties: # optional: connector specific properties @@ -583,7 +586,9 @@ connector: **Consistency guarantees:** By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({{ site.baseurl }}/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing). -**Kafka 0.10+ Timestamps:** Since Kafka 0.10, Kafka messages have a timestamp as metadata that specifies when the record was written into the Kafka topic. These timestamps can be used for a [rowtime attribute](connect.html#defining-the-schema) by selecting `timestamps: from-source` in YAML and `timestampsFromSource()` in Java/Scala respectively. +**Kafka 0.10+ Timestamps:** Since Kafka 0.10, Kafka messages have a timestamp as metadata that specifies when the record was written into the Kafka topic. These timestamps can be used for a [rowtime attribute](connect.html#defining-the-schema) by selecting `timestamps: from-source` in YAML and `timestampsFromSource()` in Java/Scala respectively. + +**Kafka 0.11+ Versioning:** Since Flink 1.7, the Kafka connector definition should be independent of a hard-coded Kafka version. Use the connector version `universal` as a wildcard for Flink's Kafka connector that is compatible with all Kafka versions starting from 0.11. Make sure to add the version-specific Kafka dependency. In addition, a corresponding format needs to be specified for reading and writing rows from and to Kafka. diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java index 86d7ef6..0dbbbeb 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java @@ -21,9 +21,7 @@ package org.apache.flink.table.descriptors; import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.function.Consumer; @@ -40,7 +38,7 @@ public class KafkaValidator extends ConnectorDescriptorValidator { public static final String CONNECTOR_VERSION_VALUE_09 = "0.9"; public static final String CONNECTOR_VERSION_VALUE_010 = "0.10"; public static final String CONNECTOR_VERSION_VALUE_011 = "0.11"; - public static final String CONNECTOR_VERSION_VALUE_20 = "2.0"; + public static final String CONNECTOR_VERSION_VALUE_UNIVERSAL = "universal"; public static final String CONNECTOR_TOPIC = "connector.topic"; public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode"; public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset"; @@ -64,7 +62,7 @@ public class KafkaValidator extends ConnectorDescriptorValidator { super.validate(properties); properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_KAFKA, false); - validateVersion(properties); + properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE); validateStartupMode(properties); @@ -73,17 +71,6 @@ public class KafkaValidator extends ConnectorDescriptorValidator { validateSinkPartitioner(properties); } - private void validateVersion(DescriptorProperties properties) { - final List<String> versions = Arrays.asList( - CONNECTOR_VERSION_VALUE_08, - CONNECTOR_VERSION_VALUE_09, - CONNECTOR_VERSION_VALUE_010, - CONNECTOR_VERSION_VALUE_011, - CONNECTOR_VERSION_VALUE_20); - properties.validateEnumValues(CONNECTOR_VERSION, false, versions); - properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE); - } - private void validateStartupMode(DescriptorProperties properties) { final Map<String, Consumer<String>> specificOffsetValidators = new HashMap<>(); specificOffsetValidators.put( diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java index b0dfc54..2b49867 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java @@ -39,7 +39,7 @@ public class KafkaTableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase @Override protected String kafkaVersion() { - return KafkaValidator.CONNECTOR_VERSION_VALUE_20; + return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL; } @Override diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java index 5043880..4d843bc 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java @@ -40,7 +40,7 @@ public class KafkaTableSourceSinkFactoryTest extends KafkaTableSourceSinkFactory @Override protected String getKafkaVersion() { - return KafkaValidator.CONNECTOR_VERSION_VALUE_20; + return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL; } @Override diff --git a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh index 3e95b40..468f058 100644 --- a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh +++ b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh @@ -21,6 +21,7 @@ KAFKA_CONNECTOR_VERSION="$1" KAFKA_VERSION="$2" CONFLUENT_VERSION="$3" CONFLUENT_MAJOR_VERSION="$4" +KAFKA_SQL_VERSION="$5" source "$(dirname "$0")"/kafka-common.sh $2 $3 $4 @@ -64,7 +65,7 @@ function get_kafka_json_source_schema { type: ROW<type VARCHAR, message VARCHAR> connector: type: kafka - version: "$KAFKA_CONNECTOR_VERSION" + version: "$KAFKA_SQL_VERSION" topic: $topicName startup-mode: earliest-offset properties: diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh index 74192b4..5dd6883 100755 --- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh +++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh @@ -23,9 +23,15 @@ KAFKA_CONNECTOR_VERSION="2.0" KAFKA_VERSION="2.0.0" CONFLUENT_VERSION="5.0.0" CONFLUENT_MAJOR_VERSION="5.0" +KAFKA_SQL_VERSION="universal" source "$(dirname "$0")"/common.sh -source "$(dirname "$0")"/kafka_sql_common.sh $KAFKA_CONNECTOR_VERSION $KAFKA_VERSION $CONFLUENT_VERSION $CONFLUENT_MAJOR_VERSION +source "$(dirname "$0")"/kafka_sql_common.sh \ + $KAFKA_CONNECTOR_VERSION \ + $KAFKA_VERSION \ + $CONFLUENT_VERSION \ + $CONFLUENT_MAJOR_VERSION \ + $KAFKA_SQL_VERSION source "$(dirname "$0")"/elasticsearch-common.sh SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh index a05dc05..94e89a2 100755 --- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh +++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh @@ -19,4 +19,4 @@ set -Eeuo pipefail -source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 "kafka" +source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 "kafka" "universal" diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh index c710abc..66bef66 100755 --- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh +++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh @@ -19,4 +19,4 @@ set -Eeuo pipefail -source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.10 0.10.2.0 3.2.0 3.2 "kafka-0.10" +source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.10 0.10.2.0 3.2.0 3.2 "kafka-0.10" "0.10" diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh index 149c86f..08ed59b 100755 --- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh +++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh @@ -22,9 +22,15 @@ KAFKA_VERSION="$2" CONFLUENT_VERSION="$3" CONFLUENT_MAJOR_VERSION="$4" KAFKA_SQL_JAR="$5" +KAFKA_SQL_VERSION="$6" source "$(dirname "$0")"/common.sh -source "$(dirname "$0")"/kafka_sql_common.sh $KAFKA_CONNECTOR_VERSION $KAFKA_VERSION $CONFLUENT_VERSION $CONFLUENT_MAJOR_VERSION +source "$(dirname "$0")"/kafka_sql_common.sh \ + $KAFKA_CONNECTOR_VERSION \ + $KAFKA_VERSION \ + $CONFLUENT_VERSION \ + $CONFLUENT_MAJOR_VERSION \ + $KAFKA_SQL_VERSION ################################################################################ # Prepare connectors @@ -98,7 +104,7 @@ cat >> $SQL_CONF << EOF type: BIGINT connector: type: kafka - version: "$KAFKA_CONNECTOR_VERSION" + version: "$KAFKA_SQL_VERSION" topic: test-avro startup-mode: earliest-offset properties: