Hi Team,

Could you provide a sample showing how to increment a Counter with the
results of a Flink DataStream source and sink? I would then like to
display the counter values when running in my local IDE.
I need a counter for each of steps #1 through #3 below.

1) DataStream<byte[]> messageStream = env.addSource(Kinesis Source);
2) DataStream<String> outputStream =
       messageStream.rebalance().map(new CustomMapFunction());
3) outputStream.addSink(Streaming File Sink);

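import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class MyMapper extends RichMapFunction<String, String> {
  // Metric handle; transient because it is created in open(), not serialized.
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    // Register a counter named "myCounter" in this operator's metric group.
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    // Count every record that passes through the mapper.
    this.counter.inc();
    return value;
  }
}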


Thanks,
Vijay
