Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r167242353
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
    @@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
           Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) 
-> 10L))
       }
     
    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      new DirectKafkaInputDStream[String, String](
    +        ssc,
    +        preferredHosts,
    +        ConsumerStrategies.Subscribe[String, String](List(topic), 
kafkaParams.asScala),
    +        new DefaultPerPartitionConfig(sparkConf)
    +      )
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicPartition(topic, 0) -> 1000L)
    +
    +    
assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --
    
    Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to