[FLINK-9274] [kafka] Add thread name for partition discovery

This closes #5942


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/514cf81d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/514cf81d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/514cf81d

Branch: refs/heads/release-1.5
Commit: 514cf81d715a0789426c46e1cd05c4f0ebb762bb
Parents: a10f479
Author: Nico Kruber <[email protected]>
Authored: Mon Apr 30 14:22:40 2018 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Apr 30 23:25:30 2018 +0200

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/514cf81d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 82ac2c3..cfb5b6d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -715,7 +715,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
                                                }
                                        }
                                }
-                       });
+                       }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
 
                        discoveryLoopThread.start();
                        kafkaFetcher.runFetchLoop();
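
The change above passes a name as the second argument to the `Thread(Runnable, String)` constructor, so the discovery thread can be identified in thread dumps and logs. A minimal, self-contained sketch of the same pattern follows. The class `NamedThreadSketch`, the helper `taskNameWithSubtasks()`, and its sample return value are hypothetical stand-ins for `getRuntimeContext().getTaskNameWithSubtasks()`; they are not part of the Flink code base.

```java
public class NamedThreadSketch {

    // Hypothetical stand-in for getRuntimeContext().getTaskNameWithSubtasks();
    // the returned value is an illustrative sample only.
    static String taskNameWithSubtasks() {
        return "Source: Custom Source (1/4)";
    }

    // Builds a thread whose name identifies the owning subtask,
    // using the standard Thread(Runnable, String) constructor.
    static Thread newDiscoveryThread(Runnable loop) {
        return new Thread(loop, "Kafka Partition Discovery for " + taskNameWithSubtasks());
    }

    public static void main(String[] args) throws InterruptedException {
        Thread discoveryLoopThread = newDiscoveryThread(
            () -> System.out.println("running in: " + Thread.currentThread().getName()));
        discoveryLoopThread.start();
        discoveryLoopThread.join();
    }
}
```

Without the second constructor argument, the JVM assigns a generic name of the form `Thread-N`, which makes it hard to tell which subtask a discovery thread belongs to.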
