For example, two Kafka topics each have a field named key, and I want to merge the two topics in a Beam stream within a maximum timeout.
For now, I implement it in Flink using Flink's keyed state. The code looks like this:
KeyedStream<Object, Object> keyedStream = topicA.connect(topicB).map(XXX).keyBy(XXX);
keyedStream.process(new KeyedProcessFunction<Object, Object, Object>() {
    private ValueState<Object> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(
            new ValueStateDescriptor<Object>("name", XXXXXX));
    }

    @Override
    public void processElement(Object o, Context context, Collector<Object> collector) throws Exception {
        if (state.value() != null) {
            // the element from the other topic already arrived: merge the two values and emit here
            state.clear(); // clear after merge
        } else {
            // first element for this key: buffer it and set the timeout timer
            state.update(o);
            context.timerService().registerProcessingTimeTimer(
                System.currentTimeMillis() + mergeTimeoutMs);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<Object> collector) throws Exception {
        if (state.value() != null) {
            // handle the element that timed out without a match
        }
    }
});
With this code, most records are emitted immediately after they are merged; records arriving with some latency are merged later, and records that cannot be merged within the timeout are handled by the onTimer callback.
How can I implement this in Beam?
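Would Beam's stateful processing (a DoFn with @StateId and @TimerId) be the right direction? Below is a rough sketch of what I have in mind, not a verified solution: it assumes the two topics are read with KafkaIO, mapped to a common type, keyed into KV<String, MyRecord>, and flattened into one PCollection; MyRecord, merge(), and mergeTimeoutMs are placeholders for my own types and logic.

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Sketch only: per-key buffer plus a processing-time timeout, mirroring the Flink version.
class MergeByKeyFn extends DoFn<KV<String, MyRecord>, MyRecord> {

    @StateId("buffered")
    private final StateSpec<ValueState<MyRecord>> bufferedSpec = StateSpecs.value();

    @TimerId("timeout")
    private final TimerSpec timeoutSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    private final long mergeTimeoutMs;

    MergeByKeyFn(long mergeTimeoutMs) {
        this.mergeTimeoutMs = mergeTimeoutMs;
    }

    @ProcessElement
    public void processElement(
            ProcessContext ctx,
            @StateId("buffered") ValueState<MyRecord> buffered,
            @TimerId("timeout") Timer timeout) {
        MyRecord previous = buffered.read();
        if (previous != null) {
            // The element from the other topic already arrived: merge and emit immediately.
            ctx.output(merge(previous, ctx.element().getValue()));
            buffered.clear();
        } else {
            // First element for this key: buffer it and arm the timeout timer.
            buffered.write(ctx.element().getValue());
            timeout.offset(Duration.millis(mergeTimeoutMs)).setRelative();
        }
    }

    @OnTimer("timeout")
    public void onTimeout(
            OnTimerContext ctx,
            @StateId("buffered") ValueState<MyRecord> buffered) {
        MyRecord pending = buffered.read();
        if (pending != null) {
            // Timed out without a match: handle the unmatched element here.
            ctx.output(pending);
            buffered.clear();
        }
    }

    private MyRecord merge(MyRecord a, MyRecord b) {
        // Placeholder for the real field-level merge logic.
        return a;
    }
}

I would then union the two keyed PCollections with Flatten.pCollections() and apply ParDo.of(new MergeByKeyFn(mergeTimeoutMs)). Is this the idiomatic Beam approach, or would something like CoGroupByKey with windowing/triggers be a better fit?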