Re: Sharing object/state across transformations

2015-12-10 Thread JayKay
I solved the problem by passing the HLL object into the update function, updating it
there, and returning it as the new state. This was obviously a mental block on my part... ;-)
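The fix described above can be sketched without Spark: make the counter object itself the state that the update function receives and returns, instead of mutating something captured from the enclosing scope. This is a minimal, self-contained Java sketch; a HashSet is a hypothetical stand-in for the stream-lib HyperLogLog, and a plain loop stands in for Spark's per-batch updateStateByKey machinery.

```java
import java.util.*;

public class StatefulCountDemo {

    // Update function in the shape of updateStateByKey's Function2:
    // (new values, previous state) -> new state.
    static Set<String> update(List<String> values, Set<String> state) {
        Set<String> counter = (state == null) ? new HashSet<>() : state;
        counter.addAll(values);     // hll.offer(value) in the real code
        return counter;             // return the updated object as the new state
    }

    // Feed batches through the update function, threading the state along.
    static List<Integer> cardinalities(List<List<String>> batches) {
        Set<String> state = null;
        List<Integer> out = new ArrayList<>();
        for (List<String> batch : batches) {
            state = update(batch, state);
            out.add(state.size());  // hll.cardinality() in the real code
        }
        return out;
    }

    public static void main(String[] args) {
        // Distinct counts accumulate across batches because the counter
        // travels through the state, not through a captured variable.
        System.out.println(cardinalities(List.of(
                List.of("a", "b"), List.of("b", "c")))); // prints [2, 3]
    }
}
```

The key point is that the counter never has to survive in the closure; Spark hands the previous state back on every batch.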



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Sharing-object-state-accross-transformations-tp25544p25665.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Sharing object/state across transformations

2015-12-06 Thread Julian Keppel
Yes, but what they do is only add new elements to a state that is
passed as a parameter. My problem is that my "counter" (the HyperLogLog
object) comes from outside and is not passed to the function. So I have to
track the state of this "external" HLL object across the whole lifecycle
of the stream, and that doesn't seem to work just like that. Spark copies
all necessary objects from the enclosing scope to all the worker nodes,
doesn't it? So I need a mechanism to use one single HyperLogLog object and
share its state across the entire application... Sorry if the problem
wasn't clear.


2015-12-02 15:03 GMT+01:00 Ted Yu :
>
>> Have you taken a look
>> at streaming//src/test/java/org/apache/spark/streaming/JavaAPISuite.java ?
>>
>> especially testUpdateStateByKeyWithInitial()
>>
>> Cheers
>>
>> [quoted original message trimmed]
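The copy semantics described in the reply above can be reproduced in miniature with plain Java serialization, no Spark required: a task works on a deserialized copy of whatever the closure captured, so mutations never reach the driver-side original. A sketch, with a simple serializable counter standing in for the HLL:

```java
import java.io.*;

public class ClosureCopyDemo {

    // Stand-in for the HLL: a serializable object captured by a closure.
    static class Counter implements Serializable {
        int value = 0;
    }

    // Serialize and deserialize, the way Spark ships a closure to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Counter original = new Counter();          // lives on the "driver"
        Counter workerCopy = roundTrip(original);  // what a task receives
        workerCopy.value++;                        // task-side mutation
        // The driver-side object is untouched, which is why a captured HLL
        // (or a plain int) always shows its initial value back on the driver.
        System.out.println(original.value + " " + workerCopy.value); // prints "0 1"
    }
}
```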


Re: Sharing object/state across transformations

2015-12-02 Thread Ted Yu
Have you taken a look
at streaming//src/test/java/org/apache/spark/streaming/JavaAPISuite.java ?

especially testUpdateStateByKeyWithInitial()

Cheers

On Wed, Dec 2, 2015 at 2:54 AM, JayKay  wrote:

> I'm new to Apache Spark and an absolute beginner. I'm playing around with
> Spark Streaming (API version 1.5.1) in Java and want to implement a
> prototype which uses HyperLogLog to estimate distinct elements. I use the
> stream-lib from clearspring (https://github.com/addthis/stream-lib).
>
> I planned to use updateStateByKey to hold a global state over all events.
> The problem is that on every call of the specified function, my HLL returns
> 1 (it seems to use a new instance of my HLL object every time). The same
> problem occurs with a simple global integer variable which I tried to
> increment in every function call: it also always keeps its initial value.
>
> This is a code snippet where I define the update function:
>
> Function2<List<String>, Optional<Long>, Optional<Long>> hllCountFunction =
>         new Function2<List<String>, Optional<Long>, Optional<Long>>() {
>     public Optional<Long> call(List<String> values, Optional<Long> state)
>             throws Exception {
>         values.stream().forEach(value -> hll.offer(value));
>         long newState = state.isPresent() ? hll.cardinality() : 0;
>         return Optional.of(newState);
>     }
> };
>
>
> And this is the snippet how I use the function:
>
> JavaPairDStream<String, Long> hllCounts = fullvisitorids.mapToPair(
>         new PairFunction<String, String, String>() {
>     public Tuple2<String, String> call(String value) {
>         return new Tuple2<String, String>("key", value);
>     }
> }).updateStateByKey(hllCountFunction);
>
> After a lot of research I found the concept of Accumulators. Do I need to
> specify a custom Accumulator by extending the Accumulator class (in Java)?
> I also read that in transformations these should only be used for debugging
> purposes...
>
> So how can I achieve using one globally defined HLL object in a Spark
> streaming transformation? I also tried to implement a custom Accumulator,
> but that failed as well because I don't understand how to use the
> AccumulableParam interface. I implemented the Accumulator and overrode the
> add and value methods. But what do I have to do in the AccumulableParam
> with addAccumulator, addInPlace and zero?
>
> Thanks in advance for your help and your advice!
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Sharing-object-state-accross-transformations-tp25544.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
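On the AccumulableParam question in the message above: the intended semantics are that zero produces an empty, mergeable counter for each task, and addInPlace merges two partial counters (for the stream-lib HyperLogLog this would mean merging its sketches). Below is a minimal Java sketch of those semantics only; a HashSet is a hypothetical stand-in for the HLL, and the method names merely mirror the Spark interface rather than implementing it.

```java
import java.util.*;

public class HllAccumulatorSketch {

    // zero(): the identity element, an empty counter.
    static Set<String> zero() {
        return new HashSet<>();
    }

    // addInPlace(a, b): merge two partial counters into one.
    // For a real HLL this would merge the two sketches.
    static Set<String> addInPlace(Set<String> a, Set<String> b) {
        a.addAll(b);
        return a;
    }

    // Simulate per-partition accumulation followed by a driver-side merge.
    static int distinctCount(List<List<String>> partitions) {
        Set<String> total = zero();
        for (List<String> partition : partitions) {
            Set<String> partial = zero();       // each task starts from zero()
            partial.addAll(partition);          // task adds its elements
            total = addInPlace(total, partial); // driver merges the partials
        }
        return total.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctCount(List.of(
                List.of("a", "b"), List.of("b", "c"), List.of("c", "d")))); // prints 4
    }
}
```

The reason merging matters: each task can only ever build a partial counter, so the shared "global" counter has to be reconstructed by combining partials, never by mutating one shared object from many workers.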