Have you taken a look
at streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java?

especially testUpdateStateByKeyWithInitial()
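
That test shows the key idea for the question below: with updateStateByKey,
state has to be carried through the update function's return value, not
through a field captured by the closure (the closure is serialized and can be
re-instantiated per batch on the executors, which is why a captured HLL or
counter keeps resetting). Here is a minimal, Spark-free sketch of that
state-threading pattern, using java.util.Optional and a plain running count
purely for illustration (Spark 1.5's Java API uses Guava's Optional):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;

public class StateThreading {
    // The update function: the new state is derived from the old state plus
    // this batch's values, and RETURNED -- never stored in an outside field.
    static final BiFunction<List<String>, Optional<Long>, Optional<Long>> update =
            (values, state) -> Optional.of(state.orElse(0L) + values.size());

    public static void main(String[] args) {
        Optional<Long> state = Optional.empty();
        // Simulate two micro-batches arriving for one key.
        state = update.apply(Arrays.asList("a", "b"), state);  // batch 1
        state = update.apply(Arrays.asList("c"), state);       // batch 2
        System.out.println(state.get());  // prints 3
    }
}
```

For the HLL case the state type would be the HyperLogLog itself (stream-lib's
implementation is Serializable), created when the state is absent and offered
the batch's values before being returned.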

Cheers
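
P.S. On the AccumulableParam question: zero produces the identity value a
fresh per-partition accumulator starts from, addAccumulator folds one element
into a partition's running value, and addInPlace merges two partial values.
The sketch below is a plain-Java stand-in with no Spark dependency, just to
show what each method must do (the real AccumulableParam<R, T> allows the
element and accumulated types to differ; this uses Long for both, as
AccumulatorParam does). Note that accumulator updates made inside
transformations are not guaranteed to be applied exactly once, which is why
the docs recommend them only for debugging there.

```java
// Stand-in mirroring the contract of Spark's AccumulableParam; the real
// interface lives in org.apache.spark and is only illustrated here.
class LongSumParam {
    // zero: the identity value a fresh per-partition accumulator starts from.
    Long zero(Long initialValue) { return 0L; }

    // addAccumulator: fold one element into a partition's running value.
    Long addAccumulator(Long acc, Long element) { return acc + element; }

    // addInPlace: merge two partial values (e.g. from two partitions).
    Long addInPlace(Long a, Long b) { return a + b; }
}

public class AccumulatorSketch {
    public static void main(String[] args) {
        LongSumParam param = new LongSumParam();
        // Simulate what Spark does: each partition folds its own elements...
        Long p1 = param.zero(0L);
        for (long v : new long[]{1, 2}) p1 = param.addAccumulator(p1, v);
        Long p2 = param.zero(0L);
        for (long v : new long[]{3}) p2 = param.addAccumulator(p2, v);
        // ...then the driver merges the per-partition partials.
        System.out.println(param.addInPlace(p1, p2));  // prints 6
    }
}
```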

On Wed, Dec 2, 2015 at 2:54 AM, JayKay <juliankeppel1...@gmail.com> wrote:

> I'm new to Apache Spark and an absolute beginner. I'm playing around with
> Spark Streaming (API version 1.5.1) in Java and want to implement a
> prototype which uses HyperLogLog to estimate distinct elements. I use the
> stream-lib from clearspring (https://github.com/addthis/stream-lib).
>
> I planned to use updateStateByKey to hold a global state over all events.
> The problem is that for every call of the specified function, my HLL
> returns 1 (it seems to use a new instance of my HLL object every time). The
> same problem occurs with a simple global integer variable that I tried to
> increment in every function call: it always keeps its initial value.
>
> This is a code snippet where I define the update function:
>
> Function2<List<String>, Optional<Long>, Optional<Long>> hllCountFunction =
>         new Function2<List<String>, Optional<Long>, Optional<Long>>() {
>     public Optional<Long> call(List<String> values, Optional<Long> state)
>             throws Exception {
>         values.stream().forEach(value -> hll.offer(value));
>         long newState = state.isPresent() ? hll.cardinality() : 0;
>         return Optional.of(newState);
>     }
> };
>
>
> And this is the snippet how I use the function:
>
> JavaPairDStream<String, Long> hllCounts = fullvisitorids.mapToPair(
>         new PairFunction<String, String, String>() {
>     public Tuple2<String, String> call(String value) {
>         return new Tuple2<String, String>("key", value);
>     }
> }).updateStateByKey(hllCountFunction);
>
> After a lot of research I found the concept of Accumulators. Do I need to
> specify a custom Accumulator by extending the Accumulator class (in Java)?
> I also read that, in transformations, accumulators should only be used for
> debugging purposes...
>
> So how can I achieve using one globally defined HLL object in a Spark
> stream transformation? I also tried to implement a custom Accumulator, but
> that failed as well because I don't understand how to use the
> AccumulableParam interface. I implemented the Accumulator and overrode the
> add and value methods. But what do I have to do in the AccumulableParam
> with addAccumulator, addInPlace and zero?
>
> Thanks in advance for your help and your advice!
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Sharing-object-state-accross-transformations-tp25544.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
