Yes, but what they do is only add new elements to a state which is passed
in as a parameter. My problem is that my "counter" (the HyperLogLog
object) comes from outside and is not passed to the function, so I have to
track the state of this "external" HLL object across the whole lifecycle
of the stream. And that doesn't seem to work just like that: Spark copies
all necessary objects from the enclosing scope to all the worker nodes,
doesn't it? So I need a mechanism to use one single HyperLogLog object and
share its state across the entire application... Sorry if the problem
wasn't clear.
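
One idea I'm toying with (a minimal, untested sketch; it assumes
stream-lib's HyperLogLog is Serializable and uses an assumed log2m
precision of 14): make the HLL itself the updateStateByKey state, so that
Spark carries it from batch to batch instead of relying on an object
captured from the enclosing scope:

import java.util.List;

import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.google.common.base.Optional; // Spark 1.x Java API uses Guava's Optional
import org.apache.spark.api.java.function.Function2;

Function2<List<String>, Optional<HyperLogLog>, Optional<HyperLogLog>>
hllStateFunction =
    new Function2<List<String>, Optional<HyperLogLog>, Optional<HyperLogLog>>() {
        public Optional<HyperLogLog> call(List<String> values,
                Optional<HyperLogLog> state) throws Exception {
            // Reuse the HLL carried over from the previous batch,
            // or start a fresh one on the very first call.
            HyperLogLog hll = state.isPresent()
                    ? state.get()
                    : new HyperLogLog(14); // assumed precision (log2m)
            for (String value : values) {
                hll.offer(value);
            }
            return Optional.of(hll);
        }
    };

The running estimate could then be read off downstream by mapping the
state stream through HyperLogLog.cardinality().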


2015-12-02 15:03 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>
>> Have you taken a look
>> at streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java ?
>>
>> especially testUpdateStateByKeyWithInitial()
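>>
>> That test exercises the updateStateByKey overload which takes an initial
>> state RDD. Roughly (a hedged sketch, not the test's exact code; jssc,
>> pairs and updateFunction are placeholder names, and the partition count
>> is arbitrary):
>>
>> import java.util.Arrays;
>>
>> import org.apache.spark.HashPartitioner;
>> import org.apache.spark.api.java.JavaPairRDD;
>> import org.apache.spark.streaming.api.java.JavaPairDStream;
>> import scala.Tuple2;
>>
>> JavaPairRDD<String, Long> initialRDD = jssc.sparkContext()
>>     .parallelizePairs(Arrays.asList(new Tuple2<String, Long>("key", 0L)));
>>
>> JavaPairDStream<String, Long> counts = pairs.updateStateByKey(
>>     updateFunction, new HashPartitioner(2), initialRDD);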
>>
>> Cheers
>>
>> On Wed, Dec 2, 2015 at 2:54 AM, JayKay <juliankeppel1...@gmail.com>
>> wrote:
>>
>>> I'm new to Apache Spark and an absolute beginner. I'm playing around with
>>> Spark Streaming (API version 1.5.1) in Java and want to implement a
>>> prototype which uses HyperLogLog to estimate distinct elements. I use
>>> stream-lib from Clearspring (https://github.com/addthis/stream-lib).
>>>
>>> I planned to use updateStateByKey to hold a global state over all events.
>>> The problem is that for every call of the specified function, my HLL
>>> returns a 1 (it seems to use a new instance of my HLL object every time).
>>> The same problem occurs with a simple global integer variable which I
>>> tried to increment in every function call: it also always contains its
>>> initial value.
>>>
>>> This is a code snippet where I define the update function:
>>>
>>> Function2<List<String>, Optional<Long>, Optional<Long>> hllCountFunction =
>>>     new Function2<List<String>, Optional<Long>, Optional<Long>>() {
>>>         public Optional<Long> call(List<String> values, Optional<Long> state)
>>>                 throws Exception {
>>>             values.stream().forEach(value -> hll.offer(value));
>>>             long newState = state.isPresent() ? hll.cardinality() : 0;
>>>             return Optional.of(newState);
>>>         }
>>>     };
>>>
>>>
>>> And this is the snippet showing how I use the function:
>>>
>>> JavaPairDStream<String, Long> hllCounts = fullvisitorids.mapToPair(
>>>     new PairFunction<String, String, String>() {
>>>         public Tuple2<String, String> call(String value) {
>>>             return new Tuple2<String, String>("key", value);
>>>         }
>>>     }).updateStateByKey(hllCountFunction);
>>>
>>> After a lot of research I found the concept of Accumulators. Do I need to
>>> specify a custom Accumulator by extending the Accumulator class (in Java)?
>>> I also read that in transformations accumulators should only be used for
>>> debugging purposes...
>>>
>>> So how can I achieve using one globally defined HLL object in a Spark
>>> Streaming transformation? I also tried to implement a custom Accumulator,
>>> but that failed as well because I don't understand how to use the
>>> AccumulableParam interface. I implemented the Accumulator and overrode the
>>> add and value methods. But what do I have to do in the AccumulableParam
>>> with addAccumulator, addInPlace and zero?
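>>>
>>> (My best guess so far, as a minimal, untested sketch for a HyperLogLog
>>> accumulator against the Spark 1.x API; it assumes stream-lib's
>>> HyperLogLog.addAll performs the merge:)
>>>
>>> import com.clearspring.analytics.stream.cardinality.HyperLogLog;
>>> import org.apache.spark.AccumulableParam;
>>>
>>> class HllParam implements AccumulableParam<HyperLogLog, String> {
>>>     // zero: the neutral element each task starts from
>>>     public HyperLogLog zero(HyperLogLog initialValue) {
>>>         return new HyperLogLog(14); // assumed precision (log2m)
>>>     }
>>>     // addAccumulator: fold a single element into the running HLL
>>>     public HyperLogLog addAccumulator(HyperLogLog hll, String value) {
>>>         hll.offer(value);
>>>         return hll;
>>>     }
>>>     // addInPlace: merge two partial HLLs (e.g. from different tasks)
>>>     public HyperLogLog addInPlace(HyperLogLog h1, HyperLogLog h2) {
>>>         try {
>>>             h1.addAll(h2); // throws a checked CardinalityMergeException
>>>         } catch (Exception e) {
>>>             throw new RuntimeException(e);
>>>         }
>>>         return h1;
>>>     }
>>> }
>>>
>>> // usage sketch:
>>> // Accumulable<HyperLogLog, String> hllAcc =
>>> //     sc.accumulable(new HyperLogLog(14), new HllParam());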
>>>
>>> Thanks in advance for your help and your advice!
>>>
>>>
>>>
>>>
>>
>
