[ https://issues.apache.org/jira/browse/SPARK-13707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15183283#comment-15183283 ]

Jatin Kumar commented on SPARK-13707:
-------------------------------------

* On the surface this looks like a UI-only problem, though I am not sure 
whether the source of the problem affects other non-UI components as well. 
Ideally the events count reported on the UI should be the same as the "Total 
Impressions" printed by my Spark application code, but here we see that Total 
Impressions > Events Count. Also, since the receiver is unaware of the window 
operations applied later in the pipeline, it behaves the same as it would for 
a non-windowed application, consuming one continuous range. So I strongly 
suspect it is actually fetching/processing the Kafka offset range in a 
continuous manner (see the sketch below the next point).

* Yes, this is evident for both. I would expect this to be true for any source 
when the pipeline has a window operation.
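
One way to check the continuous-range hypothesis is to log the Kafka offsets 
backing each micro-batch of the direct stream itself, before any window is 
applied, and compare consecutive batches. A minimal sketch, assuming 
KafkaReader.KafkaDirectStream (the reporter's helper) returns a plain direct 
stream, so its RDDs can be cast to HasOffsetRanges from the standard 
spark-streaming-kafka API:

{code:java}
import org.apache.spark.streaming.kafka.HasOffsetRanges

val stream = KafkaReader.KafkaDirectStream(streamingContext)

// Print the offsets behind every 2-second micro-batch. If consumption
// really is one continuous range, each batch's fromOffset should equal
// the previous batch's untilOffset for every partition, regardless of
// any downstream window().
stream.foreachRDD { rdd =>
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { r =>
    println(s"topic=${r.topic} partition=${r.partition} " +
      s"from=${r.fromOffset} until=${r.untilOffset}")
  }
}
{code}

If these per-micro-batch ranges are contiguous while the ranges shown on the 
Streaming tab are not, that would point to a reporting problem rather than a 
consumption problem.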

> Streaming UI tab misleading for window operations
> -------------------------------------------------
>
>                 Key: SPARK-13707
>                 URL: https://issues.apache.org/jira/browse/SPARK-13707
>             Project: Spark
>          Issue Type: Bug
>          Components: Web UI
>    Affects Versions: 1.6.0
>            Reporter: Jatin Kumar
>         Attachments: Screen Shot 2016-03-06 at 11.09.55 pm.png
>
>
> The 'Streaming' tab on the Spark UI is misleading when the job has a window 
> operation, which changes the batch duration from the original streaming 
> context batch duration.
> For instance consider:
> {code:java}
> val streamingContext = new StreamingContext(sparkConfig, Seconds(2))
> val totalVideoImps = streamingContext.sparkContext.accumulator(0, "TotalVideoImpressions")
> val totalImps = streamingContext.sparkContext.accumulator(0, "TotalImpressions")
> val stream = KafkaReader.KafkaDirectStream(streamingContext)
> stream.map(KafkaAdLogParser.parseAdLogRecord)
>   .filter(record => {
>     totalImps += 1
>     KafkaAdLogParser.isVideoRecord(record)
>   })
>   .map(record => {
>     totalVideoImps += 1
>     record.url
>   })
>   .window(Seconds(120))
>   .count()
>   .foreachRDD((rdd, time) => {
>     println("Timestamp: " + ImpressionAggregator.millsToDate(time.milliseconds))
>     println("Count: " + rdd.collect()(0))
>     println("Total Impressions: " + totalImps.value)
>     totalImps.setValue(0)
>     println("Total Video Impressions: " + totalVideoImps.value)
>     totalVideoImps.setValue(0)
>   })
> streamingContext.start()
> streamingContext.awaitTermination()
> {code}
> The batch size before the window operation is 2 seconds; after the window, 
> each batch covers 120 seconds.
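> With a 2-second base batch and a 120-second window, each windowed batch 
> therefore aggregates 120 / 2 = 60 micro-batches. Rough arithmetic from the 
> first window's output below (derived numbers approximate):
> {noformat}
> micro-batches per window    = 120 s / 2 s    = 60
> impressions per micro-batch ≈ 16882431 / 60  ≈ 281374
> impressions per second      ≈ 16882431 / 120 ≈ 140687
> {noformat}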
> --
> The above code printed the following for my application:
> {noformat}
> Timestamp: 2016-03-06 12:02:56,000
> Count: 362195
> Total Impressions: 16882431
> Total Video Impressions: 362195
> Timestamp: 2016-03-06 12:04:56,000
> Count: 367168
> Total Impressions: 19480293
> Total Video Impressions: 367168
> Timestamp: 2016-03-06 12:06:56,000
> Count: 177711
> Total Impressions: 10196677
> Total Video Impressions: 177711
> {noformat}
> The Spark UI, however, shows different numbers, as in the attached 
> screenshot. Also, if we check the start and end Kafka partition offsets 
> reported by subsequent batch entries on the UI, they do not form an overall 
> continuous range. All numbers are fine if we remove the window operation, 
> though.


