Zhiwen Sun created SPARK-18525:
----------------------------------

             Summary: Kafka DirectInputStream cannot be aware of new partition
                 Key: SPARK-18525
                 URL: https://issues.apache.org/jira/browse/SPARK-18525
             Project: Spark
          Issue Type: Improvement
          Components: Input/Output
    Affects Versions: 2.0.2
            Reporter: Zhiwen Sun
It seems that DirectKafkaInputDStream does not read from partitions that are added to a topic while the Spark Streaming job is running: the partition set is resolved once when the stream is created, and each batch only fetches leader offsets for that fixed set, so new partitions are never noticed.

Related spark code:

https://github.com/apache/spark/blob/v2.0.2/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L101

How to reproduce it:

{code:title=KafkaDirectTest.scala|borderStyle=solid}
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object KafkaDirectTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("kafka direct test 5")
    conf.setIfMissing("spark.master", "local[3]")
    conf.set("spark.streaming.kafka.maxRatePerPartition", "10")

    val ssc = new StreamingContext(conf, Seconds(1))

    val topic = "test_use"
    val groupId = "stream-test-0809"

    val kafkaParams = Map(
      "metadata.broker.list" -> "dev-002:9092,dev-004:9092",
      "group.id" -> groupId
    )

    // The direct stream resolves the topic's partitions once, here at creation time.
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set(topic))

    lines.foreachRDD { rdd =>
      rdd.foreach { row =>
        println(s"\n row: ${row} ")
      }
      // The offset ranges only ever cover the partitions that existed at startup.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { offset =>
        println(s"\n----- offset: ${offset.topic} ${offset.partition} ${offset.fromOffset} ${offset.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

1. start the job
2. add a new partition to the test_use topic

The job never reads data from the new partition.
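For reference, the newer spark-streaming-kafka-0-10 connector subscribes through the new Kafka consumer API and refreshes the consumer's assignment on every batch, so partitions added at runtime do show up. Below is a minimal sketch of the same repro against the 0-10 connector; the broker list, topic, and group id are carried over from the example above, and it is meant as an illustration rather than a tested fix:

{code:title=KafkaDirectTest010.scala|borderStyle=solid}
import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils}

object KafkaDirectTest010 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("kafka direct test 0-10")
    conf.setIfMissing("spark.master", "local[3]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // The new consumer API takes deserializer classes instead of decoders.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "dev-002:9092,dev-004:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream-test-0809",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Subscribe lets the consumer rebalance, and the 0-10 direct stream
    // re-reads its partition assignment before computing each batch.
    val lines = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("test_use"), kafkaParams))

    lines.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { offset =>
        println(s"----- offset: ${offset.topic} ${offset.partition} ${offset.fromOffset} ${offset.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

With the 0-8 connector, the only workaround appears to be restarting the streaming context so that the topic's partitions are resolved again.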