[GitHub] spark issue #19431: [SPARK-18580] [DSTREAM][KAFKA] Add spark.streaming.backp...
Github user akonopko commented on the issue:
https://github.com/apache/spark/pull/19431

@koeninger done!

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...
Github user akonopko commented on the issue:
https://github.com/apache/spark/pull/19431

@koeninger I resolved the conflict.
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r173719300

    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -456,6 +455,60 @@ class DirectKafkaStreamSuite
         ssc.stop()
       }
     
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
    +  }
    +
    +  test("use backpressure.initialRate with backpressure") {
    --- End diff --

Right, thank you. I will correct this.
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r167395493

    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
    @@ -22,6 +22,7 @@ import java.lang.{ Long => JLong }
     import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
     import java.util.concurrent.ConcurrentLinkedQueue
     import java.util.concurrent.atomic.AtomicLong
    +import java.util.UUID
    --- End diff --

Ah, they were disabled for test files. Fixed.
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r167395482

    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -539,6 +456,58 @@ class DirectKafkaStreamSuite
         ssc.stop()
       }
     
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
    +  }
    +
    +  test("use backpressure.initialRate with backpressure") {
    +    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, maxMessagesPerPartition = 150)
    +  }
    +
    +  private def backpressureTest(maxRatePerPartition: Int,
    +      initialRate: Int,
    --- End diff --

Fixed.
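A quick sanity check on the expected values passed to `backpressureTest`, as a minimal sketch under stated assumptions: a single partition (so it gets the whole rate), a 500 ms batch, and a first batch that uses `initialRate` because no rate has been measured yet. `expectedMaxMessages` is a hypothetical helper for illustration, not part of the suite or of Spark.

```scala
// Hypothetical helper (not Spark code): expected per-partition message cap for
// the first batch, assuming one partition and that initialRate is used because
// no rate has been measured yet.
object ExpectedFirstBatch {
  // maxRatePerPartition <= 0 is treated as "no cap", matching the default of 0
  // for spark.streaming.kafka.maxRatePerPartition.
  def expectedMaxMessages(maxRatePerPartition: Long, initialRate: Long, batchMs: Long): Long = {
    val rate =
      if (maxRatePerPartition > 0) math.min(initialRate, maxRatePerPartition)
      else initialRate
    // Rate is in records per second, so scale it by the batch length in seconds.
    (batchMs.toDouble / 1000 * rate).toLong
  }

  def main(args: Array[String]): Unit = {
    println(expectedMaxMessages(1000, 500, 500)) // 250: initialRate is the tighter bound
    println(expectedMaxMessages(300, 1000, 500)) // 150: maxRatePerPartition is the tighter bound
  }
}
```

In the first call `initialRate` is the binding limit. In the second, `maxRatePerPartition` caps it.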
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r167395492

    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -21,6 +21,7 @@ import java.io.File
     import java.util.Arrays
     import java.util.concurrent.ConcurrentLinkedQueue
     import java.util.concurrent.atomic.AtomicLong
    +import java.util.UUID
    --- End diff --

Ah, they were disabled for test files. Fixed.
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r167395487

    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
    @@ -687,6 +618,51 @@ class DirectKafkaStreamSuite
         ssc.stop()
       }
     
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
    +  }
    +
    +  test("use backpressure.initialRate with backpressure") {
    +    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, maxMessagesPerPartition = 150)
    +  }
    +
    +  private def backpressureTest(maxRatePerPartition: Int,
    +      initialRate: Int,
    --- End diff --

Fixed.
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r167242320

    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
           Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
       }
     
    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val topicPartitions = Set(TopicAndPartition(topic, 0))
    +    kafkaTestUtils.createTopic(topic, 1)
    +    val kafkaParams = Map(
    +      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
    +      "auto.offset.reset" -> "smallest"
    +    )
    +
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      val kc = new KafkaCluster(kafkaParams)
    +      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    +      val m = kc.getEarliestLeaderOffsets(topicPartitions)
    +        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
    +
    +      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
    +        ssc, kafkaParams, m, messageHandler)
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --

Fixed.
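The tests check `maxMessagesPerPartition(input).flatMap(_.headOption).contains(...)`, which works on an `Option[Map[...]]`. Below is a small standalone illustration of that idiom using plain Scala collections. `Partition` is a hypothetical stand-in for `TopicAndPartition` / `TopicPartition`, and `firstEntryIs` is an illustrative helper, not part of the suite.

```scala
// Illustrates how the tests read an Option[Map[K, V]]:
// flatMap(_.headOption) yields the first (key, value) pair if there is one,
// and contains(...) compares that pair with the expected one.
object HeadOptionAssertion {
  // Hypothetical stand-in for TopicAndPartition / TopicPartition.
  final case class Partition(topic: String, id: Int)

  def firstEntryIs[K, V](result: Option[Map[K, V]], expected: (K, V)): Boolean =
    result.flatMap(_.headOption).contains(expected)

  def main(args: Array[String]): Unit = {
    val tp = Partition("backpressureInitialRate", 0)
    println(firstEntryIs(Some(Map(tp -> 250L)), tp -> 250L)) // true
    println(firstEntryIs(None, tp -> 250L))                  // false: no limit was computed
  }
}
```

With a single partition, looking only at the head entry is enough. With several partitions, the head of a `Map` is not a reliable choice.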
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r167242353

    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
    @@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
           Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
       }
     
    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      new DirectKafkaInputDStream[String, String](
    +        ssc,
    +        preferredHosts,
    +        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
    +        new DefaultPerPartitionConfig(sparkConf)
    +      )
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --

Fixed.
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r167242329

    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
           Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
       }
     
    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val topicPartitions = Set(TopicAndPartition(topic, 0))
    +    kafkaTestUtils.createTopic(topic, 1)
    +    val kafkaParams = Map(
    +      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
    +      "auto.offset.reset" -> "smallest"
    +    )
    +
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      val kc = new KafkaCluster(kafkaParams)
    +      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    +      val m = kc.getEarliestLeaderOffsets(topicPartitions)
    +        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
    +
    +      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
    +        ssc, kafkaParams, m, messageHandler)
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    +      new TopicAndPartition(topic, 0) -> 250)) // we run for half a second
    +
    +    kafkaStream.stop()
    +  }
    +
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    val topic = "backpressureInitialRate2"
    +    val topicPartitions = Set(TopicAndPartition(topic, 0))
    +    kafkaTestUtils.createTopic(topic, 1)
    +    val kafkaParams = Map(
    +      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
    +      "auto.offset.reset" -> "smallest"
    +    )
    +
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.kafka.maxRatePerPartition", "300")
    +      .set("spark.streaming.backpressure.initialRate", "1000")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      val kc = new KafkaCluster(kafkaParams)
    +      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    +      val m = kc.getEarliestLeaderOffsets(topicPartitions)
    +        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
    +
    +      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
    +        ssc, kafkaParams, m, messageHandler)
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --

Fixed.
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r167242186

    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
    @@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
           Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
       }
     
    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      new DirectKafkaInputDStream[String, String](
    +        ssc,
    +        preferredHosts,
    +        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
    +        new DefaultPerPartitionConfig(sparkConf)
    +      )
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    +      new TopicPartition(topic, 0) -> 250)) // we run for half a second
    +
    +    kafkaStream.stop()
    +  }
    +
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    val topic = "backpressureInitialRate"
    +    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.kafka.maxRatePerPartition", "300")
    +      .set("spark.streaming.backpressure.initialRate", "1000")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      new DirectKafkaInputDStream[String, String](
    +        ssc,
    +        preferredHosts,
    +        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
    +        new DefaultPerPartitionConfig(sparkConf)
    +      )
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --

Fixed.
[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...
Github user akonopko commented on the issue:
https://github.com/apache/spark/pull/19431

> Related the doc I thought it's kafka specific but it's not so fine like that

Yes, it was implemented only in Kafka Streams, but the doc doesn't limit the use of this parameter to Kafka.

> good to merge the common functionalities

I'm not sure I understood you correctly here. Do you mean in the tests?
[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...
Github user akonopko commented on the issue:
https://github.com/apache/spark/pull/19431

The latest rate is the rate of the previous batch. Can a live system process 0 events? Only if there is no backlog and no new events arrived during the last batch, which is entirely possible. It also happens on the first run, and this parameter is meant to limit the rate during the first run. Quote from the docs:

> This is the initial maximum receiving rate at which each receiver will receive data for the first batch when the backpressure mechanism is enabled.

If this happens while the system is running, for example because there is no backlog and no new events arrived, we still need to limit the rate. A latest rate of 0 results in no limit at all, which risks overflowing the system.

If the cluster were so heavily loaded by other processes that Spark Streaming processed 0 events, we might end up with a huge backlog afterwards. This means that, without this fix, the system has a high chance of overflowing.
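The fallback described above fits in a few lines of plain Scala. In this sketch, `LatestRateSource` is a hypothetical stand-in for Spark's rate controller and models only `getLatestRate()`. The `map` body has the same shape as the change under review, but this is an illustration, not the patch itself.

```scala
// Sketch of the fallback: when backpressure is on (Some(controller)) and the
// latest measured rate is not positive, use initialRate instead.
object InitialRateFallback {
  // Hypothetical stand-in for Spark's RateController; only getLatestRate() is modeled.
  trait LatestRateSource { def getLatestRate(): Long }

  def estimatedRateLimit(rateController: Option[LatestRateSource], initialRate: Long): Option[Long] =
    rateController.map { rc =>
      val lr = rc.getLatestRate()
      // A non-positive rate means "nothing measured", e.g. the first batch or an empty batch.
      if (lr > 0) lr else initialRate
    }

  def main(args: Array[String]): Unit = {
    def fixed(r: Long): LatestRateSource = new LatestRateSource { def getLatestRate(): Long = r }
    println(estimatedRateLimit(Some(fixed(0L)), 500L))    // Some(500): no measured rate yet
    println(estimatedRateLimit(Some(fixed(1200L)), 500L)) // Some(1200): the measured rate wins
    println(estimatedRateLimit(None, 500L))               // None: backpressure disabled
  }
}
```

Backpressure being disabled (`None`) is left untouched, so `initialRate` only matters when a rate controller exists.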
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r166606906

    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
       protected[streaming] def maxMessagesPerPartition(
         offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +    val estimatedRateLimit = rateController.map(x => {
    +      val lr = x.getLatestRate()
    +      if (lr > 0) lr else initialRate
    --- End diff --

If the cluster were so heavily loaded by other processes that Spark Streaming processed 0 events, we might end up with a huge backlog afterwards. This means that, without this fix, the system has a high chance of overflowing.
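A simplified model of why a rate of 0 is dangerous. It assumes the pre-patch method discards non-positive rates, splits the rate across partitions in proportion to their lag, falls back to `maxRatePerPartition`, and returns `None` (no cap at all) when every per-partition rate is 0. This is a standalone sketch over plain `Map`s with hypothetical names, not Spark's actual `maxMessagesPerPartition`.

```scala
// Simplified, standalone model of a per-partition batch cap. None means "no cap",
// i.e. the next batch may read the whole backlog.
object RateLimitModel {
  def maxMessagesPerPartition[P](
      lagPerPartition: Map[P, Long],
      estimatedRateLimit: Option[Long],
      maxRatePerPartition: Long, // 0 = not configured
      batchMs: Long): Option[Map[P, Long]] = {
    val perPartitionRate: Map[P, Long] = estimatedRateLimit.filter(_ > 0) match {
      case Some(rate) =>
        // Split the total rate across partitions in proportion to their lag.
        val totalLag = lagPerPartition.values.sum
        lagPerPartition.map { case (p, lag) =>
          val share: Long = math.round(lag.toDouble / totalLag * rate)
          p -> (if (maxRatePerPartition > 0) math.min(share, maxRatePerPartition) else share)
        }
      case None =>
        // No usable rate: fall back to the static per-partition maximum (possibly 0).
        lagPerPartition.map { case (p, _) => p -> maxRatePerPartition }
    }
    if (perPartitionRate.values.sum > 0) {
      val secsPerBatch = batchMs.toDouble / 1000
      Some(perPartitionRate.map { case (p, r) => p -> (secsPerBatch * r).toLong })
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    val lag = Map("p0" -> 1000L)
    val latestRate = 0L // the previous batch processed nothing
    val initialRate = 500L
    // Without a fallback: the zero rate is discarded, maxRatePerPartition is unset, so no cap.
    println(maxMessagesPerPartition(lag, Some(latestRate), 0L, 500L)) // None
    // With the fallback: initialRate stands in for the missing measurement.
    val patched = Some(if (latestRate > 0) latestRate else initialRate)
    println(maxMessagesPerPartition(lag, patched, 0L, 500L)) // Some(Map(p0 -> 250))
  }
}
```

The model shows the failure mode the comment describes. When the measured rate drops to 0 and no static maximum is set, the cap disappears and the next batch can pull the whole backlog.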
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r166605640

    --- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
    @@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
       private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
           "spark.streaming.kafka.maxRatePerPartition", 0)
     
    +  private val initialRate = context.sparkContext.getConf.getLong(
    +    "spark.streaming.backpressure.initialRate", 0)
    +
       protected[streaming] def maxMessagesPerPartition(
           offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +
    +    val estimatedRateLimit = rateController.map(x => {
    +      val lr = x.getLatestRate()
    +      if (lr > 0) lr else initialRate
    --- End diff --

The latest rate is the rate of the previous batch. Can a live system process 0 events? Only if there is no backlog and no new events arrived during the last batch, which is entirely possible. It also happens on the first run, and this parameter is meant to limit the rate during the first run. Quote from the docs:

> This is the initial maximum receiving rate at which each receiver will receive data for the first batch when the backpressure mechanism is enabled.

If this happens while the system is running, for example because there is no backlog and no new events arrived, we still need to limit the rate. A latest rate of 0 results in no limit at all, which risks overflowing the system.
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r166605671

    --- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
    @@ -108,7 +115,9 @@ class DirectKafkaInputDStream[
               tp -> (if (maxRateLimitPerPartition > 0) {
                 Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
             }
    -      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
    +      case None => offsets.map { case (tp, offset) => tp -> {
    --- End diff --

Fixed.
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r166605578

    --- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
    @@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
       private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
           "spark.streaming.kafka.maxRatePerPartition", 0)
     
    +  private val initialRate = context.sparkContext.getConf.getLong(
    +    "spark.streaming.backpressure.initialRate", 0)
    +
       protected[streaming] def maxMessagesPerPartition(
           offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +
    +    val estimatedRateLimit = rateController.map(x => {
    --- End diff --

Fixed.
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r166605547

    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
       protected[streaming] def maxMessagesPerPartition(
         offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +    val estimatedRateLimit = rateController.map(x => {
    +      val lr = x.getLatestRate()
    +      if (lr > 0) lr else initialRate
    --- End diff --

The latest rate is the rate of the previous batch. Can a live system process 0 events? Only if there is no backlog and no new events arrived during the last batch, which is entirely possible. It also happens on the first run, and this parameter is meant to limit the rate during the first run. Quote from the docs:

> This is the initial maximum receiving rate at which each receiver will receive data for the first batch when the backpressure mechanism is enabled.

If this happens while the system is running, for example because there is no backlog and no new events arrived, we still need to limit the rate. A latest rate of 0 results in no limit at all, which risks overflowing the system.
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r166603416

    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
       protected[streaming] def maxMessagesPerPartition(
         offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +    val estimatedRateLimit = rateController.map(x => {
    --- End diff --

Fixed.
[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...
Github user akonopko commented on the issue:
https://github.com/apache/spark/pull/19431

@gaborgsomogyi `spark.streaming.backpressure.initialRate` is already documented here: https://spark.apache.org/docs/latest/configuration.html

However, it was mistakenly not supported by the direct Kafka streams.
[GitHub] spark issue #19430: Spark 18580
Github user akonopko commented on the issue:
https://github.com/apache/spark/pull/19430

@vanzin please take a look at https://github.com/apache/spark/pull/19431

I am deleting this PR because it was mistakenly created against branch-2.2.
[GitHub] spark pull request #19430: Spark 18580
Github user akonopko closed the pull request at: https://github.com/apache/spark/pull/19430
[GitHub] spark pull request #19431: Add spark.streaming.backpressure.initialRate to d...
GitHub user akonopko opened a pull request:
https://github.com/apache/spark/pull/19431

Add spark.streaming.backpressure.initialRate to direct Kafka streams

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/akonopko/spark SPARK-18580-initialrate

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19431.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19431
[GitHub] spark pull request #19430: Spark 18580
GitHub user akonopko reopened a pull request:
https://github.com/apache/spark/pull/19430

Spark 18580

## What changes were proposed in this pull request?

Add `spark.streaming.backpressure.initialRate` to direct Kafka streams for Kafka 0.8 and 0.10.

This is required to use backpressure with huge lags that cannot be processed at once. Without this parameter, `DirectKafkaInputDStream` with backpressure enabled would try to read all available data from Kafka before adjusting the consumption rate.

## How was this patch tested?

- Tests added to `org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala` and `org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala`
- Manual tests on a YARN cluster

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/akonopko/spark SPARK-18580

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19430.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19430
[GitHub] spark pull request #19430: Spark 18580
Github user akonopko closed the pull request at: https://github.com/apache/spark/pull/19430
[GitHub] spark pull request #19430: Spark 18580
GitHub user akonopko opened a pull request:
https://github.com/apache/spark/pull/19430

Spark 18580

## What changes were proposed in this pull request?

Add `spark.streaming.backpressure.initialRate` to direct Kafka streams for Kafka 0.8 and 0.10.

This is required to use backpressure with huge lags that cannot be processed at once. Without this parameter, `DirectKafkaInputDStream` with backpressure enabled would try to read all available data from Kafka before adjusting the consumption rate.

## How was this patch tested?

- Tests added to `org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala` and `org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala`
- Manual tests on a YARN cluster

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/akonopko/spark SPARK-18580

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19430.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19430