[ https://issues.apache.org/jira/browse/SPARK-18525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15689015#comment-15689015 ]
Zhiwen Sun commented on SPARK-18525: ------------------------------------ Hi Cody: Thanks for your reply. We are still use kafka 0.8.2 , Is there a way to solve this problem? > Kafka DirectInputStream cannot be aware of new partition > -------------------------------------------------------- > > Key: SPARK-18525 > URL: https://issues.apache.org/jira/browse/SPARK-18525 > Project: Spark > Issue Type: Improvement > Components: Input/Output > Affects Versions: 2.0.2 > Reporter: Zhiwen Sun > > It seems that DirectKafkaInputStream does not support read new partition when > spark streaming is running. > Related spark code: > https://github.com/apache/spark/blob/v2.0.2/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L101 > How to produce it: > {code:title=KafkaDirectTest.scala|borderStyle=solid} > object KafkaDirectTest { > def main(args: Array[String]) { > val conf = new SparkConf().setAppName("kafka direct test 5") > conf.setIfMissing("spark.master", "local[3]") > conf.set("spark.streaming.kafka.maxRatePerPartition", "10") > val ssc = new StreamingContext(conf, Seconds(1)) > val zkQuorum = Config("common").getString("kafka.zkquorum") > val topic = "test_use" > val groupId = "stream-test-0809" > val kafkaParams = Map( > "metadata.broker.list" -> "dev-002:9092,dev-004:9092", > "group.id" -> groupId > ) > val fromOffsets: Map[TopicAndPartition, Long] = Map( > new TopicAndPartition(topic, 0) -> 0L, > new TopicAndPartition(topic, 1) -> 0L, > new TopicAndPartition(topic, 2) -> 0L, > new TopicAndPartition(topic, 3) -> 0L > ) > val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd > val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, > StringDecoder](ssc, kafkaParams, Set(topic)) > lines.foreachRDD { rdd => > rdd.foreach { row => > println(s"\n row: ${row} ") > } > val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges > offsetRanges.foreach { offset => > println(s"\n----- offset: ${offset.topic} ${offset.partition} > ${offset.fromOffset} ${offset.untilOffset}") > } > } > ssc.start() > ssc.awaitTermination() > } > } > {code} > 1. start the job > 2. add new partition of test_use topic > The job cannot read new partition data. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org