Potential bug when using startingOffsets = SpecificOffsets with Kafka topics containing uppercase characters?
KafkaSourceProvider#L80/86: val startingOffsets = caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match { case Some("latest") => LatestOffsets case Some("earliest") => EarliestOffsets case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json)) case None => LatestOffsets } Topics in JSON get lowered so underlying assignments in the consumer are incorrect, and the assertion in KafkaSource#L326 triggers: private def fetchSpecificStartingOffsets( partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = { val result = withRetriesWithoutInterrupt { // Poll to get the latest assigned partitions consumer.poll(0) val partitions = consumer.assignment() consumer.pause(partitions) assert(partitions.asScala == partitionOffsets.keySet, "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" + "Use -1 for latest, -2 for earliest, if you don't care.\n" + s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")