Hi All,

I'm playing with the new mapWithState functionality but I can't quite get it to work yet.
I'm doing two print() calls on the stream:

1. after the mapWithState() call: the first batch shows results, the next batches are empty
2. after stateSnapshots(): always yields an empty RDD

Any pointers on what might be wrong?

This is the code I'm running:

    final StateSpec state = StateSpec.function(UseCase::trackState);
    JavaPairDStream<GroupingKey, Double> pairs = messages.<GroupingKey, Double>mapToPair(UseCase::mapToPair);
    JavaMapWithStateDStream<GroupingKey, Double, Double, Double> stateMap = pairs.mapWithState(state);

    stateMap.print(5);
    stateMap.stateSnapshots().print(5);

    stream.context().remember(minutes(120));
    stream.context().checkpoint("/rsl/tmp/fxo-checkpoint");

    private static Optional<Tuple2<GroupingKey, Double>> trackState(Time batchTime, GroupingKey key, Optional<Double> value, State<Double> state) {
        Double current = state.exists() ? state.get() : 0.0;
        Double sum = current + value.or(0.0);
        return Optional.of(new Tuple2<>(key, sum));
    }

Cheers,
Seb