GitHub user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r167242320
  
    --- Diff: 
external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 ---
    @@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
           Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) 
-> 10L))
       }
     
    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val topicPartitions = Set(TopicAndPartition(topic, 0))
    +    kafkaTestUtils.createTopic(topic, 1)
    +    val kafkaParams = Map(
    +      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
    +      "auto.offset.reset" -> "smallest"
    +    )
    +
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      val kc = new KafkaCluster(kafkaParams)
    +      val messageHandler = (mmd: MessageAndMetadata[String, String]) => 
(mmd.key, mmd.message)
    +      val m = kc.getEarliestLeaderOffsets(topicPartitions)
    +        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo 
=> lo.offset))
    +
    +      new DirectKafkaInputDStream[String, String, StringDecoder, 
StringDecoder, (String, String)](
    +        ssc, kafkaParams, m, messageHandler)
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
    +
    +    
assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --
    
    Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to