For example, two Kafka topics both have a field named key, and I want to merge the
records from the two topics by that key in a Beam streaming pipeline, waiting at most
a maximum timeout for the matching record to arrive.
Currently I implement this in Flink using Flink keyed state.
The code looks like this:

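import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Connect both topics, map them to a common type and key them by the "key" field.
KeyedStream<Object, Object> keyedStream = topicA.connect(topicB).map(XXX).keyBy(XXX);
keyedStream.process(new KeyedProcessFunction<Object, Object, Object>() {
    // Holds the first record seen for the current key until its partner arrives.
    private ValueState<Object> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<Object>("name", XXXXXX));
    }

    @Override
    public void processElement(Object o, Context context, Collector<Object> collector)
            throws Exception {
        if (state.value() != null) {
            // The matching record is already buffered: merge the two values and emit here.
            state.clear(); // clear after merge
        } else {
            // First record for this key: buffer it and start the timeout.
            state.update(o);
            context.timerService().registerProcessingTimeTimer(
                    context.timerService().currentProcessingTime() + mergeTimeoutMs);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> collector)
            throws Exception {
        if (state.value() != null) {
            // No match arrived within mergeTimeoutMs: process the timed-out element.
            state.clear(); // clear so a late record is not merged with it
        }
    }
});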

With this code, most records are emitted as soon as they have been merged, and records
whose counterpart arrives with some delay are merged later. Records that cannot be
merged within the timeout are handled by the timeout function (onTimer).
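
To make the intended semantics concrete, here is a minimal sketch of the same per-key
merge-or-timeout logic in plain Java, with no Flink or Beam dependency. It is only a
model, not a stream-processing implementation: the class MergeWithTimeout, its methods
and the merged:/timeout: output strings are hypothetical, two maps stand in for the
keyed ValueState and the processing-time timer, and the current time is passed in
explicitly. It assumes, as the Flink code does, that any two records with the same key
form a pair.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, framework-free model of the per-key "merge or time out" logic.
// Plain maps stand in for keyed state; time (in ms) is passed in explicitly
// instead of coming from a processing-time timer service.
class MergeWithTimeout {
    private final long mergeTimeoutMs;
    // Buffered first value per key (the equivalent of the ValueState).
    private final Map<String, String> pending = new LinkedHashMap<>();
    // Timeout deadline per buffered key (the equivalent of the registered timer).
    private final Map<String, Long> deadlines = new LinkedHashMap<>();
    private final List<String> output = new ArrayList<>();

    MergeWithTimeout(long mergeTimeoutMs) {
        this.mergeTimeoutMs = mergeTimeoutMs;
    }

    // Mirrors processElement: merge with a buffered value, or buffer and start the timeout.
    void onElement(String key, String value, long nowMs) {
        advanceTime(nowMs); // fire any timeouts that are already due
        String buffered = pending.remove(key);
        if (buffered != null) {
            deadlines.remove(key);
            output.add("merged:" + key + ":" + buffered + "+" + value);
        } else {
            pending.put(key, value);
            deadlines.put(key, nowMs + mergeTimeoutMs);
        }
    }

    // Mirrors onTimer: every buffered value whose deadline has passed is emitted unmerged.
    void advanceTime(long nowMs) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> e : deadlines.entrySet()) {
            if (e.getValue() <= nowMs) {
                expired.add(e.getKey());
            }
        }
        // Remove after iterating to avoid modifying the map while looping over it.
        for (String key : expired) {
            deadlines.remove(key);
            output.add("timeout:" + key + ":" + pending.remove(key));
        }
    }

    List<String> getOutput() {
        return new ArrayList<>(output);
    }

    public static void main(String[] args) {
        MergeWithTimeout m = new MergeWithTimeout(1000);
        m.onElement("k1", "a1", 0);   // buffered, times out at 1000
        m.onElement("k2", "a2", 100); // buffered, times out at 1100
        m.onElement("k1", "b1", 500); // partner arrives in time: merged
        m.advanceTime(1200);          // k2 never matched: timed out
        System.out.println(m.getOutput()); // prints [merged:k1:a1+b1, timeout:k2:a2]
    }
}
```

One deliberate difference: each buffered value here carries its own deadline, which is
removed when the value is merged. In the Flink code the timer registered for an
already-merged pair stays registered, so it can fire early for a newer record buffered
under the same key.
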
How can I implement this in Beam?
