[ https://issues.apache.org/jira/browse/SPARK-25239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-25239:
----------------------------------
    Affects Version/s:     (was: 2.4.0)
                           (was: 2.2.0)
                           (was: 2.1.0)
                       3.0.0

> Spark Streaming for Kafka should allow uniform batch size per partition for streaming RDD
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-25239
>                 URL: https://issues.apache.org/jira/browse/SPARK-25239
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 3.0.0
>            Reporter: Sidhavratha Kumar
>            Priority: Minor
>
> The current logic to determine maxMessagesPerPartition results in a
> non-uniform number of messages per partition, based on the lag of each
> partition.
>
> {code:java}
> val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
> {code}
> {code:java}
> if (effectiveRateLimitPerPartition.values.sum > 0) {
>   val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
>   Some(effectiveRateLimitPerPartition.map {
>     case (tp, limit) => tp -> (secsPerBatch * limit).toLong
>   })
> }
> {code}
>
> This wastes resources, since the cores that have fewer messages to process
> wait until the other cores are done with their tasks.
> Let us consider a topic t that has 2 partitions:
>
> ||Topic||Partition||Start Offset||End Offset||Current Offset||
> |t|0|0|10000|0|
> |t|1|0|100|0|
>
> and maxRatePerPartition = 1000
> and batch duration = 10 sec
>
> As per the calculation:
> maxMessage for part-0 -> (10000/10100) * 1000 / (batchDuration = 10) = 99
> maxMessage for part-1 -> (100/10100) * 1000 / (batchDuration = 10) = 1
>
> If the application is running on 2 cores, one core will wait after processing
> 1 record of partition 1 until 99 records get processed on the other core for
> partition 0, before picking up the next RDD.
> If we enforce a uniform batch size across partitions in each RDD, this waste
> of resources is avoided.
> In the above case, we can set the batch size for each partition =
> max(batch size of all partitions), i.e. 99.
> maxMessage for part-0 = 99
> maxMessage for part-1 = 99
> So we can process 98 more records of partition 1 in the same time without
> wasting any resources.
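
The worked example in the description can be sketched in Scala as below. This is only an illustration under stated assumptions, not the Spark implementation: the object `UniformBatchSketch` and the helpers `proportionalLimits` and `uniformLimits` are hypothetical names, partitions are keyed by a plain `Int` rather than a `TopicPartition`, and the arithmetic follows the example as written in the ticket (the lag-proportional rate divided by the batch duration in seconds), whereas the quoted snippet multiplies the per-second limit by `secsPerBatch`.

```scala
object UniformBatchSketch {

  // Lag-proportional split, following the ticket's worked example.
  // `rate` stands in for maxRatePerPartition and `batchSecs` for the batch
  // duration in seconds; both parameter names are assumptions of this sketch.
  def proportionalLimits(lags: Map[Int, Long], rate: Long, batchSecs: Long): Map[Int, Long] = {
    val totalLag = lags.values.sum
    lags.map { case (partition, lag) =>
      // Same expression as the first quoted snippet.
      val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
      partition -> backpressureRate / batchSecs
    }
  }

  // Proposed change: give every partition the largest per-partition limit.
  def uniformLimits(limits: Map[Int, Long]): Map[Int, Long] =
    if (limits.isEmpty) limits
    else {
      val maxLimit = limits.values.max
      limits.map { case (partition, _) => partition -> maxLimit }
    }

  def main(args: Array[String]): Unit = {
    // Partition 0 lags by 10000 messages, partition 1 by 100.
    val lags = Map(0 -> 10000L, 1 -> 100L)
    val current = proportionalLimits(lags, rate = 1000L, batchSecs = 10L)
    println(s"current:  $current")
    println(s"proposed: ${uniformLimits(current)}")
  }
}
```

With the table's values, the first helper gives limits of 99 and 1, and the second raises both to 99, matching the figures in the description. A real change would presumably still need to bound each partition's limit by the messages actually available in that partition.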