Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5929#discussion_r185759080 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java --- @@ -74,7 +74,12 @@ protected void initializeConnections() { try { for (String topic : topics) { - for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) { + List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic); + if (topicPartitions == null) { + throw new IllegalStateException("The topic " + topic + " does not exist"); --- End diff -- I think the `RuntimeException` in `AbstractPartitionDiscoverer#discoverPartitions` maybe needs to be revisited, also. As far as I understand it, we should only fail the job if for the first discovery (for seed initial partitions that the connector consumes) is empty across all partitions. Otherwise, it should be ok that while the job runs, the partitions discover fails fetch partition meta info for some discovery attempt.
---