Jack Hu created SPARK-7624:
------------------------------

             Summary: Task scheduler delay increases over time in Spark local mode
                 Key: SPARK-7624
                 URL: https://issues.apache.org/jira/browse/SPARK-7624
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.3.1
            Reporter: Jack Hu


I am running a simple Spark Streaming program with Spark 1.3.1 in local mode. 
It receives JSON strings from a socket at a rate of 50 events per second. It 
runs well for the first 6 hours (although the minor GC count per minute keeps 
increasing the whole time). After that, the scheduler delay of every task 
increases significantly, from 10 ms to 100 ms. After 10 hours of running, the 
task delay is about 800 ms and CPU usage has risen from 2% to 30%. As a result, 
the streaming job cannot finish within one batch interval (5 seconds). I dumped 
the Java heap after 16 hours and found about 200000 
{{org.apache.spark.scheduler.local.ReviveOffers}} objects in 
{{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. I then checked the code 
and found only one place that may put {{ReviveOffers}} into the Akka 
{{LightArrayRevolverScheduler}}: {{LocalActor::reviveOffers}}
{code}
  def reviveOffers() {
    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
    val tasks = scheduler.resourceOffers(offers).flatten
    for (task <- tasks) {
      freeCores -= scheduler.CPUS_PER_TASK
      executor.launchTask(executorBackend, taskId = task.taskId,
        attemptNumber = task.attemptNumber, task.name, task.serializedTask)
    }

    if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
      // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
      context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
    }
  }
{code}

I removed the last lines of this method (the whole {{if}} block, which was 
introduced by https://issues.apache.org/jira/browse/SPARK-4939). After that, the 
program ran smoothly for 20 hours, with the scheduler delay staying at about 
10 ms the whole time. So presumably there are conditions under which duplicate 
{{ReviveOffers}} messages get scheduled? I am not sure why this happens, but I 
believe it is the root cause of this issue. 
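
The report does not confirm how the messages pile up. One plausible mechanism, stated here only as an assumption, is that {{reviveOffers()}} is called both by the delayed message and by other events, and that each call which launches nothing while a task set stays active schedules its own new delayed message. The toy model below (plain Scala, no Akka or Spark; {{ReviveOffersModel}}, {{pendingTimers}} and the 200 ms trigger interval are hypothetical) counts pending delayed messages under that assumption, with and without a "one pending retry at a time" guard. The guard is an illustration, not the fix adopted by Spark.

```scala
import scala.collection.mutable

// Toy model only: no Akka or Spark involved. All names here are hypothetical.
object ReviveOffersModel {

  /** Simulates `seconds` of runtime in 1 ms steps and returns how many delayed
    * ReviveOffers messages are still queued in the modelled timer at the end.
    *
    * @param externalEveryMs interval between reviveOffers() calls that do not
    *                        come from the timer (e.g. after a status update)
    * @param guarded         if true, allow at most one pending delayed message
    */
  def pendingTimers(seconds: Int, externalEveryMs: Int, guarded: Boolean): Int = {
    // Due times (ms) of pending delayed messages, earliest first.
    val timers = mutable.PriorityQueue.empty[Long](Ordering[Long].reverse)
    var retryPending = false // only consulted when `guarded` is true

    // Assumption: nothing can be launched while a task set stays active,
    // so the `if` branch of reviveOffers() is taken on every call.
    def reviveOffers(now: Long): Unit = {
      if (!guarded || !retryPending) {
        timers.enqueue(now + 1000)
        retryPending = true
      }
    }

    val end = seconds * 1000L
    var now = 0L
    while (now < end) {
      // Deliver every delayed message that is due; each delivery calls reviveOffers().
      while (timers.nonEmpty && timers.head <= now) {
        timers.dequeue()
        retryPending = false
        reviveOffers(now)
      }
      // An external trigger also calls reviveOffers().
      if (now % externalEveryMs == 0) reviveOffers(now)
      now += 1
    }
    timers.size
  }
}
```

In this model, {{ReviveOffersModel.pendingTimers(60, 200, guarded = false)}} returns 300 (one self-renewing timer per external call), while the guarded variant returns 1. Whether the real application meets the model's assumption would have to be checked against the actual scheduler state.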

My spark settings:
# Memory: 3G
# CPU: 8 cores
# Streaming batch interval: 5 seconds
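
For reference, the settings above might translate into a setup like the following. This is a sketch under assumptions: that "8 cores" means a {{local[8]}} master, that the 3G is the driver JVM heap, and the application name is hypothetical. The report does not show the actual configuration code.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumption: "8 cores" means the local master runs with 8 worker threads.
val conf = new SparkConf()
  .setMaster("local[8]")
  .setAppName("SchedulerDelayRepro") // hypothetical application name

// 5-second batch interval, as listed in the settings.
val ssc = new StreamingContext(conf, Seconds(5))

// Assumption: the 3G of memory is the driver JVM heap. In local mode it has to
// be set before the JVM starts, for example with `spark-submit --driver-memory 3g`.
```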

Here is my streaming code:
{code}
val input = ssc.socketTextStream(
  hostname, port, StorageLevel.MEMORY_ONLY_SER).mapPartitions(
  // parse the json to Order
  Order(_), preservePartitioning = true)
val mresult = input.map(
  v => (v.customer, UserSpending(v.customer, v.count * v.price, v.timestamp.toLong))).cache()
val tempr = mresult.window(
  Seconds(firstStageWindowSize),
  Seconds(firstStageWindowSize)
).transform(
  rdd => rdd.union(rdd).union(rdd).union(rdd)
)
tempr.count.print
tempr.cache().foreachRDD((rdd, t) => {
  for (i <- 1 to 5) {
    val c = rdd.filter(x => scala.util.Random.nextInt(5) == i).count()
    println("""T: """ + t + """: """ + c)
  }
})
{code}
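
A side note on the loop in the code above: {{scala.util.Random.nextInt(5)}} returns values from 0 to 4, so the filter for {{i == 5}} can never match. The job for {{i = 5}} still runs; only its printed count is always 0. The small check below illustrates this in plain Scala ({{FilterRangeCheck}} and {{hits}} are hypothetical names used only for this sketch).

```scala
import scala.util.Random

object FilterRangeCheck {
  // Draws `n` samples of nextInt(5) and counts how often each i in 1..5 is hit.
  def hits(n: Int, seed: Long): Map[Int, Int] = {
    val rng = new Random(seed)
    val samples = Seq.fill(n)(rng.nextInt(5)) // values are always in 0..4
    (1 to 5).map(i => i -> samples.count(_ == i)).toMap
  }
}
```

For any seed and sample size, the entry for key 5 is 0, because 5 is outside the range of {{nextInt(5)}}.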



