[ https://issues.apache.org/jira/browse/SPARK-4040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14184259#comment-14184259 ]
jay vyas commented on SPARK-4040:
---------------------------------

Strangely, in https://github.com/databricks/reference-apps/blob/master/twitter_classifier/scala/src/main/scala/com/databricks/apps/twitter_classifier/Collect.scala , which is certainly a credible source on this subject, rdd.count() is called inside the closure passed to foreachRDD. So it seems to be a valid operation, at least in some cases? Unless the databricks reference app itself is mistaken?

{noformat}
tweetStream.foreachRDD((rdd, time) => {
  val count = rdd.count()
  if (count > 0) {
    val outputRDD = rdd.repartition(partitionsEachInterval)
    outputRDD.saveAsTextFile(outputDirectory + "/tweets_" + time.milliseconds.toString)
    numTweetsCollected += count
    if (numTweetsCollected > numTweetsToCollect) {
      System.exit(0)
    }
  }
})
{noformat}

> calling count() on RDDs emitted from a DStream blocks foreachRDD progress.
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-4040
>                 URL: https://issues.apache.org/jira/browse/SPARK-4040
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: jay vyas
>
> Please note that I'm somewhat new to Spark Streaming's API and am not a Spark
> expert, so I've done my best to write up and reproduce this "bug". If it's not
> a bug, I hope an expert will help to explain why and promptly close it.
> However, it appears it could be a bug after discussing with [~rnowling], who
> is a Spark contributor.
> CC [~rnowling] [~willbenton]
>
> It appears that in a DStream context, a call to {{MappedRDD.count()}} blocks
> progress and prevents emission of RDDs from the stream.
> {noformat}
> tweetStream.foreachRDD((rdd, lent) => {
>   tweetStream.repartition(1)
>   // val count = rdd.count()  // DON'T DO THIS!
>   checks += 1
>   if (checks > 20) {
>     ssc.stop()
>   }
> })
> {noformat}
> The above code block should inevitably halt after 20 intervals of RDDs.
> However, if we *uncomment the call* to {{rdd.count()}}, it turns out that we
> get an *infinite stream which emits no RDDs*, and thus our program *runs
> forever* (ssc.stop is unreachable), because *foreachRDD doesn't receive any
> more entries*.
> I suspect this is because the foreachRDD block never completes: {{count()}}
> winds up calling {{compute}}, which ultimately just reads from the stream.
> I haven't put together a minimal reproducer or unit test yet, but I can work
> on doing so if more info is needed.
> I guess this could be seen as an application bug, but I think Spark might be
> made smarter to throw its hands up when people execute blocking code in a
> stream processor.
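> For contrast, here is a rough sketch of the pattern I would expect to be
> safe. It is untested, and the socket source on localhost:9999, the
> one-second batch interval, and the CountSketch object name are placeholders
> I made up rather than the real Twitter setup. The idea is that
> {{DStream.count()}} is a transformation that turns each batch into a
> single-element RDD holding its size, so no hand-rolled {{rdd.count()}} is
> needed inside the foreachRDD closure.
> {noformat}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> object CountSketch {
>   def main(args: Array[String]): Unit = {
>     // Use at least two local threads: with local[1] the receiver
>     // occupies the only slot and no batches can be processed.
>     val conf = new SparkConf().setMaster("local[2]").setAppName("count-sketch")
>     val ssc = new StreamingContext(conf, Seconds(1))
>     val lines = ssc.socketTextStream("localhost", 9999)
>
>     // count() on the DStream is a transformation, not a blocking action:
>     // each batch yields a single-element RDD holding that batch's size.
>     lines.count().foreachRDD { rdd =>
>       rdd.collect().foreach(c => println("batch size: " + c))
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {noformat}
> If the hang disappears under this pattern, that would at least narrow the
> problem down to actions invoked directly inside the foreachRDD closure.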