[ 
https://issues.apache.org/jira/browse/SPARK-4040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14184259#comment-14184259
 ] 

jay vyas commented on SPARK-4040:
---------------------------------

Strangely, in 
https://github.com/databricks/reference-apps/blob/master/twitter_classifier/scala/src/main/scala/com/databricks/apps/twitter_classifier/Collect.scala
(certainly a credible source on this subject), rdd.count() is called 
inside the closure passed to foreachRDD. So it seems to be a valid 
operation, at least in some cases, unless the Databricks reference app itself 
is mistaken?

{noformat}
tweetStream.foreachRDD((rdd, time) => {
  val count = rdd.count()
  if (count > 0) {
    val outputRDD = rdd.repartition(partitionsEachInterval)
    outputRDD.saveAsTextFile(outputDirectory + "/tweets_" + time.milliseconds.toString)
    numTweetsCollected += count
    if (numTweetsCollected > numTweetsToCollect) {
      System.exit(0)
    }
  }
})
{noformat}

> calling count() on RDD's emitted from a DStream blocks forEachRDD progress.
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-4040
>                 URL: https://issues.apache.org/jira/browse/SPARK-4040
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: jay vyas
>
> Please note that I'm somewhat new to Spark Streaming's API and am not a Spark 
> expert, so I've done my best to write up and reproduce this "bug". If it's 
> not a bug, I hope an expert will help explain why and promptly close it. 
> However, it appears it could be a bug after discussing it with [~rnowling], who 
> is a Spark contributor.
> CC [~rnowling] [~willbenton] 
>  
> It appears that in a DStream context, a call to {{MappedRDD.count()}} 
> blocks progress and prevents emission of RDDs from a stream.
> {noformat}
>     tweetStream.foreachRDD((rdd,lent)=> {
>       tweetStream.repartition(1)
>       //val count = rdd.count()  DONT DO THIS !
>       checks += 1;
>       if (checks > 20) {
>         ssc.stop()
>       }
>     })
> {noformat} 
> The above code block should inevitably halt after 20 intervals of RDDs. 
> However, if we *uncomment the call* to {{rdd.count()}}, it turns out that we 
> get an *infinite stream which emits no RDDs*, and thus our program *runs 
> forever* (ssc.stop is unreachable), because *foreachRDD doesn't receive any 
> more entries*.  
> I suspect this is because the foreachRDD block never completes: 
> {{count()}} winds up calling {{compute}}, which ultimately just reads from 
> the stream.
> I haven't put together a minimal reproducer or unit test yet, but I can work 
> on doing so if more info is needed.
> I guess this could be seen as an application bug, but I think Spark might be 
> made smarter and throw its hands up when people execute blocking code in a 
> stream processor. 
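
For reference, a minimal reproducer along the lines mentioned above might look 
like the sketch below. It is not taken from the issue or from the reference 
app. It assumes a local[2] master, swaps the Twitter stream for queueStream so 
that no receiver is involved, and uses a hypothetical object name and a 
CountDownLatch purely for illustration. If count() really stalled foreachRDD, 
the latch would time out instead of reaching zero.

{noformat}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

// Hypothetical reproducer; names and settings are assumptions, not from the issue.
object CountInForeachRDD {
  def main(args: Array[String]): Unit = {
    // Assumption: two local cores, so batch jobs always have a free core.
    val conf = new SparkConf().setMaster("local[2]").setAppName("CountInForeachRDD")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Receiver-less input: each queued RDD is consumed as one batch.
    val queue = new mutable.Queue[RDD[Int]]()
    (1 to 5).foreach(i => queue += ssc.sparkContext.makeRDD(1 to i * 10))
    val stream = ssc.queueStream(queue)

    // Counts down once per non-empty batch whose count() returned.
    val nonEmptyBatches = new CountDownLatch(5)
    stream.foreachRDD { (rdd: RDD[Int], time: Time) =>
      val count = rdd.count() // the call under suspicion
      println(s"batch at ${time.milliseconds} ms: $count records")
      if (count > 0) nonEmptyBatches.countDown()
    }

    ssc.start()
    // Wait from the main thread rather than calling ssc.stop() inside foreachRDD.
    val finished = nonEmptyBatches.await(30, TimeUnit.SECONDS)
    ssc.stop()
    println(if (finished) "count() did not stall foreachRDD"
            else "timed out waiting for batches")
  }
}
{noformat}

Stopping the context from the main thread, instead of from inside the 
foreachRDD closure, keeps the shutdown path separate from the batch job being 
tested.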


