This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f93346c  [FLINK-16125][connector/kafka] Remove Kafka connector 
property zookeeper.connect and clear documentation because Kafka 0.8 connector 
has been removed.
f93346c is described below

commit f93346c93c6a841cdce576d48a0b5ca8076cc195
Author: Qingsheng Ren <[email protected]>
AuthorDate: Tue Mar 17 12:26:50 2020 +0800

    [FLINK-16125][connector/kafka] Remove Kafka connector property 
zookeeper.connect and clear documentation because Kafka 0.8 connector has been 
removed.
---
 docs/dev/table/connect.md                                 |  9 ---------
 docs/dev/table/connect.zh.md                              |  9 ---------
 docs/dev/table/hive/hive_catalog.md                       |  2 --
 docs/dev/table/hive/hive_catalog.zh.md                    |  2 --
 docs/dev/table/sqlClient.md                               |  2 --
 docs/dev/table/sqlClient.zh.md                            |  2 --
 .../connectors/kafka/KafkaTestEnvironmentImpl.java        |  1 -
 .../connectors/kafka/KafkaTestEnvironmentImpl.java        |  3 +--
 .../apache/flink/table/descriptors/KafkaValidator.java    |  8 ++------
 .../streaming/connectors/kafka/KafkaConsumerTestBase.java |  1 -
 .../kafka/KafkaTableSourceSinkFactoryTestBase.java        | 15 ++++-----------
 .../streaming/connectors/kafka/KafkaTableTestBase.java    |  3 ---
 .../connectors/kafka/KafkaTestEnvironmentImpl.java        |  1 -
 .../schema/registry/test/TestAvroConsumerConfluent.java   |  4 +---
 .../flink/tests/util/kafka/StreamingKafkaITCase.java      |  1 -
 .../src/test/resources/kafka_json_source_schema.yaml      |  3 ---
 .../flink/streaming/kafka/test/base/KafkaExampleUtil.java |  4 ++--
 .../apache/flink/streaming/kafka/test/KafkaExample.java   |  2 +-
 .../flink/streaming/kafka/test/Kafka010Example.java       |  2 +-
 .../flink/streaming/kafka/test/Kafka011Example.java       |  2 +-
 flink-end-to-end-tests/test-scripts/kafka_sql_common.sh   |  1 -
 .../test-scripts/test_confluent_schema_registry.sh        |  2 +-
 flink-python/pyflink/table/table_environment.py           |  1 -
 flink-python/pyflink/table/tests/test_descriptor.py       |  4 +---
 .../flink/table/api/java/StreamTableEnvironment.java      |  1 -
 .../java/org/apache/flink/table/api/TableEnvironment.java |  1 -
 .../flink/table/api/scala/StreamTableEnvironment.scala    |  1 -
 27 files changed, 15 insertions(+), 72 deletions(-)

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 204c4cb..2bb8017 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -159,7 +159,6 @@ CREATE TABLE MyUserTable (
   'connector.version' = '0.10',
   'connector.topic' = 'topic_name',
   'connector.startup-mode' = 'earliest-offset',
-  'connector.properties.zookeeper.connect' = 'localhost:2181',
   'connector.properties.bootstrap.servers' = 'localhost:9092',
 
   -- declare a format for this system
@@ -177,7 +176,6 @@ tableEnvironment
       .version("0.10")
       .topic("test-input")
       .startFromEarliest()
-      .property("zookeeper.connect", "localhost:2181")
       .property("bootstrap.servers", "localhost:9092")
   )
 
@@ -211,7 +209,6 @@ table_environment \
         .version("0.10")
         .topic("test-input")
         .start_from_earliest()
-        .property("zookeeper.connect", "localhost:2181")
         .property("bootstrap.servers", "localhost:9092")
     ) \
     .with_format(  # declare a format for this system
@@ -246,7 +243,6 @@ tables:
       topic: test-input
       startup-mode: earliest-offset
       properties:
-        zookeeper.connect: localhost:2181
         bootstrap.servers: localhost:9092
 
     # declare a format for this system
@@ -773,8 +769,6 @@ CREATE TABLE MyUserTable (
 
   'connector.topic' = 'topic_name', -- required: topic name from which the 
table is read
 
-  -- required: specify the ZooKeeper connection string
-  'connector.properties.zookeeper.connect' = 'localhost:2181',
   -- required: specify the Kafka server connection string
   'connector.properties.bootstrap.servers' = 'localhost:9092',
   -- required for Kafka source, optional for Kafka sink, specify consumer group
@@ -814,7 +808,6 @@ CREATE TABLE MyUserTable (
     .topic("...")       // required: topic name from which the table is read
 
     // optional: connector specific properties
-    .property("zookeeper.connect", "localhost:2181")
     .property("bootstrap.servers", "localhost:9092")
     .property("group.id", "testGroup")
 
@@ -844,7 +837,6 @@ CREATE TABLE MyUserTable (
     .topic("...")     # required: topic name from which the table is read
     
     # optional: connector specific properties
-    .property("zookeeper.connect", "localhost:2181")
     .property("bootstrap.servers", "localhost:9092")
     .property("group.id", "testGroup")
 
@@ -874,7 +866,6 @@ connector:
   topic: ...          # required: topic name from which the table is read
 
   properties:
-    zookeeper.connect: localhost:2181  # required: specify the ZooKeeper 
connection string
     bootstrap.servers: localhost:9092  # required: specify the Kafka server 
connection string
     group.id: testGroup                # optional: required in Kafka consumer, 
specify consumer group
 
diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md
index 66b8d9a..720ab54 100644
--- a/docs/dev/table/connect.zh.md
+++ b/docs/dev/table/connect.zh.md
@@ -159,7 +159,6 @@ CREATE TABLE MyUserTable (
   'connector.version' = '0.10',
   'connector.topic' = 'topic_name',
   'connector.startup-mode' = 'earliest-offset',
-  'connector.properties.zookeeper.connect' = 'localhost:2181',
   'connector.properties.bootstrap.servers' = 'localhost:9092',
 
   -- declare a format for this system
@@ -177,7 +176,6 @@ tableEnvironment
       .version("0.10")
       .topic("test-input")
       .startFromEarliest()
-      .property("zookeeper.connect", "localhost:2181")
       .property("bootstrap.servers", "localhost:9092")
   )
 
@@ -211,7 +209,6 @@ table_environment \
         .version("0.10")
         .topic("test-input")
         .start_from_earliest()
-        .property("zookeeper.connect", "localhost:2181")
         .property("bootstrap.servers", "localhost:9092")
     ) \
     .with_format(  # declare a format for this system
@@ -246,7 +243,6 @@ tables:
       topic: test-input
       startup-mode: earliest-offset
       properties:
-        zookeeper.connect: localhost:2181
         bootstrap.servers: localhost:9092
 
     # declare a format for this system
@@ -773,8 +769,6 @@ CREATE TABLE MyUserTable (
 
   'connector.topic' = 'topic_name', -- required: topic name from which the 
table is read
 
-  -- required: specify the ZooKeeper connection string
-  'connector.properties.zookeeper.connect' = 'localhost:2181',
   -- required: specify the Kafka server connection string
   'connector.properties.bootstrap.servers' = 'localhost:9092',
   -- required for Kafka source, optional for Kafka sink, specify consumer group
@@ -814,7 +808,6 @@ CREATE TABLE MyUserTable (
     .topic("...")       // required: topic name from which the table is read
 
     // optional: connector specific properties
-    .property("zookeeper.connect", "localhost:2181")
     .property("bootstrap.servers", "localhost:9092")
     .property("group.id", "testGroup")
 
@@ -844,7 +837,6 @@ CREATE TABLE MyUserTable (
     .topic("...")     # required: topic name from which the table is read
 
     # optional: connector specific properties
-    .property("zookeeper.connect", "localhost:2181")
     .property("bootstrap.servers", "localhost:9092")
     .property("group.id", "testGroup")
 
@@ -874,7 +866,6 @@ connector:
   topic: ...          # required: topic name from which the table is read
 
   properties:
-    zookeeper.connect: localhost:2181  # required: specify the ZooKeeper 
connection string
     bootstrap.servers: localhost:9092  # required: specify the Kafka server 
connection string
     group.id: testGroup                # optional: required in Kafka consumer, 
specify consumer group
 
diff --git a/docs/dev/table/hive/hive_catalog.md 
b/docs/dev/table/hive/hive_catalog.md
index d2e7d5a..d907703 100644
--- a/docs/dev/table/hive/hive_catalog.md
+++ b/docs/dev/table/hive/hive_catalog.md
@@ -190,7 +190,6 @@ Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH 
(
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'test',
-   'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'csv',
    'update-mode' = 'append'
@@ -227,7 +226,6 @@ Location:                   ......
 Table Type:            MANAGED_TABLE
 Table Parameters:
        flink.connector.properties.bootstrap.servers    localhost:9092
-       flink.connector.properties.zookeeper.connect    localhost:2181
        flink.connector.topic   test
        flink.connector.type    kafka
        flink.connector.version universal
diff --git a/docs/dev/table/hive/hive_catalog.zh.md 
b/docs/dev/table/hive/hive_catalog.zh.md
index d2e7d5a..d907703 100644
--- a/docs/dev/table/hive/hive_catalog.zh.md
+++ b/docs/dev/table/hive/hive_catalog.zh.md
@@ -190,7 +190,6 @@ Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH 
(
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'test',
-   'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'csv',
    'update-mode' = 'append'
@@ -227,7 +226,6 @@ Location:                   ......
 Table Type:            MANAGED_TABLE
 Table Parameters:
        flink.connector.properties.bootstrap.servers    localhost:9092
-       flink.connector.properties.zookeeper.connect    localhost:2181
        flink.connector.topic   test
        flink.connector.type    kafka
        flink.connector.version universal
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index ed4ff80..9ea1bb8 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -317,7 +317,6 @@ tables:
       topic: TaxiRides
       startup-mode: earliest-offset
       properties:
-        zookeeper.connect: localhost:2181
         bootstrap.servers: localhost:9092
         group.id: testGroup
     format:
@@ -483,7 +482,6 @@ tables:
       version: "0.11"
       topic: OutputTopic
       properties:
-        zookeeper.connect: localhost:2181
         bootstrap.servers: localhost:9092
         group.id: testGroup
     format:
diff --git a/docs/dev/table/sqlClient.zh.md b/docs/dev/table/sqlClient.zh.md
index 93d506b..642f448 100644
--- a/docs/dev/table/sqlClient.zh.md
+++ b/docs/dev/table/sqlClient.zh.md
@@ -317,7 +317,6 @@ tables:
       topic: TaxiRides
       startup-mode: earliest-offset
       properties:
-        zookeeper.connect: localhost:2181
         bootstrap.servers: localhost:9092
         group.id: testGroup
     format:
@@ -483,7 +482,6 @@ tables:
       version: "0.11"
       topic: OutputTopic
       properties:
-        zookeeper.connect: localhost:2181
         bootstrap.servers: localhost:9092
         group.id: testGroup
     format:
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 64649ee..322c3aa 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -253,7 +253,6 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                LOG.info("ZK and KafkaServer started.");
 
                standardProps = new Properties();
-               standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
                standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
                standardProps.setProperty("group.id", "flink-tests");
                standardProps.setProperty("enable.auto.commit", "false");
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index a3982ba..478ce38 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -134,7 +134,6 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                LOG.info("ZK and KafkaServer started.");
 
                standardProps = new Properties();
-               standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
                standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
                standardProps.setProperty("group.id", "flink-tests");
                standardProps.setProperty("enable.auto.commit", "false");
@@ -393,8 +392,8 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                kafkaProperties.put("advertised.host.name", KAFKA_HOST);
                kafkaProperties.put("broker.id", Integer.toString(brokerId));
                kafkaProperties.put("log.dir", tmpFolder.toString());
-               kafkaProperties.put("zookeeper.connect", 
zookeeperConnectionString);
                kafkaProperties.put("message.max.bytes", String.valueOf(50 * 
1024 * 1024));
+               kafkaProperties.put("zookeeper.connect", 
zookeeperConnectionString);
                kafkaProperties.put("replica.fetch.max.bytes", 
String.valueOf(50 * 1024 * 1024));
                kafkaProperties.put("transaction.max.timeout.ms", 
Integer.toString(1000 * 60 * 60 * 2)); // 2hours
 
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
index f55bc1e..158417a 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
@@ -54,7 +54,6 @@ public class KafkaValidator extends 
ConnectorDescriptorValidator {
        public static final String CONNECTOR_SPECIFIC_OFFSETS_OFFSET = "offset";
        public static final String CONNECTOR_STARTUP_TIMESTAMP_MILLIS = 
"connector.startup-timestamp-millis";
        public static final String CONNECTOR_PROPERTIES = 
"connector.properties";
-       public static final String CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT = 
"connector.properties.zookeeper.connect";
        public static final String CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER = 
"connector.properties.bootstrap.servers";
        public static final String CONNECTOR_PROPERTIES_GROUP_ID = 
"connector.properties.group.id";
        public static final String CONNECTOR_PROPERTIES_KEY = "key";
@@ -136,11 +135,9 @@ public class KafkaValidator extends 
ConnectorDescriptorValidator {
        }
 
        private void validateKafkaProperties(DescriptorProperties properties) {
-               if 
(properties.containsKey(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT)
-                       || 
properties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER)
+               if 
(properties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER)
                        || 
properties.containsKey(CONNECTOR_PROPERTIES_GROUP_ID)) {
 
-                       
properties.validateString(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT, false);
                        
properties.validateString(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER, false);
                        
properties.validateString(CONNECTOR_PROPERTIES_GROUP_ID, true);
 
@@ -235,8 +232,7 @@ public class KafkaValidator extends 
ConnectorDescriptorValidator {
        }
 
        public static boolean hasConciseKafkaProperties(DescriptorProperties 
descriptorProperties) {
-               return 
descriptorProperties.containsKey(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT) ||
-                       
descriptorProperties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER) ||
+               return 
descriptorProperties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER) ||
                        
descriptorProperties.containsKey(CONNECTOR_PROPERTIES_GROUP_ID);
        }
 }
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 2cf999b..b788fb8 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -169,7 +169,6 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBaseWithFlink {
 
                        // use wrong ports for the consumers
                        properties.setProperty("bootstrap.servers", 
"localhost:80");
-                       properties.setProperty("zookeeper.connect", 
"localhost:80");
                        properties.setProperty("group.id", "test");
                        properties.setProperty("request.timeout.ms", "3000"); 
// let the test fail fast
                        properties.setProperty("socket.timeout.ms", "3000");
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index 0875c28..d8eb011 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -93,7 +93,6 @@ public abstract class KafkaTableSourceSinkFactoryTestBase 
extends TestLogger {
 
        private static final Properties KAFKA_PROPERTIES = new Properties();
        static {
-               KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy");
                KAFKA_PROPERTIES.setProperty("group.id", "dummy");
                KAFKA_PROPERTIES.setProperty("bootstrap.servers", "dummy");
        }
@@ -224,7 +223,6 @@ public abstract class KafkaTableSourceSinkFactoryTestBase 
extends TestLogger {
 
                // use legacy properties
                legacyPropertiesMap.remove("connector.specific-offsets");
-               
legacyPropertiesMap.remove("connector.properties.zookeeper.connect");
                
legacyPropertiesMap.remove("connector.properties.bootstrap.servers");
                legacyPropertiesMap.remove("connector.properties.group.id");
 
@@ -236,12 +234,10 @@ public abstract class KafkaTableSourceSinkFactoryTestBase 
extends TestLogger {
                legacyPropertiesMap.put("connector.specific-offsets.0.offset", 
"100");
                
legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1");
                legacyPropertiesMap.put("connector.specific-offsets.1.offset", 
"123");
-               legacyPropertiesMap.put("connector.properties.0.key", 
"zookeeper.connect");
+               legacyPropertiesMap.put("connector.properties.0.key", 
"bootstrap.servers");
                legacyPropertiesMap.put("connector.properties.0.value", 
"dummy");
-               legacyPropertiesMap.put("connector.properties.1.key", 
"bootstrap.servers");
+               legacyPropertiesMap.put("connector.properties.1.key", 
"group.id");
                legacyPropertiesMap.put("connector.properties.1.value", 
"dummy");
-               legacyPropertiesMap.put("connector.properties.2.key", 
"group.id");
-               legacyPropertiesMap.put("connector.properties.2.value", 
"dummy");
 
                final TableSource<?> actualSource = 
TableFactoryService.find(StreamTableSourceFactory.class, legacyPropertiesMap)
                        .createStreamTableSource(legacyPropertiesMap);
@@ -330,7 +326,6 @@ public abstract class KafkaTableSourceSinkFactoryTestBase 
extends TestLogger {
 
                // use legacy properties
                legacyPropertiesMap.remove("connector.specific-offsets");
-               
legacyPropertiesMap.remove("connector.properties.zookeeper.connect");
                
legacyPropertiesMap.remove("connector.properties.bootstrap.servers");
                legacyPropertiesMap.remove("connector.properties.group.id");
 
@@ -342,12 +337,10 @@ public abstract class KafkaTableSourceSinkFactoryTestBase 
extends TestLogger {
                legacyPropertiesMap.put("connector.specific-offsets.0.offset", 
"100");
                
legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1");
                legacyPropertiesMap.put("connector.specific-offsets.1.offset", 
"123");
-               legacyPropertiesMap.put("connector.properties.0.key", 
"zookeeper.connect");
+               legacyPropertiesMap.put("connector.properties.0.key", 
"bootstrap.servers");
                legacyPropertiesMap.put("connector.properties.0.value", 
"dummy");
-               legacyPropertiesMap.put("connector.properties.1.key", 
"bootstrap.servers");
+               legacyPropertiesMap.put("connector.properties.1.key", 
"group.id");
                legacyPropertiesMap.put("connector.properties.1.value", 
"dummy");
-               legacyPropertiesMap.put("connector.properties.2.key", 
"group.id");
-               legacyPropertiesMap.put("connector.properties.2.value", 
"dummy");
 
                final TableSink<?> actualSink = 
TableFactoryService.find(StreamTableSinkFactory.class, legacyPropertiesMap)
                        .createStreamTableSink(legacyPropertiesMap);
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java
index bd526e9..a3d0acd 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java
@@ -66,7 +66,6 @@ public abstract class KafkaTableTestBase extends 
KafkaTestBase {
 
                // ---------- Produce an event time stream into Kafka 
-------------------
                String groupId = standardProps.getProperty("group.id");
-               String zk = standardProps.getProperty("zookeeper.connect");
                String bootstraps = 
standardProps.getProperty("bootstrap.servers");
 
                // TODO: use DDL to register Kafka once FLINK-15282 is fixed.
@@ -83,7 +82,6 @@ public abstract class KafkaTableTestBase extends 
KafkaTestBase {
                properties.put("connector.type", "kafka");
                properties.put("connector.topic", topic);
                properties.put("connector.version", kafkaVersion());
-               properties.put("connector.properties.zookeeper.connect", zk);
                properties.put("connector.properties.bootstrap.servers", 
bootstraps);
                properties.put("connector.properties.group.id", groupId);
                properties.put("connector.startup-mode", "earliest-offset");
@@ -112,7 +110,6 @@ public abstract class KafkaTableTestBase extends 
KafkaTestBase {
 //                     "  'connector.type' = 'kafka',\n" +
 //                     "  'connector.topic' = '" + topic + "',\n" +
 //                     "  'connector.version' = 'universal',\n" +
-//                     "  'connector.properties.zookeeper.connect' = '" + zk + 
"',\n" +
 //                     "  'connector.properties.bootstrap.servers' = '" + 
bootstraps + "',\n" +
 //                     "  'connector.properties.group.id' = '" + groupId + "', 
\n" +
 //                     "  'connector.startup-mode' = 'earliest-offset',  \n" +
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 15b1594..16cb724 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -137,7 +137,6 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                LOG.info("ZK and KafkaServer started.");
 
                standardProps = new Properties();
-               standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
                standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
                standardProps.setProperty("group.id", "flink-tests");
                standardProps.setProperty("enable.auto.commit", "false");
diff --git 
a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
 
b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
index 55549de..dda4617 100644
--- 
a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
+++ 
b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
@@ -37,7 +37,7 @@ import java.util.Properties;
  * A simple example that shows how to read from and write to Kafka with 
Confluent Schema Registry.
  * This will read AVRO messages from the input topic, parse them into a POJO 
type via checking the Schema by calling Schema registry.
  * Then this example publish the POJO type to kafka by converting the POJO to 
AVRO and verifying the schema.
- * --input-topic test-input --output-string-topic test-output 
--output-avro-topic test-avro-output --output-subject --bootstrap.servers 
localhost:9092 --zookeeper.connect localhost:2181 --schema-registry-url 
http://localhost:8081 --group.id myconsumer
+ * --input-topic test-input --output-string-topic test-output 
--output-avro-topic test-avro-output --output-subject --bootstrap.servers 
localhost:9092 --schema-registry-url http://localhost:8081 --group.id myconsumer
  */
 public class TestAvroConsumerConfluent {
 
@@ -49,14 +49,12 @@ public class TestAvroConsumerConfluent {
                        System.out.println("Missing parameters!\n" +
                                "Usage: Kafka --input-topic <topic> 
--output-string-topic <topic> --output-avro-topic <topic> " +
                                "--bootstrap.servers <kafka brokers> " +
-                               "--zookeeper.connect <zk quorum> " +
                                "--schema-registry-url <confluent schema 
registry> --group.id <some id>");
                        return;
                }
                Properties config = new Properties();
                config.setProperty("bootstrap.servers", 
parameterTool.getRequired("bootstrap.servers"));
                config.setProperty("group.id", 
parameterTool.getRequired("group.id"));
-               config.setProperty("zookeeper.connect", 
parameterTool.getRequired("zookeeper.connect"));
                String schemaRegistryUrl = 
parameterTool.getRequired("schema-registry-url");
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
index a7ffb14..22d8506 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
@@ -101,7 +101,6 @@ public class StreamingKafkaITCase extends TestLogger {
                                .addArgument("--output-topic", outputTopic)
                                .addArgument("--prefix", "PREFIX")
                                .addArgument("--bootstrap.servers", 
kafka.getBootstrapServerAddresses().stream().map(address -> 
address.getHostString() + ':' + 
address.getPort()).collect(Collectors.joining(",")))
-                               .addArgument("--zookeeper.connect ", 
kafka.getZookeeperAddress().getHostString() + ':' + 
kafka.getZookeeperAddress().getPort())
                                .addArgument("--group.id", "myconsumer")
                                .addArgument("--auto.offset.reset", "earliest")
                                .addArgument("--transaction.timeout.ms", 
"900000")
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_json_source_schema.yaml
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_json_source_schema.yaml
index 6de0c71..600e3f1 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_json_source_schema.yaml
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_json_source_schema.yaml
@@ -40,7 +40,6 @@ tables:
       topic: $TOPIC_NAME
       startup-mode: earliest-offset
       properties:
-        zookeeper.connect: $KAFKA_ZOOKEEPER_ADDRESS
         bootstrap.servers: $KAFKA_BOOTSTRAP_SERVERS
     format:
       type: json
@@ -86,8 +85,6 @@ tables:
       topic: test-avro
       startup-mode: earliest-offset
       properties:
-        - key: zookeeper.connect
-          value: $KAFKA_ZOOKEEPER_ADDRESS
         - key: bootstrap.servers
           value: $KAFKA_BOOTSTRAP_SERVERS
     format:
diff --git 
a/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
 
b/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
index 0e3c4ea..5505a7a 100644
--- 
a/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
+++ 
b/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
@@ -34,11 +34,11 @@ public class KafkaExampleUtil {
                        System.out.println("Missing parameters!\n" +
                                "Usage: Kafka --input-topic <topic> 
--output-topic <topic> " +
                                "--bootstrap.servers <kafka brokers> " +
-                               "--zookeeper.connect <zk quorum> --group.id 
<some id>");
+                               "--group.id <some id>");
                        throw new Exception("Missing parameters!\n" +
                                "Usage: Kafka --input-topic <topic> 
--output-topic <topic> " +
                                "--bootstrap.servers <kafka brokers> " +
-                               "--zookeeper.connect <zk quorum> --group.id 
<some id>");
+                               "--group.id <some id>");
                }
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
 
b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
index 3a3be93..f3c844c 100644
--- 
a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
+++ 
b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
@@ -40,7 +40,7 @@ import 
org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
  *
  * <p>Example usage:
  *     --input-topic test-input --output-topic test-output --bootstrap.servers 
localhost:9092
- *     --zookeeper.connect localhost:2181 --group.id myconsumer
+ *     --group.id myconsumer
  */
 public class KafkaExample extends KafkaExampleUtil {
 
diff --git 
a/flink-end-to-end-tests/flink-streaming-kafka010-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka010Example.java
 
b/flink-end-to-end-tests/flink-streaming-kafka010-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka010Example.java
index 0b97179..14c9493 100644
--- 
a/flink-end-to-end-tests/flink-streaming-kafka010-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka010Example.java
+++ 
b/flink-end-to-end-tests/flink-streaming-kafka010-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka010Example.java
@@ -38,7 +38,7 @@ import 
org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
  * the String messages are of formatted as a (word,frequency,timestamp) tuple.
  *
  * <p>Example usage:
- *     --input-topic test-input --output-topic test-output --bootstrap.servers 
localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
+ *     --input-topic test-input --output-topic test-output --bootstrap.servers 
localhost:9092 --group.id myconsumer
  */
 public class Kafka010Example {
 
diff --git 
a/flink-end-to-end-tests/flink-streaming-kafka011-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka011Example.java
 
b/flink-end-to-end-tests/flink-streaming-kafka011-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka011Example.java
index 1f877c5..fafd307 100644
--- 
a/flink-end-to-end-tests/flink-streaming-kafka011-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka011Example.java
+++ 
b/flink-end-to-end-tests/flink-streaming-kafka011-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka011Example.java
@@ -38,7 +38,7 @@ import 
org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
  * the String messages are of formatted as a (word,frequency,timestamp) tuple.
  *
  * <p>Example usage:
- *     --input-topic test-input --output-topic test-output --bootstrap.servers 
localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
+ *     --input-topic test-input --output-topic test-output --bootstrap.servers 
localhost:9092 --group.id myconsumer
  */
 public class Kafka011Example {
 
diff --git a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh 
b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
index 560e43e..c7ed12a 100644
--- a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
@@ -68,7 +68,6 @@ function get_kafka_json_source_schema {
       topic: $topicName
       startup-mode: earliest-offset
       properties:
-        zookeeper.connect: localhost:2181
         bootstrap.servers: localhost:9092
     format:
       type: json
diff --git 
a/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh 
b/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh
index 5d3e9e4..a023f39 100755
--- a/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh
+++ b/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh
@@ -78,7 +78,7 @@ create_kafka_topic 1 1 test-avro-out
 # Read Avro message from [test-avro-input], check the schema and send message 
to [test-string-ou]
 $FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
   --input-topic test-avro-input --output-string-topic test-string-out 
--output-avro-topic test-avro-out --output-subject test-output-subject \
-  --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 
--group.id myconsumer --auto.offset.reset earliest \
+  --bootstrap.servers localhost:9092 --group.id myconsumer --auto.offset.reset 
earliest \
   --schema-registry-url ${SCHEMA_REGISTRY_URL}
 
 #echo "Reading messages from Kafka topic [test-string-ou] ..."
diff --git a/flink-python/pyflink/table/table_environment.py 
b/flink-python/pyflink/table/table_environment.py
index 46aa70f..7acdaa0 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -522,7 +522,6 @@ class TableEnvironment(object):
             ...     'connector.type' = 'kafka',
             ...     'update-mode' = 'append',
             ...     'connector.topic' = 'xxx',
-            ...     'connector.properties.zookeeper.connect' = 
'localhost:2181',
             ...     'connector.properties.bootstrap.servers' = 'localhost:9092'
             ... )
             ... '''
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py 
b/flink-python/pyflink/table/tests/test_descriptor.py
index 002f499..6894e10 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -61,12 +61,10 @@ class KafkaDescriptorTests(PyFlinkTestCase):
         self.assertEqual(expected, properties)
 
     def test_properties(self):
-        kafka = Kafka().properties({"zookeeper.connect": "localhost:2181",
-                                    "bootstrap.servers": "localhost:9092"})
+        kafka = Kafka().properties({"bootstrap.servers": "localhost:9092"})
 
         properties = kafka.to_properties()
         expected = {'connector.type': 'kafka',
-                    'connector.properties.zookeeper.connect': 'localhost:2181',
                     'connector.properties.bootstrap.servers': 'localhost:9092',
                     'connector.property-version': '1'}
         self.assertEqual(expected, properties)
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
index 364e619..74f0b65 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
@@ -701,7 +701,6 @@ public interface StreamTableEnvironment extends 
TableEnvironment {
         *     new Kafka()
         *       .version("0.11")
         *       .topic("clicks")
-        *       .property("zookeeper.connect", "localhost")
         *       .property("group.id", "click-group")
         *       .startFromEarliest())
         *   .withFormat(
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index f086dd8..c89d76a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -689,7 +689,6 @@ public interface TableEnvironment {
         *                        'connector.type' = 'kafka',
         *                        'update-mode' = 'append',
         *                        'connector.topic' = 'xxx',
-        *                        'connector.properties.zookeeper.connect' = 
'localhost:2181',
         *                        'connector.properties.bootstrap.servers' = 
'localhost:9092',
         *                        ...
         *                      )";
diff --git 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 54d0c97..d569c83 100644
--- 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -458,7 +458,6 @@ trait StreamTableEnvironment extends TableEnvironment {
     *     new Kafka()
     *       .version("0.11")
     *       .topic("clicks")
-    *       .property("zookeeper.connect", "localhost")
     *       .property("group.id", "click-group")
     *       .startFromEarliest())
     *   .withFormat(

Reply via email to