Hi TD,
Just to let you know: the job group and cancellation worked after I switched to
Spark 1.3.1. I set a group id for rdd.countApprox() and cancel it, then set
another group id for the remaining jobs of the foreachRDD but let them complete.
As a by-product, I use the group id to indicate what the job does. :-)
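For reference, a minimal sketch of that pattern, assuming the Spark 1.3.x APIs; the group ids, the 5000 ms timeout, and the helper name are illustrative, not verbatim from my app:

```scala
import org.apache.spark.rdd.RDD

// Sketch of the pattern described above (Spark 1.3.x).
// Group ids and the timeout are illustrative assumptions.
def approxThenProcess(rdd: RDD[String]): Unit = {
  val sc = rdd.sparkContext

  // 1. Run countApprox under its own job group so it can be cancelled.
  sc.setJobGroup("approx-count", "approximate count with timeout",
    interruptOnCancel = true)
  val approx = rdd.countApprox(5000).initialValue // returns after at most ~5 s
  sc.cancelJobGroup("approx-count") // stop the still-running counting job

  // 2. Run the rest of the batch under a different group and let it finish.
  sc.setJobGroup("main-processing", "remaining work for this batch")
  // ... remaining transformations/actions on rdd ...
  sc.clearJobGroup()
}
```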
Thanks,
Du
On Wednesday, May 13, 2015 4:23 PM, Tathagata Das <[email protected]>
wrote:
You might get stage information through SparkListener. But I am not sure
whether you can use that information to easily kill stages. Though I highly
recommend using Spark 1.3.1 (or even Spark master). Things move really fast
between releases. 1.1.1 feels really old to me :P
TD
On Wed, May 13, 2015 at 1:25 PM, Du Li <[email protected]> wrote:
I do rdd.countApprox() and rdd.sparkContext.setJobGroup() inside
dstream.foreachRDD{...}. After calling cancelJobGroup(), the Spark context
seems to be no longer valid, which crashes subsequent jobs.
My spark version is 1.1.1. I will do more investigation into this issue,
perhaps after upgrading to 1.3.1, and then file a JIRA if it persists.
Is there a way to get stage or task id of a particular transformation or action
on RDD and then selectively kill the stage or tasks? It would be necessary and
useful in situations similar to countApprox.
Thanks,
Du
On Wednesday, May 13, 2015 1:12 PM, Tathagata Das <[email protected]>
wrote:
That is not supposed to happen :/ That is probably a bug. If you have the log4j
logs, it would be good to file a JIRA. This may be worth debugging.
On Wed, May 13, 2015 at 12:13 PM, Du Li <[email protected]> wrote:
Actually I tried that before asking. However, it killed the Spark context. :-)
Du
On Wednesday, May 13, 2015 12:02 PM, Tathagata Das <[email protected]>
wrote:
That is a good question. I don't see a direct way to do that.
You could try the following:

val jobGroupId = <group-id-based-on-current-time>
rdd.sparkContext.setJobGroup(jobGroupId, "approximate count")
val approxCount = rdd.countApprox().initialValue // job launched with the set job group
rdd.sparkContext.cancelJobGroup(jobGroupId)      // cancel the job
On Wed, May 13, 2015 at 11:24 AM, Du Li <[email protected]> wrote:
Hi TD,
Do you know how to cancel the rdd.countApprox(5000) tasks after the timeout?
Otherwise it keeps running until completion, producing results not used but
consuming resources.
Thanks,
Du
On Wednesday, May 13, 2015 10:33 AM, Du Li <[email protected]>
wrote:
Hi TD,
Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app
stands a much better chance of completing the processing of each batch within
the batch interval.
Du
On Tuesday, May 12, 2015 10:31 PM, Tathagata Das <[email protected]>
wrote:
From the code it seems that as soon as "rdd.countApprox(5000)" returns,
you can call "pResult.initialValue" to get the approximate count at that
point of time (that is, after the timeout). Calling "pResult.getFinalValue()"
will block further until the job is over, and give the final correct value that
you would have received from "rdd.count()".
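In other words, a sketch of the two accessors, assuming the Spark 1.x PartialResult API; the method name "demo" and the element type are just for illustration:

```scala
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.rdd.RDD

// Contrast of the two accessors on PartialResult (Spark 1.x).
def demo(rdd: RDD[String]): Unit = {
  val pResult = rdd.countApprox(5000) // launches the counting job

  // Returns once the 5000 ms timeout hits, with a bounded estimate.
  val early: BoundedDouble = pResult.initialValue

  // Blocks until the job fully completes; low == mean == high at that point.
  val exact: BoundedDouble = pResult.getFinalValue()

  println(s"approx=${early.mean.toLong}, exact=${exact.mean.toLong}")
}
```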
On Tue, May 12, 2015 at 5:03 PM, Du Li <[email protected]> wrote:
HI,
I tested the following in my streaming app and hoped to get an approximate
count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed
to always return only after the job finished completely, just like rdd.count(),
which often took more than 5 seconds. The values for low, mean, and high were
the same.
val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")
Can any expert here help explain the right way of usage?
Thanks,
Du
On Wednesday, May 6, 2015 7:55 AM, Du Li <[email protected]>
wrote:
I have to count RDDs in a Spark streaming app. When the data grows large,
count() becomes expensive. Does anybody have experience using countApprox()?
How accurate/reliable is it?
The documentation is pretty modest. I suppose the timeout parameter is in
milliseconds. Can I retrieve the count value by calling getFinalValue()? Does
it block and return only after the timeout? Or do I need to define
onComplete/onFail handlers to extract the count value from the partial results?
Thanks,
Du