Whatever is inside the mapPartitions closure gets executed on the workers. If
that function refers to a variable defined in the driver, then that variable
gets serialized and sent to the workers as well. So the hll (defined on line
63) is an empty HyperLogLogMonoid that gets serialized and sent to the
workers to execute the mapPartitions in lines 67-69. In each batch of data,
each user id is converted to an HLL object (using the empty hll monoid), and
then the HLL objects from all the partitions are reduced using the "+" on
line 69 into a single HLL (i.e. an RDD containing one element, which is the
HLL of the batch).
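To illustrate just the merge semantics, here is a toy Python sketch of that per-batch pattern. It uses an exact set as a stand-in for Algebird's HyperLogLogMonoid (the names DistinctSketch and create_sketch are made up for this illustration; the real example serializes the monoid and uses hll.create(id)):

```python
from functools import reduce

# Stand-in for the HyperLogLogMonoid: wraps a frozenset so that "+"
# merges two sketches, mimicking HLL's commutative/associative merge.
class DistinctSketch:
    def __init__(self, items=()):
        self.items = frozenset(items)

    def __add__(self, other):
        return DistinctSketch(self.items | other.items)

    @property
    def estimated_size(self):
        return len(self.items)

def create_sketch(user_id):
    # In the Scala example this is hll.create(id); here, a one-element sketch.
    return DistinctSketch([user_id])

# One "batch" of user ids, split across two "partitions".
partitions = [["u1", "u2", "u1"], ["u2", "u3"]]

# mapPartitions: each partition folds its ids into one sketch...
per_partition = [reduce(lambda a, b: a + b, map(create_sketch, part))
                 for part in partitions]

# ...then reduce merges the partition sketches into a single batch sketch.
batch_sketch = reduce(lambda a, b: a + b, per_partition)
print(batch_sketch.estimated_size)  # 3 distinct ids in this batch
```

A real HLL merge works the same way structurally (element-wise max over registers instead of set union), which is what makes the reduce with "+" valid.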

The subsequent foreachRDD at line 73 brings that HLL from the workers to the
driver and merges it into the running globalHll.
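Continuing the same toy stand-in (exact sets in place of HLL; DistinctSketch is a made-up name for illustration), the driver-side accumulation looks like this:

```python
# Set-based stand-in for the running globalHll on the driver: each batch
# produces one sketch on the workers, and foreachRDD merges it in.
class DistinctSketch:
    def __init__(self, items=()):
        self.items = frozenset(items)

    def __add__(self, other):
        return DistinctSketch(self.items | other.items)

global_sketch = DistinctSketch()  # lives on the driver, like globalHll

# Each inner list plays the role of the single-element RDD of one batch.
batches = [["u1", "u2"], ["u2", "u3"], ["u4"]]
for batch in batches:
    batch_sketch = DistinctSketch(batch)   # the sketch computed by the workers
    global_sketch = global_sketch + batch_sketch  # driver-side merge

print(len(global_sketch.items))  # 4 distinct users across all batches
```

Only the small merged sketch crosses the wire per batch, not the raw ids, which is the point of doing the reduction on the workers first.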

I agree, there is probably a better / more intuitive way to write this
example. :)

TD


On Wed, Apr 30, 2014 at 3:45 PM, buremba <emrekaba...@gmail.com> wrote:

> Thanks for your reply. Sorry for the late response, I wanted to do some
> tests
> before writing back.
>
> The counting part works similarly to your advice. I specify a minimum
> interval, such as 1 minute, and for each hour, day, etc. it sums the
> counters of the current child intervals.
>
> However, when I want to "count unique visitors of the month", things get
> much more complex. I need to merge 30 sets that contain visitor ids, each
> with more than a few hundred thousand elements. Merging the sets may still
> be the best option, rather than keeping another set for the last month,
> but I'm not sure, because when there are many intersections it may be
> inefficient.
>
> BTW, I have one more question. The HLL example in the repository seems
> confusing to me. How does Spark handle global variable usage in the
> mapPartitions method?
> (
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala#L68
> )
> I'm also a newbie, but I thought the map and mapPartitions methods were
> similar to Hadoop's map methods, so when we run the example on a cluster,
> how does an external node reach a global variable defined on a single
> node? Does Spark replicate HyperLogLogMonoid instances across the cluster?
>
> Thanks,
> Burak Emre
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Any-advice-for-using-big-spark-cleaner-delay-value-in-Spark-Streaming-tp4895p5131.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>