Hi Team,
I am new to Flink. I have a use case where I have a DataStream of
Doubles, and I am trying to get the total sum of the whole DataStream.

I have tried both ReduceFunction and AggregateFunction.

Case 1: With ReduceFunction, the output is a DataStream of rolling sums. To
get the final sum I have to traverse the output stream, and the last value
is my total. In my case, I don't want to iterate over the whole DataStream
to get the final sum, and I also don't want to use an extra DataStream just
to store the final aggregated value.
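To make Case 1 concrete, here is a plain-Java model (no Flink dependency; the class and method names are made up for illustration) of what a keyed reduce() with one constant key emits for the six input values used in the code further down. In STREAMING execution mode every element produces an updated sum. Flink's documentation on execution modes states that in BATCH execution mode rolling operations such as reduce() emit only the final result per key, which is what batchReduce() models. This is a sketch of the semantics only, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (hypothetical, no Flink dependency) of a keyed reduce()
// where every element has the same key, so there is one running sum.
public class RollingSumModel {

    // STREAMING-style: each incoming element updates the running sum and the
    // updated sum is emitted, so there is one output value per input value.
    static List<Double> streamingReduce(List<Double> input) {
        List<Double> emitted = new ArrayList<>();
        Double acc = null;
        for (Double value : input) {
            acc = (acc == null) ? value : acc + value; // same as reduce(acc, value)
            emitted.add(acc);
        }
        return emitted;
    }

    // BATCH-style: the input is bounded, so only the final sum is emitted
    // after all elements have been seen.
    static List<Double> batchReduce(List<Double> input) {
        List<Double> rolling = streamingReduce(input);
        List<Double> emitted = new ArrayList<>();
        if (!rolling.isEmpty()) {
            emitted.add(rolling.get(rolling.size() - 1));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Double> input = List.of(2.00, 3.00, 4.00, 11.00, 13.00, 14.00);
        System.out.println("streaming: " + streamingReduce(input)); // [2.0, 5.0, 9.0, 20.0, 33.0, 47.0]
        System.out.println("batch:     " + batchReduce(input));     // [47.0]
    }
}
```

The last value of the streaming output and the single batch value are the same number; the only difference is how many intermediate results are emitted.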

Case 2: I can only access the aggregate() method after countWindow(), and
countWindow() requires a window size. Since I don't know the size of my
DataStream in advance (users will be sending data to me), I can't use it.
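The reason the size matters in Case 2 can be shown with another plain-Java model (hypothetical names, no Flink dependency). It assumes the count window behaves as I understand Flink's default count windows to behave: a window fires only once it holds exactly n elements, and a partially filled window left over at the end of a bounded input is never fired.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (hypothetical, no Flink dependency) of a tumbling count
// window of size n that sums its contents for a single key.
public class CountWindowModel {

    static List<Double> countWindowSums(List<Double> input, int size) {
        List<Double> emitted = new ArrayList<>();
        double windowSum = 0.0;
        int windowCount = 0;
        for (double value : input) {
            windowSum += value;
            windowCount++;
            if (windowCount == size) { // window is full: fire and start a new one
                emitted.add(windowSum);
                windowSum = 0.0;
                windowCount = 0;
            }
        }
        // A partially filled window (windowCount > 0) is never fired in this model.
        return emitted;
    }

    public static void main(String[] args) {
        List<Double> input = List.of(2.00, 3.00, 4.00, 11.00, 13.00, 14.00);
        System.out.println(countWindowSums(input, 6));  // size matches the input: [47.0]
        System.out.println(countWindowSums(input, 4));  // 13.0 and 14.0 are left over: [20.0]
        System.out.println(countWindowSums(input, 10)); // window never fills: []
    }
}
```

Only when the window size equals the number of elements does the model emit the total, which is why a count window is a poor fit when the size is not known up front.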

Below is my implementation using ReduceFunction:

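import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SumAggregation {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<Double> dataStream =
        env.fromElements(2.00, 3.00, 4.00, 11.00, 13.00, 14.00);

    // All elements share one constant key, so reduce() keeps a single running
    // sum and emits the updated sum for every incoming element.
    DataStream<Double> singleOutputStreamOperator = dataStream
        .keyBy(value -> "key")
        .reduce(new ReduceFunction<Double>() {
          @Override
          public Double reduce(Double aDouble, Double t1) throws Exception {
            return aDouble + t1;
          }
        });

    singleOutputStreamOperator.print();
    DataStream.Collector<Double> doubleCollector = new DataStream.Collector<>();
    singleOutputStreamOperator.collectAsync(doubleCollector);
    // collectAsync() only registers the sink: start the job once and read the
    // results through the collector (no extra executeAndCollect() call).
    env.executeAsync("Aggregation");

    Double result = null;
    while (doubleCollector.getOutput().hasNext()) {
      result = doubleCollector.getOutput().next();
      System.out.println("result = " + result);
    }
  }
}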
The output looks like: 2.0, 5.0, 9.0, 20.0, 33.0, 47.0. I simply want to
get the final value (47.0) as my aggregated result, store it in a variable,
and give it to the user.

Is there a better way to solve this for my use case?

Thanks and regards
