Have you taken a look at streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java ?
especially testUpdateStateByKeyWithInitial()

Cheers

On Wed, Dec 2, 2015 at 2:54 AM, JayKay <juliankeppel1...@gmail.com> wrote:

> I'm new to Apache Spark and an absolute beginner. I'm playing around with
> Spark Streaming (API version 1.5.1) in Java and want to implement a
> prototype which uses HyperLogLog to estimate distinct elements. I use the
> stream-lib from clearspring (https://github.com/addthis/stream-lib).
>
> I planned to use updateStateByKey to hold a global state over all events.
> The problem is that for every call of the specified function, my HLL
> returns a 1 (it seems to use a new instance of my HLL object every time).
> The same problem occurs with a simple global integer variable which I
> tried to increment in every function call: it also always holds its
> initial value.
>
> This is the code snippet where I define the update function:
>
> Function2<List<String>, Optional<Long>, Optional<Long>> hllCountFunction =
>     new Function2<List<String>, Optional<Long>, Optional<Long>>() {
>         public Optional<Long> call(List<String> values, Optional<Long> state)
>                 throws Exception {
>             values.stream().forEach(value -> hll.offer(value));
>             long newState = state.isPresent() ? hll.cardinality() : 0;
>             return Optional.of(newState);
>         }
>     };
>
> And this is how I use the function:
>
> JavaPairDStream<String, Long> hllCounts = fullvisitorids.mapToPair(
>     new PairFunction<String, String, String>() {
>         public Tuple2<String, String> call(String value) {
>             return new Tuple2<String, String>("key", value);
>         }
>     }).updateStateByKey(hllCountFunction);
>
> After a lot of research I found the concept of Accumulators. Do I need to
> specify a custom Accumulator by extending the Accumulator class (in Java)?
> I also read that for transformations these should only be used for
> debugging purposes...
>
> So how can I achieve using one globally defined HLL object in a Spark
> Streaming transformation?
> I also tried to implement a custom Accumulator, but this also failed
> because I don't understand how to use the AccumulableParam interface. I
> implemented the Accumulator and overrode the add and value methods. But
> what do I have to do in the AccumulableParam with addAccumulator,
> addInPlace and zero?
>
> Thanks in advance for your help and your advice!
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Sharing-object-state-accross-transformations-tp25544.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
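For the archive, a note on the underlying issue: a driver-side field like `hll` is serialized into each task's closure, so every batch mutates a fresh copy and the updates are lost. With updateStateByKey, the state itself must carry the structure: make the state type hold the HLL (or its serialized bytes) and return the updated one from the function. Below is a minimal plain-Java sketch of that pattern, with no Spark or stream-lib dependency (a HashSet stands in for the HyperLogLog, and a BiFunction stands in for Spark's Function2); it only illustrates how state flows through return values rather than through a captured field.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;

public class UpdateStateSketch {
    // Stand-in for the updateStateByKey update function: the distinct-value
    // structure lives in the state argument and is returned, never in a
    // field captured from the driver.
    static final BiFunction<List<String>, Optional<Set<String>>, Optional<Set<String>>>
            update = (values, state) -> {
        // Copy the previous state (or start empty on the first batch).
        Set<String> distinct = state.map(HashSet::new).orElseGet(HashSet::new);
        distinct.addAll(values);          // "offer" each new value
        return Optional.of(distinct);     // the new state replaces the old one
    };

    public static void main(String[] args) {
        Optional<Set<String>> state = Optional.empty();
        state = update.apply(Arrays.asList("a", "b"), state);  // batch 1
        state = update.apply(Arrays.asList("b", "c"), state);  // batch 2
        System.out.println(state.get().size());                // prints 3
    }
}
```

With the real APIs the same shape would be `Function2<List<String>, Optional<HyperLogLog>, Optional<HyperLogLog>>` (stream-lib's HyperLogLog is Serializable), and the cardinality can be read off the state stream with a map afterwards.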