This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 020f329 [FLINK-8290] Allow setting clientId in flink-connector-kafka-0.8 020f329 is described below commit 020f3299ebccc36297a0d3db425cc5f44c70a8b7 Author: maqingxiang-it <maqingxiang...@dev05v.sys.corp.qihoo.net> AuthorDate: Tue Jan 16 21:42:45 2018 +0800 [FLINK-8290] Allow setting clientId in flink-connector-kafka-0.8 --- .../streaming/connectors/kafka/internals/SimpleConsumerThread.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java index 4c704c3..1fdff9d 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java @@ -94,6 +94,7 @@ class SimpleConsumerThread<T> extends Thread { private final int fetchSize; private final int bufferSize; private final int reconnectLimit; + private final String clientId; // exceptions are thrown locally public SimpleConsumerThread( @@ -123,6 +124,8 @@ class SimpleConsumerThread<T> extends Thread { this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576); this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536); this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3); + String groupId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id()); + this.clientId = config.getProperty("client.id", groupId); } public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() { @@ -138,8 +141,6 @@ class SimpleConsumerThread<T> extends Thread { LOG.info("Starting to fetch from {}", this.partitions); // set up the config values - final String clientId = "flink-kafka-consumer-legacy-" + broker.id(); - try { // create the Kafka consumer that we actually use for fetching consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);