[ 
https://issues.apache.org/jira/browse/SPARK-7624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-7624:
-----------------------------------

    Assignee: Apache Spark  (was: Davies Liu)

> Task scheduler delay is increasing time over time in spark local mode
> ---------------------------------------------------------------------
>
>                 Key: SPARK-7624
>                 URL: https://issues.apache.org/jira/browse/SPARK-7624
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.1
>            Reporter: Jack Hu
>            Assignee: Apache Spark
>              Labels: delay, schedule
>
> I am running a simple Spark Streaming program with Spark 1.3.1 in local mode. 
> It receives JSON strings from a socket at a rate of 50 events per second. It 
> runs well for the first 6 hours (although the minor GC count per minute 
> increases the whole time). After that, the scheduler delay of every task 
> increases significantly, from 10 ms to 100 ms. After 10 hours of running, the 
> task delay is about 800 ms and CPU usage has also risen from 2% to 30%. As a 
> result, the streaming job cannot finish within one batch interval (5 
> seconds). I dumped the Java heap after 16 hours and saw about 200000 
> {{org.apache.spark.scheduler.local.ReviveOffers}} objects in 
> {{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. I then checked the 
> code and found only one place that may put {{ReviveOffers}} into the Akka 
> {{LightArrayRevolverScheduler}}: {{LocalActor::reviveOffers}}
> {code}
>   def reviveOffers() {
>     val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
>     val tasks = scheduler.resourceOffers(offers).flatten
>     for (task <- tasks) {
>       freeCores -= scheduler.CPUS_PER_TASK
>       executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
>         task.name, task.serializedTask)
>     }
>     if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
>       // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
>       context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
>     }
>   }
> {code}
> I removed the last three lines of this method (the whole {{if}} block, which 
> was introduced by https://issues.apache.org/jira/browse/SPARK-4939). With 
> that change, the program ran smoothly for 20 hours, and the scheduler delay 
> stayed at about 10 ms the whole time. So is there some condition under which 
> {{ReviveOffers}} gets scheduled more than once? I am not sure why this 
> happens, but I believe it is the root cause of this issue. 
> My Spark settings:
> # Memory: 3G
> # CPU: 8 cores
> # Streaming batch interval: 5 seconds
> Here is my streaming code:
> {code}
> val input = ssc.socketTextStream(
>   hostname, port, StorageLevel.MEMORY_ONLY_SER).mapPartitions(
>   // parse the JSON to Order
>   Order(_), preservePartitioning = true)
> val mresult = input.map(
>   v => (v.customer, UserSpending(v.customer, v.count * v.price, v.timestamp.toLong))).cache()
> val tempr = mresult.window(
>   Seconds(firstStageWindowSize),
>   Seconds(firstStageWindowSize)
> ).transform(
>   rdd => rdd.union(rdd).union(rdd).union(rdd)
> )
> tempr.count.print
> tempr.cache().foreachRDD((rdd, t) => {
>   for (i <- 1 to 5) {
>     val c = rdd.filter(x => scala.util.Random.nextInt(5) == i).count()
>     println("""T: """ + t + """: """ + c)
>   }
> })
> {code}
> ========================================================
> Update, 2015-05-15:
> I logged how many times the suspect lines in {{LocalActor::reviveOffers}} 
> scheduled a message: {color:red}*1685343501*{color} times after 18 hours of 
> running.
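
The suspected feedback loop can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Spark code and not a confirmed diagnosis. The names ReviveOffersSketch, pendingTimers, externalCalls and guarded are hypothetical. The model assumes that every call to reviveOffers hits the retry branch (no task to launch while task sets are active), that each delayed message fires after exactly one second, and that a fixed number of calls arrive each second from outside the timer. It also shows one possible mitigation, keeping at most one delayed message pending, which is not necessarily the fix adopted in Spark.

```scala
object ReviveOffersSketch {

  /** Toy model of the retry branch in reviveOffers (hypothetical, not Spark code).
    *
    * @param seconds       simulated run time, counted in 1-second timer periods
    * @param externalCalls assumed number of reviveOffers calls per second that do
    *                      not come from the timer itself
    * @param guarded       if true, keep at most one delayed message pending
    * @return number of delayed ReviveOffers messages still queued at the end
    */
  def pendingTimers(seconds: Int, externalCalls: Int, guarded: Boolean): Long = {
    var pending = 0L
    for (_ <- 1 to seconds) {
      // Every queued message fires after one second and calls reviveOffers once;
      // the external callers add their own calls on top of that.
      val calls = pending + externalCalls
      // Assumption: every call takes the retry branch, so each call re-arms a timer.
      // With the guard, a call re-arms only if no message is already pending.
      pending = if (guarded) math.min(calls, 1L) else calls
    }
    pending
  }

  def main(args: Array[String]): Unit = {
    // One simulated hour with one external call per second.
    println(pendingTimers(seconds = 3600, externalCalls = 1, guarded = false))
    println(pendingTimers(seconds = 3600, externalCalls = 1, guarded = true))
  }
}
```

In this model the unguarded queue grows by externalCalls every second, because each fired message re-arms itself and nothing ever retires a chain, while the guarded variant stays at one pending message. The model does not try to reproduce the counts reported above; it only shows how re-arming timers could accumulate if the retry condition holds for long periods.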


