Hi,

I'm trying to run a simple benchmark on Flink's streaming reduce, and it
seems to be very slow. Could you let me know if I'm doing something wrong?

Here is the program. I'm running it on 32 nodes with 20 tasks per node,
so the parallelism is 640.

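import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class StreamingReduce {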
  int size;
  int iterations;
  StreamExecutionEnvironment env;
  String outFile;

  public StreamingReduce(int size, int iterations,
                         StreamExecutionEnvironment env, String outFile) {
    this.size = size;
    this.iterations = iterations;
    this.env = env;
    this.outFile = outFile;
  }

  public void execute() {
    DataStream<CollectiveData> stringStream = env.addSource(
        new RichParallelSourceFunction<CollectiveData>() {
      int i = 1;
      int count = 0;
      int size = 0;
      int iterations = 10000;

      @Override
      public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool p = (ParameterTool)
            getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        size = p.getInt("size", 128000);
        iterations = p.getInt("itr", 10000);
        System.out.println("6666 iterations: " + iterations + " size: " + size);
      }

      @Override
      public void run(SourceContext<CollectiveData> sourceContext) throws Exception {
        while (count < iterations) {
          CollectiveData i = new CollectiveData(size);
          sourceContext.collect(i);
          count++;
        }
      }

      @Override
      public void cancel() {
      }
    });

    stringStream.map(new RichMapFunction<CollectiveData, Tuple2<Integer, CollectiveData>>() {
      @Override
      public Tuple2<Integer, CollectiveData> map(CollectiveData s) throws Exception {
        return new Tuple2<Integer, CollectiveData>(0, s);
      }
    }).keyBy(0).reduce(new ReduceFunction<Tuple2<Integer, CollectiveData>>() {
      @Override
      public Tuple2<Integer, CollectiveData> reduce(Tuple2<Integer, CollectiveData> c1,
                                                    Tuple2<Integer, CollectiveData> c2)
          throws Exception {
        return new Tuple2<Integer, CollectiveData>(0, add(c1.f1, c2.f1));
      }
    }).addSink(new RichSinkFunction<Tuple2<Integer,CollectiveData>>() {
      long start;
      int count = 0;
      int iterations;

      @Override
      public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool p = (ParameterTool)
            getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        iterations = p.getInt("itr", 10000);
        System.out.println("7777 iterations: " + iterations);
      }

      @Override
      public void invoke(Tuple2<Integer, CollectiveData> integerStringTuple2) throws Exception {
        if (count == 0) {
          start = System.nanoTime();
        }
        count++;
        if (count >= iterations) {
          System.out.println("Final: " + count + " "
              + (System.nanoTime() - start) / 1000000 + " "
              + (integerStringTuple2.f1));
        }
      }
    });

  }

  private static CollectiveData add(CollectiveData i, CollectiveData j) {
    List<Integer> r = new ArrayList<>();
    for (int k = 0; k < i.getList().size(); k++) {
      r.add((i.getList().get(k) + j.getList().get(k)));
    }
    return new CollectiveData(r);
  }
}
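
CollectiveData itself is not shown above. The program only relies on it
wrapping a List<Integer>, having a constructor that takes a size and one
that takes a list, and exposing getList(). A minimal stand-in along those
lines is sketched below so that add() can be tried outside Flink. The class
name CollectiveDataSketch and the fill value are placeholders, not the
actual class, and the real one may also need to meet Flink's serialization
requirements.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for CollectiveData; the real class is not shown in this message.
class CollectiveDataSketch {
  private final List<Integer> list;

  // Builds a list of `size` integers. The fill value 1 is an assumption.
  CollectiveDataSketch(int size) {
    list = new ArrayList<>(size);
    for (int k = 0; k < size; k++) {
      list.add(1);
    }
  }

  // Wraps an existing list without copying it.
  CollectiveDataSketch(List<Integer> list) {
    this.list = list;
  }

  List<Integer> getList() {
    return list;
  }

  // Element-wise sum, mirroring add() in StreamingReduce.
  // Allocates a new list and boxes one Integer per element on every call.
  static CollectiveDataSketch add(CollectiveDataSketch i, CollectiveDataSketch j) {
    List<Integer> r = new ArrayList<>(i.getList().size());
    for (int k = 0; k < i.getList().size(); k++) {
      r.add(i.getList().get(k) + j.getList().get(k));
    }
    return new CollectiveDataSketch(r);
  }
}
```

With the default size of 128000, each call to add() walks and allocates
128000 boxed integers, so timing it in isolation may help separate the cost
of the reduce function from the cost of the streaming runtime.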

Thanks,
Supun..
