Hello,

We wrote a very simple Flink streaming pipeline consisting of:
1. Kafka consumer
2. Process function
3. Kafka producer

The code of the process function is listed below:

private transient MapState<String, Object> testMapState;

@Override
public void processElement(Map<String, Object> value, Context ctx,
        Collector<Map<String, Object>> out) throws Exception {
    if (testMapState.isEmpty()) {
        testMapState.putAll(value);
        out.collect(value);
        testMapState.clear();
    }
}
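
To make the control flow of this function easier to reason about, here is a minimal plain-Java sketch of the same logic without Flink. It is only an illustration under stated assumptions: a `HashMap` stands in for the `MapState` of a single key, a `List` stands in for the `Collector`, and the class name `MapStateLogicSketch` and the `emptyHits` counter are hypothetical. A real `MapState` is scoped per key and goes through the configured state backend, so this says nothing about timing. It only shows that, because `clear()` runs at the end of every call, `isEmpty()` appears to be true on every call and every element is forwarded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the process function above; no Flink involved.
class MapStateLogicSketch {

    // Hypothetical stand-in for the MapState of a single key.
    private final Map<String, Object> testMapState = new HashMap<>();

    // Counts how often the isEmpty() branch is taken.
    int emptyHits = 0;

    // Same control flow as processElement(); 'out' replaces the Collector.
    void processElement(Map<String, Object> value, List<Map<String, Object>> out) {
        if (testMapState.isEmpty()) {
            emptyHits++;
            testMapState.putAll(value);
            out.add(value);
            testMapState.clear();
        }
    }

    // Exposes whether the stand-in state holds any entries.
    boolean stateIsEmpty() {
        return testMapState.isEmpty();
    }

    public static void main(String[] args) {
        MapStateLogicSketch fn = new MapStateLogicSketch();
        List<Map<String, Object>> out = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Map<String, Object> event = new HashMap<>();
            event.put("id", i);
            fn.processElement(event, out);
        }
        // Prints: emitted=3, emptyHits=3, stateEmpty=true
        System.out.println("emitted=" + out.size()
                + ", emptyHits=" + fn.emptyHits
                + ", stateEmpty=" + fn.stateIsEmpty());
    }
}
```

In this sketch the state never carries anything from one element to the next, so each call pays for one `isEmpty()`, one `putAll()` and one `clear()`.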

We saw very poor performance, so we profiled the job with JProfiler.
The profiler showed that the hot spots are two methods of the MapState:
1. isEmpty() - around 7 ms
2. clear() - around 4 ms

We had to switch to ValueState instead.

Are we using MapState correctly, or are we doing something wrong?
Is this behaviour expected, given that Flink's recommendation is to use
MapState and NOT ValueState?

BR,
Nick
