This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 020f329  [FLINK-8290] Allow setting clientId in flink-connector-kafka-0.8
020f329 is described below

commit 020f3299ebccc36297a0d3db425cc5f44c70a8b7
Author: maqingxiang-it <maqingxiang...@dev05v.sys.corp.qihoo.net>
AuthorDate: Tue Jan 16 21:42:45 2018 +0800

    [FLINK-8290] Allow setting clientId in flink-connector-kafka-0.8
---
 .../streaming/connectors/kafka/internals/SimpleConsumerThread.java   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
index 4c704c3..1fdff9d 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -94,6 +94,7 @@ class SimpleConsumerThread<T> extends Thread {
        private final int fetchSize;
        private final int bufferSize;
        private final int reconnectLimit;
+       private final String clientId;
 
        // exceptions are thrown locally
        public SimpleConsumerThread(
@@ -123,6 +124,8 @@ class SimpleConsumerThread<T> extends Thread {
                this.fetchSize = getInt(config, "fetch.message.max.bytes", 
1048576);
                this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 
65536);
                this.reconnectLimit = getInt(config, 
"flink.simple-consumer-reconnectLimit", 3);
+               String groupId = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
+               this.clientId = config.getProperty("client.id", groupId);
        }
 
        public 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> 
getNewPartitionsQueue() {
@@ -138,8 +141,6 @@ class SimpleConsumerThread<T> extends Thread {
                LOG.info("Starting to fetch from {}", this.partitions);
 
                // set up the config values
-               final String clientId = "flink-kafka-consumer-legacy-" + 
broker.id();
-
                try {
                        // create the Kafka consumer that we actually use for 
fetching
                        consumer = new SimpleConsumer(broker.host(), 
broker.port(), soTimeout, bufferSize, clientId);

Reply via email to