Zhiwen Sun created SPARK-18525:
----------------------------------

             Summary: Kafka DirectInputStream cannot be aware of new partition
                 Key: SPARK-18525
                 URL: https://issues.apache.org/jira/browse/SPARK-18525
             Project: Spark
          Issue Type: Improvement
          Components: Input/Output
    Affects Versions: 2.0.2
            Reporter: Zhiwen Sun


It seems that DirectKafkaInputStream does not pick up new partitions that are added to a topic
while a Spark Streaming job is running.

Related Spark code:

https://github.com/apache/spark/blob/v2.0.2/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L101
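For illustration, here is a simplified, Spark-free model of the behaviour. It assumes (based on a reading of the kafka-0-8 code linked above) that the stream plans each batch only for the partitions in the offsets map it built when it was created. `TopicPartition`, `FixedPartitionModel` and `nextBatch` are hypothetical names, not Spark or Kafka API.

```scala
// Hypothetical, Spark-free model of how partitions are tracked by the kafka-0-8
// direct stream. TopicPartition stands in for kafka.common.TopicAndPartition.
final case class TopicPartition(topic: String, partition: Int)

object FixedPartitionModel {
  // Plans one batch as (fromOffset, untilOffset) per partition. Only the keys of
  // `currentOffsets` (fixed when the stream was created) are considered, so a
  // partition that exists only in `latestOffsets` is silently ignored.
  def nextBatch(
      currentOffsets: Map[TopicPartition, Long],
      latestOffsets: Map[TopicPartition, Long]
  ): Map[TopicPartition, (Long, Long)] =
    currentOffsets.map { case (tp, from) =>
      tp -> ((from, latestOffsets.getOrElse(tp, from)))
    }

  def main(args: Array[String]): Unit = {
    val topic = "test_use"
    // Offsets captured at startup, when the topic had partitions 0 to 3.
    val startOffsets = (0 to 3).map(p => TopicPartition(topic, p) -> 0L).toMap
    // Later, partition 4 is added and every partition has 10 new messages.
    val latestOffsets = (0 to 4).map(p => TopicPartition(topic, p) -> 10L).toMap

    val ranges = nextBatch(startOffsets, latestOffsets)
    ranges.toSeq.sortBy(_._1.partition).foreach { case (tp, (from, until)) =>
      println(s"${tp.topic} ${tp.partition} $from $until")
    }
    // Prints false: partition 4 is never scheduled for reading.
    println(s"partition 4 planned: ${ranges.contains(TopicPartition(topic, 4))}")
  }
}
```

Running `FixedPartitionModel.main` prints ranges for partitions 0 to 3 only, which matches the symptom in the reproduction steps.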


How to reproduce it:

{code:title=KafkaDirectTest.scala|borderStyle=solid}
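import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object KafkaDirectTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka direct test 5")
    conf.setIfMissing("spark.master", "local[3]")
    // Limit each partition to 10 messages per second so batches stay small.
    conf.set("spark.streaming.kafka.maxRatePerPartition", "10")
    val ssc = new StreamingContext(conf, Seconds(1))

    val topic = "test_use"
    val groupId = "stream-test-0809"

    val kafkaParams = Map(
      "metadata.broker.list" -> "dev-002:9092,dev-004:9092",
      "group.id" -> groupId
    )

    // Not used by the stream below: explicit starting offsets for partitions 0 to 3.
    val fromOffsets: Map[TopicAndPartition, Long] = Map(
      new TopicAndPartition(topic, 0) -> 0L,
      new TopicAndPartition(topic, 1) -> 0L,
      new TopicAndPartition(topic, 2) -> 0L,
      new TopicAndPartition(topic, 3) -> 0L
    )

    // Also unused below.
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd

    // Subscribes to `topic`; its partitions are resolved when the stream is created.
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set(topic))

    lines.foreachRDD { rdd =>
      // Runs on the executors; in local mode the output appears in the driver console.
      rdd.foreach { row =>
        println(s"\n row: ${row} ")
      }

      // One OffsetRange per Kafka partition included in this batch.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { offset =>
        println(s"\n----- offset: ${offset.topic} ${offset.partition} ${offset.fromOffset} ${offset.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

}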
{code}

1. Start the job.
2. Add a new partition to the test_use topic.

The job never reads data from the new partition.
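One possible shape of the requested improvement, sketched under the assumption that the stream could re-list the topic's partitions before planning each batch: merge any newly discovered partitions into the offsets map it already tracks. `TopicPartition`, `PartitionDiscoverySketch` and `mergeNewPartitions` are hypothetical names. Starting new partitions at offset 0 is also an assumption; a real change might instead use the earliest available offset or honour `auto.offset.reset`.

```scala
// Hypothetical stand-in for kafka.common.TopicAndPartition.
final case class TopicPartition(topic: String, partition: Int)

object PartitionDiscoverySketch {
  // Adds partitions present in `discovered` but missing from `currentOffsets`,
  // starting them at `initialOffset`. Existing partitions keep their offsets.
  def mergeNewPartitions(
      currentOffsets: Map[TopicPartition, Long],
      discovered: Set[TopicPartition],
      initialOffset: Long = 0L
  ): Map[TopicPartition, Long] = {
    val added = discovered -- currentOffsets.keySet
    currentOffsets ++ added.map(tp => tp -> initialOffset)
  }

  def main(args: Array[String]): Unit = {
    val topic = "test_use"
    // The stream has already consumed up to offset 42 on partitions 0 to 3.
    val current = (0 to 3).map(p => TopicPartition(topic, p) -> 42L).toMap
    // A fresh metadata lookup now reports partitions 0 to 4.
    val discovered = (0 to 4).map(p => TopicPartition(topic, p)).toSet

    val merged = mergeNewPartitions(current, discovered)
    merged.toSeq.sortBy(_._1.partition).foreach { case (tp, off) =>
      println(s"${tp.topic} ${tp.partition} -> $off")
    }
  }
}
```

Because existing entries are never overwritten, a discovery step like this only ever grows the partition set, which is what a topic whose partition count can only increase needs.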


