This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ad7e81a  [FLINK-10843] [connectors] Change Kafka table factory version 
'2.0' to 'universal'
ad7e81a is described below

commit ad7e81ae3642f7d1f695a6f2474c59a81bb83949
Author: Timo Walther <twal...@apache.org>
AuthorDate: Fri Nov 16 09:23:28 2018 +0100

    [FLINK-10843] [connectors] Change Kafka table factory version '2.0' to 
'universal'
    
    This closes #7087.
---
 docs/dev/table/connect.md                          | 27 +++++++++++++---------
 .../flink/table/descriptors/KafkaValidator.java    | 17 ++------------
 .../kafka/KafkaTableSourceSinkFactory.java         |  2 +-
 .../kafka/KafkaTableSourceSinkFactoryTest.java     |  2 +-
 .../test-scripts/kafka_sql_common.sh               |  3 ++-
 .../test-scripts/test_sql_client.sh                |  8 ++++++-
 .../test-scripts/test_sql_client_kafka.sh          |  2 +-
 .../test-scripts/test_sql_client_kafka010.sh       |  2 +-
 .../test-scripts/test_sql_client_kafka_common.sh   | 10 ++++++--
 9 files changed, 39 insertions(+), 34 deletions(-)

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 5f11127..effd913 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -40,14 +40,15 @@ The following table list all available connectors and 
formats. Their mutual comp
 
 ### Connectors
 
-| Name              | Version       | Maven dependency             | SQL 
Client JAR         |
-| :---------------- | :------------ | :--------------------------- | 
:----------------------|
-| Filesystem        |               | Built-in                     | Built-in  
             |
-| Elasticsearch     | 6             | `flink-connector-elasticsearch6` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
-| Apache Kafka      | 0.8           | `flink-connector-kafka-0.8`  | Not 
available          |
-| Apache Kafka      | 0.9           | `flink-connector-kafka-0.9`  | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
-| Apache Kafka      | 0.10          | `flink-connector-kafka-0.10` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
-| Apache Kafka      | 0.11          | `flink-connector-kafka-0.11` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
+| Name              | Version             | Maven dependency             | SQL 
Client JAR         |
+| :---------------- | :------------------ | :--------------------------- | 
:----------------------|
+| Filesystem        |                     | Built-in                     | 
Built-in               |
+| Elasticsearch     | 6                   | `flink-connector-elasticsearch6` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
+| Apache Kafka      | 0.8                 | `flink-connector-kafka-0.8`  | Not 
available          |
+| Apache Kafka      | 0.9                 | `flink-connector-kafka-0.9`  | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
+| Apache Kafka      | 0.10                | `flink-connector-kafka-0.10` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
+| Apache Kafka      | 0.11                | `flink-connector-kafka-0.11` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
+| Apache Kafka      | 0.11+ (`universal`) | `flink-connector-kafka`      | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
 
 ### Formats
 
@@ -524,7 +525,8 @@ The Kafka connector allows for reading and writing from and 
to an Apache Kafka t
 {% highlight java %}
 .connect(
   new Kafka()
-    .version("0.11")    // required: valid connector versions are "0.8", 
"0.9", "0.10", and "0.11"
+    .version("0.11")    // required: valid connector versions are
+                        //   "0.8", "0.9", "0.10", "0.11", and "universal"
     .topic("...")       // required: topic name from which the table is read
 
     // optional: connector specific properties
@@ -549,7 +551,8 @@ The Kafka connector allows for reading and writing from and 
to an Apache Kafka t
 {% highlight yaml %}
 connector:
   type: kafka
-  version: "0.11"     # required: valid connector versions are "0.8", "0.9", 
"0.10", and "0.11"
+  version: "0.11"     # required: valid connector versions are
+                      #   "0.8", "0.9", "0.10", "0.11", and "universal"
   topic: ...          # required: topic name from which the table is read
 
   properties:         # optional: connector specific properties
@@ -583,7 +586,9 @@ connector:
 
 **Consistency guarantees:** By default, a Kafka sink ingests data with 
at-least-once guarantees into a Kafka topic if the query is executed with 
[checkpointing enabled]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing).
 
-**Kafka 0.10+ Timestamps:** Since Kafka 0.10, Kafka messages have a timestamp 
as metadata that specifies when the record was written into the Kafka topic. 
These timestamps can be used for a [rowtime 
attribute](connect.html#defining-the-schema) by selecting `timestamps: 
from-source` in YAML and `timestampsFromSource()` in Java/Scala respectively. 
+**Kafka 0.10+ Timestamps:** Since Kafka 0.10, Kafka messages have a timestamp 
as metadata that specifies when the record was written into the Kafka topic. 
These timestamps can be used for a [rowtime 
attribute](connect.html#defining-the-schema) by selecting `timestamps: 
from-source` in YAML and `timestampsFromSource()` in Java/Scala respectively.
+
+**Kafka 0.11+ Versioning:** Since Flink 1.7, the Kafka connector definition 
should be independent of a hard-coded Kafka version. Use the connector version 
`universal` as a wildcard for Flink's Kafka connector that is compatible with 
all Kafka versions starting from 0.11.
 
 Make sure to add the version-specific Kafka dependency. In addition, a 
corresponding format needs to be specified for reading and writing rows from 
and to Kafka.
 
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
index 86d7ef6..0dbbbeb 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
@@ -21,9 +21,7 @@ package org.apache.flink.table.descriptors;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 
@@ -40,7 +38,7 @@ public class KafkaValidator extends 
ConnectorDescriptorValidator {
        public static final String CONNECTOR_VERSION_VALUE_09 = "0.9";
        public static final String CONNECTOR_VERSION_VALUE_010 = "0.10";
        public static final String CONNECTOR_VERSION_VALUE_011 = "0.11";
-       public static final String CONNECTOR_VERSION_VALUE_20 = "2.0";
+       public static final String CONNECTOR_VERSION_VALUE_UNIVERSAL = 
"universal";
        public static final String CONNECTOR_TOPIC = "connector.topic";
        public static final String CONNECTOR_STARTUP_MODE = 
"connector.startup-mode";
        public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = 
"earliest-offset";
@@ -64,7 +62,7 @@ public class KafkaValidator extends 
ConnectorDescriptorValidator {
                super.validate(properties);
                properties.validateValue(CONNECTOR_TYPE, 
CONNECTOR_TYPE_VALUE_KAFKA, false);
 
-               validateVersion(properties);
+               properties.validateString(CONNECTOR_TOPIC, false, 1, 
Integer.MAX_VALUE);
 
                validateStartupMode(properties);
 
@@ -73,17 +71,6 @@ public class KafkaValidator extends 
ConnectorDescriptorValidator {
                validateSinkPartitioner(properties);
        }
 
-       private void validateVersion(DescriptorProperties properties) {
-               final List<String> versions = Arrays.asList(
-                       CONNECTOR_VERSION_VALUE_08,
-                       CONNECTOR_VERSION_VALUE_09,
-                       CONNECTOR_VERSION_VALUE_010,
-                       CONNECTOR_VERSION_VALUE_011,
-                       CONNECTOR_VERSION_VALUE_20);
-               properties.validateEnumValues(CONNECTOR_VERSION, false, 
versions);
-               properties.validateString(CONNECTOR_TOPIC, false, 1, 
Integer.MAX_VALUE);
-       }
-
        private void validateStartupMode(DescriptorProperties properties) {
                final Map<String, Consumer<String>> specificOffsetValidators = 
new HashMap<>();
                specificOffsetValidators.put(
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java
index b0dfc54..2b49867 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactory.java
@@ -39,7 +39,7 @@ public class KafkaTableSourceSinkFactory extends 
KafkaTableSourceSinkFactoryBase
 
        @Override
        protected String kafkaVersion() {
-               return KafkaValidator.CONNECTOR_VERSION_VALUE_20;
+               return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java
index 5043880..4d843bc 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTest.java
@@ -40,7 +40,7 @@ public class KafkaTableSourceSinkFactoryTest extends 
KafkaTableSourceSinkFactory
 
        @Override
        protected String getKafkaVersion() {
-               return KafkaValidator.CONNECTOR_VERSION_VALUE_20;
+               return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
        }
 
        @Override
diff --git a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh 
b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
index 3e95b40..468f058 100644
--- a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
@@ -21,6 +21,7 @@ KAFKA_CONNECTOR_VERSION="$1"
 KAFKA_VERSION="$2"
 CONFLUENT_VERSION="$3"
 CONFLUENT_MAJOR_VERSION="$4"
+KAFKA_SQL_VERSION="$5"
 
 source "$(dirname "$0")"/kafka-common.sh $2 $3 $4
 
@@ -64,7 +65,7 @@ function get_kafka_json_source_schema {
         type: ROW<type VARCHAR, message VARCHAR>
     connector:
       type: kafka
-      version: "$KAFKA_CONNECTOR_VERSION"
+      version: "$KAFKA_SQL_VERSION"
       topic: $topicName
       startup-mode: earliest-offset
       properties:
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index 74192b4..5dd6883 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -23,9 +23,15 @@ KAFKA_CONNECTOR_VERSION="2.0"
 KAFKA_VERSION="2.0.0"
 CONFLUENT_VERSION="5.0.0"
 CONFLUENT_MAJOR_VERSION="5.0"
+KAFKA_SQL_VERSION="universal"
 
 source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka_sql_common.sh $KAFKA_CONNECTOR_VERSION 
$KAFKA_VERSION $CONFLUENT_VERSION $CONFLUENT_MAJOR_VERSION
+source "$(dirname "$0")"/kafka_sql_common.sh \
+  $KAFKA_CONNECTOR_VERSION \
+  $KAFKA_VERSION \
+  $CONFLUENT_VERSION \
+  $CONFLUENT_MAJOR_VERSION \
+  $KAFKA_SQL_VERSION
 source "$(dirname "$0")"/elasticsearch-common.sh
 
 SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
index a05dc05..94e89a2 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
@@ -19,4 +19,4 @@
 
 set -Eeuo pipefail
 
-source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 
"kafka"
+source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 
"kafka" "universal"
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh
index c710abc..66bef66 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh
@@ -19,4 +19,4 @@
 
 set -Eeuo pipefail
 
-source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.10 0.10.2.0 3.2.0 
3.2 "kafka-0.10"
+source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.10 0.10.2.0 3.2.0 
3.2 "kafka-0.10" "0.10"
diff --git 
a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh
index 149c86f..08ed59b 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh
@@ -22,9 +22,15 @@ KAFKA_VERSION="$2"
 CONFLUENT_VERSION="$3"
 CONFLUENT_MAJOR_VERSION="$4"
 KAFKA_SQL_JAR="$5"
+KAFKA_SQL_VERSION="$6"
 
 source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka_sql_common.sh $KAFKA_CONNECTOR_VERSION 
$KAFKA_VERSION $CONFLUENT_VERSION $CONFLUENT_MAJOR_VERSION
+source "$(dirname "$0")"/kafka_sql_common.sh \
+  $KAFKA_CONNECTOR_VERSION \
+  $KAFKA_VERSION \
+  $CONFLUENT_VERSION \
+  $CONFLUENT_MAJOR_VERSION \
+  $KAFKA_SQL_VERSION
 
 
################################################################################
 # Prepare connectors
@@ -98,7 +104,7 @@ cat >> $SQL_CONF << EOF
         type: BIGINT
     connector:
       type: kafka
-      version: "$KAFKA_CONNECTOR_VERSION"
+      version: "$KAFKA_SQL_VERSION"
       topic: test-avro
       startup-mode: earliest-offset
       properties:

Reply via email to