GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5929#discussion_r185732772
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java ---
    @@ -74,7 +74,12 @@ protected void initializeConnections() {
     
                try {
                        for (String topic : topics) {
    -                           for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
    +                           List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
    +                           if (topicPartitions == null) {
    +                                   throw new IllegalStateException("The topic " + topic + " does not exist");
    --- End diff --
    
    I fear that this might be too aggressive.
    IMO, it is fine if the user has, say, 3 topics and one of them doesn't actually exist.
    
    What we should handle is the case where there are no partitions at all across all provided topics.
    Perhaps for a single missing topic, we should only log that the topic has no partitions?
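
To make the suggestion concrete, here is a minimal sketch of the more lenient behavior: skip and log any topic without partitions, and fail only when no partitions are found across all topics. It is not the actual connector code. `LenientPartitionDiscovery`, `discoverPartitions`, and the `partitionsFor` function are hypothetical stand-ins (the latter for `kafkaConsumer.partitionsFor(topic)`, which the diff's null check suggests may return null), and partitions are modeled as plain integers so the example runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.logging.Logger;

// Hypothetical helper, not part of Flink or Kafka.
class LenientPartitionDiscovery {
    private static final Logger LOG =
            Logger.getLogger(LenientPartitionDiscovery.class.getName());

    /**
     * @param topics        the topics the user subscribed to
     * @param partitionsFor assumed stand-in for kafkaConsumer.partitionsFor(topic);
     *                      may return null for a topic that does not exist
     * @return discovered partitions, rendered as "topic-partition" strings
     */
    static List<String> discoverPartitions(
            List<String> topics, Function<String, List<Integer>> partitionsFor) {
        List<String> discovered = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> topicPartitions = partitionsFor.apply(topic);
            if (topicPartitions == null || topicPartitions.isEmpty()) {
                // A single missing topic is tolerated: log and move on.
                LOG.warning("Topic " + topic + " has no partitions; skipping it.");
                continue;
            }
            for (int partition : topicPartitions) {
                discovered.add(topic + "-" + partition);
            }
        }
        // Fail only if nothing was found across all provided topics.
        if (discovered.isEmpty()) {
            throw new IllegalStateException(
                    "No partitions found for any of the provided topics: " + topics);
        }
        return discovered;
    }
}
```

With this shape, a job subscribed to three topics keeps running if one is missing, while a job whose topics all resolve to nothing still fails fast.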

