[ https://issues.apache.org/jira/browse/SPARK-17853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15565400#comment-15565400 ]
Cody Koeninger commented on SPARK-17853:
----------------------------------------

Use a different group id. Let me know if that addresses the issue.

> Kafka OffsetOutOfRangeException on DStreams union from separate Kafka clusters with identical topic names.
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-17853
>                 URL: https://issues.apache.org/jira/browse/SPARK-17853
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 2.0.0
>            Reporter: Marcin Kuthan
>
> During migration from Spark 1.6 to 2.0 I observed an OffsetOutOfRangeException
> reported by the Kafka client. In our scenario we create a single DStream as a union
> of multiple DStreams, one DStream per Kafka cluster (multi-DC solution).
> Both Kafka clusters have the same topics and the same number of partitions.
> After a quick investigation, I found that the class DirectKafkaInputDStream keeps
> offset state per topic and partition, but it is not aware of different Kafka
> clusters.
> For every topic, a single DStream is created as a union of the streams from all
> configured Kafka clusters:
> {code}
> import org.apache.spark.streaming.StreamingContext
> import org.apache.spark.streaming.dstream.DStream
> import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
>
> class KafkaDStreamSource(configs: Iterable[Map[String, String]]) {
>   def createSource(ssc: StreamingContext, topic: String): DStream[(String, Array[Byte])] = {
>     // One direct stream per Kafka cluster, all subscribed to the same topic name
>     val streams = configs.map { config =>
>       val kafkaParams = config
>       val kafkaTopics = Set(topic)
>       KafkaUtils.createDirectStream[String, Array[Byte]](
>         ssc,
>         LocationStrategies.PreferConsistent,
>         ConsumerStrategies.Subscribe[String, Array[Byte]](kafkaTopics, kafkaParams)
>       ).map { record =>
>         (record.key, record.value)
>       }
>     }
>     // Merge the per-cluster streams into a single DStream
>     ssc.union(streams.toSeq)
>   }
> }
> {code}
> As a result, offsets from one Kafka cluster overwrite offsets from the other.
> Fortunately, an OffsetOutOfRangeException was thrown because the offsets in the two
> Kafka clusters are significantly different.
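A minimal sketch of the suggested workaround, assuming the reporter's per-cluster `configs` are plain `Map[String, String]` Kafka parameter maps: give each cluster's map its own `group.id` before the streams are built. The object `DistinctGroupIds`, its `assign` method, the `spark-streaming` fallback and the `-<index>` suffix scheme are hypothetical, chosen only for illustration. One plausible reason the group id matters, assumed here and not confirmed in this comment, is that the executor-side consumer cache in the Kafka 0-10 integration is keyed by group id, topic and partition, so two clusters sharing a group id and identical topic names can collide there.

```scala
// Sketch only: derive a distinct group.id for each Kafka cluster's config,
// following the advice to use a different group id per cluster.
// DistinctGroupIds, assign and the "-<index>" suffix scheme are hypothetical.
object DistinctGroupIds {

  /** Returns copies of `configs` whose "group.id" values are pairwise distinct.
    * An existing group.id is kept as a prefix; a missing one falls back to
    * `defaultBase`. The position of each config becomes the suffix, so
    * results differ whenever positions differ.
    */
  def assign(
      configs: Seq[Map[String, String]],
      defaultBase: String = "spark-streaming"
  ): Seq[Map[String, String]] =
    configs.zipWithIndex.map { case (config, idx) =>
      val base = config.getOrElse("group.id", defaultBase)
      // All other Kafka parameters (bootstrap.servers etc.) are left untouched
      config.updated("group.id", s"$base-$idx")
    }
}
```

In the reporter's `KafkaDStreamSource`, the maps returned by `DistinctGroupIds.assign(configs.toSeq)` would take the place of `configs` when calling `ConsumerStrategies.Subscribe`. Keeping any existing `group.id` as a prefix keeps the derived consumer groups recognizable, but the index-based suffix assumes the order of `configs` stays the same between restarts.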
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)