I am trying to use updateStateByKey but receiving the following error.
(Spark Version 1.4.0)

Can someone please point out what might be the possible reason for this
error.


*The method
updateStateByKey(Function2<List<AggregationMetrics>,Optional<S>,Optional<S>>)
in the type JavaPairDStream<String,AggregationMetrics> is not applicable
for the arguments *
* 
(Function2<List<AggregationMetrics>,Optional<AggregationMetrics>,Optional<AggregationMetrics>>)*


This is the update function that I am using inside updateStateByKey.

I am applying updateStateByKey on a tuple of <String, AggregationMetrics>

private static Function2<List<AggregationMetrics>,
Optional<AggregationMetrics>, Optional<AggregationMetrics>> updateFunction =
new Function2<List<AggregationMetrics>, Optional<AggregationMetrics>,
Optional<AggregationMetrics>>() {
/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public Optional<AggregationMetrics> call(List<AggregationMetrics> values,
Optional<AggregationMetrics> current) {
AggregationMetrics newSum = current.or(new AggregationMetrics(0L, 0L, 0L));
for(int i=0; i < values.size(); i++)
{
//set with new values
}
return Optional.of(newSum);
}
};



Thanks,
Abhi

Reply via email to