[ https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14570974#comment-14570974 ]
Platon Potapov commented on SPARK-7122:
---------------------------------------

Nicolas, in your scenario, it appears that "createDirectStream" lags behind because batches take longer than 2 seconds to process from the get-go. This is probably due to the same bug that this very ticket is about: trivial jobs take 3 seconds to process. However, batch processing duration did not grow over time in my case.

But if you use the legacy "createStream", please keep an eye out for a much more devilish problem that I've also experienced:
https://issues.apache.org/jira/browse/SPARK-7053
In short, with the legacy Kafka receiver, my output was also *eventually* lagging behind my input data, but only after the system had been subjected to load for some time (many hours in my case - it's all in the ticket).

> KafkaUtils.createDirectStream - unreasonable processing time in absence of load
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-7122
>                 URL: https://issues.apache.org/jira/browse/SPARK-7122
>             Project: Spark
>          Issue Type: Question
>          Components: Streaming
>    Affects Versions: 1.3.1
>         Environment: Spark Streaming 1.3.1, standalone mode running on just 1
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>            Reporter: Platon Potapov
>            Priority: Minor
>         Attachments: 10.second.window.fast.job.txt,
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> attached is the complete source code of a test spark job. no external data
> generators are run - just the presence of a kafka topic named "raw" suffices.
> the spark job is run with no load whatsoever. http://localhost:4040/streaming
> is checked to obtain job processing duration.
> * in case the test contains the following transformation:
> {code}
>     // dummy transformation
>     val temperature = bytes.filter(_._1 == "abc")
>     val abc = temperature.window(Seconds(40), Seconds(5))
>     abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
>     // dummy transformation
>     val temperature = bytes.filter(_._1 == "abc")
>     val abc = temperature.map(x => (1, x))
>     abc.print()
> {code}
> the median processing time is just 50 ms
> please explain why does the "window" transformation introduce such a growth
> of job duration?
> note: the result is the same regardless of the number of kafka topic
> partitions (I've tried 1 and 8)
> note2: the result is the same regardless of the window parameters (I've tried
> (20, 2) and (40, 5))
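
For reference, the pairs in note2 read as (window length, slide interval) in seconds, matching the arguments of window(Seconds(40), Seconds(5)). Below is a minimal sketch in plain Scala, with no Spark dependency, of how such a pair relates to the batch interval of the StreamingContext. The 1-second batch interval is an assumption (the ticket text above does not state it; 1 second is simply a value that divides all four durations tried), and WindowSpec with its methods is a hypothetical helper, not part of Spark or of the attached SparkStreamingJob.scala.

```scala
// Hypothetical helper for illustration only; durations are whole seconds.
final case class WindowSpec(windowSec: Int, slideSec: Int) {

  // Spark Streaming requires the window length and the slide interval
  // to be multiples of the batch interval of the source DStream.
  def isValidFor(batchSec: Int): Boolean =
    batchSec > 0 && windowSec % batchSec == 0 && slideSec % batchSec == 0

  // Number of batch RDDs that a single windowed RDD spans.
  def batchesPerWindow(batchSec: Int): Int = {
    require(isValidFor(batchSec), s"window and slide must be multiples of $batchSec s")
    windowSec / batchSec
  }

  // A windowed RDD is produced once every this many batches.
  def batchesPerSlide(batchSec: Int): Int = {
    require(isValidFor(batchSec), s"window and slide must be multiples of $batchSec s")
    slideSec / batchSec
  }
}

object WindowSpecDemo {
  def main(args: Array[String]): Unit = {
    val batchSec = 1 // ASSUMPTION: batch interval, not given in the ticket text
    // The two parameter pairs mentioned in note2.
    for (spec <- Seq(WindowSpec(20, 2), WindowSpec(40, 5))) {
      println(
        s"window(${spec.windowSec}s, ${spec.slideSec}s): " +
          s"${spec.batchesPerWindow(batchSec)} batches per window, " +
          s"one output every ${spec.batchesPerSlide(batchSec)} batches")
    }
  }
}
```

Under that assumed interval, each output of the (40, 5) window spans 40 batches, whereas the map variant only ever touches the current batch. This only illustrates what the parameters mean; it does not by itself explain the 3-second processing time reported above.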