[FLINK-6079] [kafka] Provide meaningful error message if TopicPartitions are 
null

This closes #3685.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8890a8db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8890a8db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8890a8db

Branch: refs/heads/table-retraction
Commit: 8890a8db41c45504aa658a1942f40bb9af7dcf30
Parents: a6355ed
Author: zentol <ches...@apache.org>
Authored: Thu Apr 6 11:55:29 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Thu Apr 6 19:35:50 2017 +0200

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBase.java           | 114 +++++++++----------
 1 file changed, 57 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8890a8db/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index d409027..a35e710 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -44,6 +44,7 @@ import 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -348,71 +349,70 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
 
                // initialize subscribed partitions
                List<KafkaTopicPartition> kafkaTopicPartitions = 
getKafkaPartitions(topics);
+               Preconditions.checkNotNull(kafkaTopicPartitions, 
"TopicPartitions must not be null.");
 
                subscribedPartitionsToStartOffsets = new 
HashMap<>(kafkaTopicPartitions.size());
 
-               if (kafkaTopicPartitions != null) {
-                       if (restoredState != null) {
-                               for (KafkaTopicPartition kafkaTopicPartition : 
kafkaTopicPartitions) {
-                                       if 
(restoredState.containsKey(kafkaTopicPartition)) {
-                                               
subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, 
restoredState.get(kafkaTopicPartition));
-                                       }
+               if (restoredState != null) {
+                       for (KafkaTopicPartition kafkaTopicPartition : 
kafkaTopicPartitions) {
+                               if 
(restoredState.containsKey(kafkaTopicPartition)) {
+                                       
subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, 
restoredState.get(kafkaTopicPartition));
                                }
+                       }
 
-                               LOG.info("Consumer subtask {} will start 
reading {} partitions with offsets in restored state: {}",
-                                       
getRuntimeContext().getIndexOfThisSubtask(), 
subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
-                       } else {
-                               initializeSubscribedPartitionsToStartOffsets(
-                                       subscribedPartitionsToStartOffsets,
-                                       kafkaTopicPartitions,
-                                       
getRuntimeContext().getIndexOfThisSubtask(),
-                                       
getRuntimeContext().getNumberOfParallelSubtasks(),
-                                       startupMode,
-                                       specificStartupOffsets);
-
-                               if (subscribedPartitionsToStartOffsets.size() 
!= 0) {
-                                       switch (startupMode) {
-                                               case EARLIEST:
-                                                       LOG.info("Consumer 
subtask {} will start reading the following {} partitions from the earliest 
offsets: {}",
-                                                               
getRuntimeContext().getIndexOfThisSubtask(),
-                                                               
subscribedPartitionsToStartOffsets.size(),
-                                                               
subscribedPartitionsToStartOffsets.keySet());
-                                                       break;
-                                               case LATEST:
-                                                       LOG.info("Consumer 
subtask {} will start reading the following {} partitions from the latest 
offsets: {}",
-                                                               
getRuntimeContext().getIndexOfThisSubtask(),
-                                                               
subscribedPartitionsToStartOffsets.size(),
-                                                               
subscribedPartitionsToStartOffsets.keySet());
-                                                       break;
-                                               case SPECIFIC_OFFSETS:
-                                                       LOG.info("Consumer 
subtask {} will start reading the following {} partitions from the specified 
startup offsets {}: {}",
-                                                               
getRuntimeContext().getIndexOfThisSubtask(),
-                                                               
subscribedPartitionsToStartOffsets.size(),
-                                                               
specificStartupOffsets,
-                                                               
subscribedPartitionsToStartOffsets.keySet());
-
-                                                       
List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new 
ArrayList<>(subscribedPartitionsToStartOffsets.size());
-                                                       for 
(Map.Entry<KafkaTopicPartition, Long> subscribedPartition : 
subscribedPartitionsToStartOffsets.entrySet()) {
-                                                               if 
(subscribedPartition.getValue() == 
KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
-                                                                       
partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
-                                                               }
+                       LOG.info("Consumer subtask {} will start reading {} 
partitions with offsets in restored state: {}",
+                               getRuntimeContext().getIndexOfThisSubtask(), 
subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
+               } else {
+                       initializeSubscribedPartitionsToStartOffsets(
+                               subscribedPartitionsToStartOffsets,
+                               kafkaTopicPartitions,
+                               getRuntimeContext().getIndexOfThisSubtask(),
+                               
getRuntimeContext().getNumberOfParallelSubtasks(),
+                               startupMode,
+                               specificStartupOffsets);
+
+                       if (subscribedPartitionsToStartOffsets.size() != 0) {
+                               switch (startupMode) {
+                                       case EARLIEST:
+                                               LOG.info("Consumer subtask {} 
will start reading the following {} partitions from the earliest offsets: {}",
+                                                       
getRuntimeContext().getIndexOfThisSubtask(),
+                                                       
subscribedPartitionsToStartOffsets.size(),
+                                                       
subscribedPartitionsToStartOffsets.keySet());
+                                               break;
+                                       case LATEST:
+                                               LOG.info("Consumer subtask {} 
will start reading the following {} partitions from the latest offsets: {}",
+                                                       
getRuntimeContext().getIndexOfThisSubtask(),
+                                                       
subscribedPartitionsToStartOffsets.size(),
+                                                       
subscribedPartitionsToStartOffsets.keySet());
+                                               break;
+                                       case SPECIFIC_OFFSETS:
+                                               LOG.info("Consumer subtask {} 
will start reading the following {} partitions from the specified startup 
offsets {}: {}",
+                                                       
getRuntimeContext().getIndexOfThisSubtask(),
+                                                       
subscribedPartitionsToStartOffsets.size(),
+                                                       specificStartupOffsets,
+                                                       
subscribedPartitionsToStartOffsets.keySet());
+
+                                               List<KafkaTopicPartition> 
partitionsDefaultedToGroupOffsets = new 
ArrayList<>(subscribedPartitionsToStartOffsets.size());
+                                               for 
(Map.Entry<KafkaTopicPartition, Long> subscribedPartition : 
subscribedPartitionsToStartOffsets.entrySet()) {
+                                                       if 
(subscribedPartition.getValue() == 
KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+                                                               
partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                                                        }
+                                               }
 
-                                                       if 
(partitionsDefaultedToGroupOffsets.size() > 0) {
-                                                               
LOG.warn("Consumer subtask {} cannot find offsets for the following {} 
partitions in the specified startup offsets: {}" +
-                                                                               
"; their startup offsets will be defaulted to their committed group offsets in 
Kafka.",
-                                                                       
getRuntimeContext().getIndexOfThisSubtask(),
-                                                                       
partitionsDefaultedToGroupOffsets.size(),
-                                                                       
partitionsDefaultedToGroupOffsets);
-                                                       }
-                                                       break;
-                                               default:
-                                               case GROUP_OFFSETS:
-                                                       LOG.info("Consumer 
subtask {} will start reading the following {} partitions from the committed 
group offsets in Kafka: {}",
+                                               if 
(partitionsDefaultedToGroupOffsets.size() > 0) {
+                                                       LOG.warn("Consumer 
subtask {} cannot find offsets for the following {} partitions in the specified 
startup offsets: {}" +
+                                                                       "; 
their startup offsets will be defaulted to their committed group offsets in 
Kafka.",
                                                                
getRuntimeContext().getIndexOfThisSubtask(),
-                                                               
subscribedPartitionsToStartOffsets.size(),
-                                                               
subscribedPartitionsToStartOffsets.keySet());
-                                       }
+                                                               
partitionsDefaultedToGroupOffsets.size(),
+                                                               
partitionsDefaultedToGroupOffsets);
+                                               }
+                                               break;
+                                       default:
+                                       case GROUP_OFFSETS:
+                                               LOG.info("Consumer subtask {} 
will start reading the following {} partitions from the committed group offsets 
in Kafka: {}",
+                                                       
getRuntimeContext().getIndexOfThisSubtask(),
+                                                       
subscribedPartitionsToStartOffsets.size(),
+                                                       
subscribedPartitionsToStartOffsets.keySet());
                                }
                        }
                }

Reply via email to