Good catch. Could you create a ticket? You can also submit a PR to fix it if you have time :)
On Tue, Mar 7, 2017 at 1:52 PM, Bowden, Chris <chris.bow...@hpe.com> wrote:

> Potential bug when using startingOffsets = SpecificOffsets with Kafka
> topics containing uppercase characters?
>
> KafkaSourceProvider#L80/86:
>
> val startingOffsets =
>   caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
>     case Some("latest") => LatestOffsets
>     case Some("earliest") => EarliestOffsets
>     case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
>     case None => LatestOffsets
>   }
>
> Topics in JSON get lowered so underlying assignments in the consumer are
> incorrect, and the assertion in KafkaSource#L326 triggers:
>
> private def fetchSpecificStartingOffsets(
>     partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
>   val result = withRetriesWithoutInterrupt {
>     // Poll to get the latest assigned partitions
>     consumer.poll(0)
>     val partitions = consumer.assignment()
>     consumer.pause(partitions)
>     assert(partitions.asScala == partitionOffsets.keySet,
>       "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
>       "Use -1 for latest, -2 for earliest, if you don't care.\n" +
>       s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
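
For anyone picking this up, here is a minimal sketch of one possible fix, not the actual patch: compare case-insensitively only when matching the "latest" and "earliest" keywords, and pass the JSON through with its original casing. The types below are simplified stand-ins so the snippet compiles on its own. In particular, SpecificOffsets here just holds the raw JSON string instead of the parsed Map[TopicPartition, Long], and StartingOffsetsParser is a hypothetical name.

```scala
// Simplified stand-ins for the Spark types quoted above.
// These are assumptions for illustration, not the real classes.
sealed trait StartingOffsets
case object LatestOffsets extends StartingOffsets
case object EarliestOffsets extends StartingOffsets
// The real SpecificOffsets wraps parsed partition offsets; this one keeps the raw JSON.
final case class SpecificOffsets(json: String) extends StartingOffsets

// Hypothetical helper: trims the option value, matches the keywords
// case-insensitively, and leaves the JSON (and its topic names) untouched.
object StartingOffsetsParser {
  def parse(option: Option[String]): StartingOffsets =
    option.map(_.trim) match {
      case Some(s) if s.equalsIgnoreCase("latest")   => LatestOffsets
      case Some(s) if s.equalsIgnoreCase("earliest") => EarliestOffsets
      case Some(json)                                => SpecificOffsets(json)
      case None                                      => LatestOffsets
    }
}
```

With this approach a value such as {"MyTopic":{"0":23}} keeps the topic name MyTopic, so the specified partitions can match what the consumer is actually assigned.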