Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5929#discussion_r185759080
  
    --- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
 ---
    @@ -74,7 +74,12 @@ protected void initializeConnections() {
     
                try {
                        for (String topic : topics) {
    -                           for (PartitionInfo partitionInfo : 
kafkaConsumer.partitionsFor(topic)) {
    +                           List<PartitionInfo> topicPartitions = 
kafkaConsumer.partitionsFor(topic);
    +                           if (topicPartitions == null) {
    +                                   throw new IllegalStateException("The 
topic " + topic + " does not exist");
    --- End diff --
    
    I think the `RuntimeException` in 
`AbstractPartitionDiscoverer#discoverPartitions` maybe needs to be revisited, 
also.
    As far as I understand it, we should only fail the job if for the first 
discovery (for seed initial partitions that the connector consumes) is empty 
across all partitions. Otherwise, it should be ok that while the job runs, the 
partitions discover fails fetch partition meta info for some discovery attempt.


---

Reply via email to