You can do it like this:
private static Function2<List<Long>, Optional<Long>, Optional<Long>> UPDATEFUNCTION = new Function2<List<Long>, Optional<Long>, Optional<Long>>() { @Override public Optional<Long> call(List<Long> nums, Optional<Long> current) throws Exception { long sum = current.or(0L); for (long i : nums) { sum += i; } return Optional.of(sum); } }; ..... JavaPairDStream<String, Long> newStream = myPairStream.updateStateByKey(UPDATEFUNCTION); Have a look at this codebase <https://github.com/Draki/SparkSQLPruebas/blob/5dbefa235bd6ceb3ed17685ed557fd4f6579e9f4/logs_analyzer/chapter1/java6/src/main/java/com/databricks/apps/logs/chapter1/LogAnalyzerStreamingTotal.java> If you need a working example. Thanks Best Regards On Fri, Dec 18, 2015 at 1:42 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote: > I am trying to use updateStateByKey but receiving the following error. > (Spark Version 1.4.0) > > Can someone please point out what might be the possible reason for this > error. > > > *The method > updateStateByKey(Function2<List<AggregationMetrics>,Optional<S>,Optional<S>>) > in the type JavaPairDStream<String,AggregationMetrics> is not applicable > for the arguments * > > * > (Function2<List<AggregationMetrics>,Optional<AggregationMetrics>,Optional<AggregationMetrics>>)* > > > This is the update function that I am using inside updateStateByKey. > > I am applying updateStateByKey on a tuple of <String, AggregationMetrics> > > private static Function2<List<AggregationMetrics>, > Optional<AggregationMetrics>, Optional<AggregationMetrics>> updateFunction = > new Function2<List<AggregationMetrics>, Optional<AggregationMetrics>, > Optional<AggregationMetrics>>() { > /** > * > */ > private static final long serialVersionUID = 1L; > > @Override > public Optional<AggregationMetrics> call(List<AggregationMetrics> values, > Optional<AggregationMetrics> current) { > AggregationMetrics newSum = current.or(new AggregationMetrics(0L, 0L, 0L)); > for(int i=0; i < values.size(); i++) > { > //set with new values > } > return Optional.of(newSum); > } > }; > > > > Thanks, > Abhi > >