You can do it like this:

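// Imports assumed for Spark 1.x, where the Java API uses Guava's Optional.
import java.util.List;
import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function2;

// Adds the values that arrived for a key in this batch to its running total.
private static Function2<List<Long>, Optional<Long>, Optional<Long>>
    UPDATEFUNCTION =
    new Function2<List<Long>, Optional<Long>, Optional<Long>>() {
      @Override
      public Optional<Long> call(List<Long> nums, Optional<Long> current)
          throws Exception {
        // Start from the previous state, or 0 if the key has no state yet.
        long sum = current.or(0L);
        for (long i : nums) {
          sum += i;
        }
        return Optional.of(sum);
      }
    };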

.....

JavaPairDStream<String, Long> newStream =
myPairStream.updateStateByKey(UPDATEFUNCTION);
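
To see what the update function above does from one batch to the next, here is a minimal sketch that runs without Spark on the classpath. It is only an illustration: the class name RunningTotalSketch and the method name update are made up for this example, and java.util.Optional stands in for the Guava Optional (the one with or(...)) used in the real function, so it calls orElse(...) instead.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-alone class; not part of Spark or of the code above.
public class RunningTotalSketch {

    // Same logic as the update function: previous state plus this batch's values.
    // java.util.Optional is an assumption made so the sketch runs without Guava.
    static Optional<Long> update(List<Long> nums, Optional<Long> current) {
        long sum = current.orElse(0L);
        for (long n : nums) {
            sum += n;
        }
        return Optional.of(sum);
    }

    public static void main(String[] args) {
        // No state exists for the key before the first batch.
        Optional<Long> state = Optional.empty();

        // First batch: three values arrive for the key.
        state = update(Arrays.asList(1L, 2L, 3L), state);

        // Second batch: one more value; the previous total is carried over.
        state = update(Arrays.asList(10L), state);

        System.out.println(state.get()); // prints 16
    }
}
```

In a streaming job, Spark makes these calls itself, once per key per batch, and passes in the state returned by the previous call.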


Have a look at this codebase if you need a working example:
<https://github.com/Draki/SparkSQLPruebas/blob/5dbefa235bd6ceb3ed17685ed557fd4f6579e9f4/logs_analyzer/chapter1/java6/src/main/java/com/databricks/apps/logs/chapter1/LogAnalyzerStreamingTotal.java>

Thanks
Best Regards

On Fri, Dec 18, 2015 at 1:42 PM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> I am trying to use updateStateByKey but am receiving the following error
> (Spark version 1.4.0).
>
> Can someone please point out the possible reason for this error?
>
>
> The method
> updateStateByKey(Function2<List<AggregationMetrics>,Optional<S>,Optional<S>>)
> in the type JavaPairDStream<String,AggregationMetrics> is not applicable
> for the arguments
> (Function2<List<AggregationMetrics>,Optional<AggregationMetrics>,Optional<AggregationMetrics>>)
>
>
> This is the update function that I am using inside updateStateByKey.
>
> I am applying updateStateByKey on a tuple of <String, AggregationMetrics>
>
> private static Function2<List<AggregationMetrics>,
>     Optional<AggregationMetrics>, Optional<AggregationMetrics>> updateFunction =
>     new Function2<List<AggregationMetrics>, Optional<AggregationMetrics>,
>         Optional<AggregationMetrics>>() {
>
>       private static final long serialVersionUID = 1L;
>
>       @Override
>       public Optional<AggregationMetrics> call(List<AggregationMetrics> values,
>           Optional<AggregationMetrics> current) {
>         AggregationMetrics newSum =
>             current.or(new AggregationMetrics(0L, 0L, 0L));
>         for (int i = 0; i < values.size(); i++) {
>           // set with new values
>         }
>         return Optional.of(newSum);
>       }
>     };
>
>
>
> Thanks,
> Abhi
>
>
