[ https://issues.apache.org/jira/browse/SPARK-25239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-25239:
----------------------------------
    Affects Version/s:     (was: 2.4.0)
                           (was: 2.2.0)
                           (was: 2.1.0)
                       3.0.0

> Spark Streaming for Kafka should allow uniform batch size per partition for 
> streaming RDD
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-25239
>                 URL: https://issues.apache.org/jira/browse/SPARK-25239
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 3.0.0
>            Reporter: Sidhavratha Kumar
>            Priority: Minor
>
>  
>  
> The current logic for determining maxMessagesPerPartition results in a non-uniform 
> number of messages per partition, allocated in proportion to each partition's lag.
>  
> {code:java}
> val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
> {code}
> {code:java}
> if (effectiveRateLimitPerPartition.values.sum > 0) {
>   val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
>   Some(effectiveRateLimitPerPartition.map {
>     case (tp, limit) => tp -> (secsPerBatch * limit).toLong
>   })
> }
> {code}
>  
> This wastes resources, since the cores with fewer messages to process sit idle 
> until the other cores finish their tasks.
> Consider a topic t with 2 partitions:
>  
>  
> ||Topic||Partition||Start Offset||End Offset||Current Offset||
> |t|0|0|10000|0|
> |t|1|0|100|0|
> with maxRatePerPartition = 1000 and a batch duration of 10 seconds.
> The calculation then gives
> maxMessage for part-0 -> (10000/10100) * 1000 / (batchDuration = 10) = 99
> maxMessage for part-1 -> (100/10100) * 1000 / (batchDuration = 10) = 1
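> As a minimal sketch, the same numbers can be reproduced as follows (variable and 
> partition names are illustrative and rate is taken to be maxRatePerPartition; this 
> is not the actual Spark code):
> {code:java}
> // Illustrative reproduction of the calculation above (not the actual Spark code).
> val rate = 1000.0              // maxRatePerPartition
> val batchDurationSecs = 10.0
> val lags = Map("t-0" -> 10000L, "t-1" -> 100L)
> val totalLag = lags.values.sum.toDouble
>
> // Each partition gets a share proportional to its lag.
> val maxMessages = lags.map { case (tp, lag) =>
>   tp -> Math.round(lag / totalLag * rate / batchDurationSecs)
> }
> // maxMessages: Map(t-0 -> 99, t-1 -> 1)
> {code}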
> If the application runs on 2 cores, one core processes the single record of 
> partition 1 and then waits while the other core works through the 99 records of 
> partition 0, before the next RDD can be picked up.
> Enforcing a uniform batch size across the partitions of each RDD avoids this waste.
>  In the case above, we can set the batch size of every partition to max(batch size 
> of all partitions), i.e. 99:
> maxMessage for part-0 = 99
> maxMessage for part-1 = 99
> This way, 98 more records of partition 1 are processed in the same time, without 
> wasting any resources.
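> A minimal sketch of the proposed sizing, continuing the illustrative example above 
> (names are hypothetical, not the existing Spark code):
> {code:java}
> // Per-partition limits as computed today (from the example above).
> val currentLimits = Map("t-0" -> 99L, "t-1" -> 1L)
>
> // Proposed: raise every partition to the largest per-partition limit, so that
> // no core sits idle while another core is still draining its partition.
> val uniformLimit = currentLimits.values.max
> val uniformLimits = currentLimits.map { case (tp, _) => tp -> uniformLimit }
> // uniformLimits: Map(t-0 -> 99, t-1 -> 99)
> {code}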



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
