OK, after reading the docs: 
https://docs.google.com/document/d/1vyel3XRfdeGyqLvXiy1C3mrw9QBbveoKjjVLuRxMw4k/edit#.
I know the answer.
Just use a stateful DoFn.
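A rough sketch of the equivalent in Beam's state and timer API follows. This is not a drop-in implementation: `MergeFn`, the 30-second timeout, and the string-concatenation merge are placeholders I'm assuming for illustration, and I'm assuming String keys and values so a coder can be given explicitly.

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Stateful DoFn that pairs up elements sharing a key: the merged value is
// emitted as soon as both sides arrive, and a lone value is emitted on timeout.
public class MergeFn extends DoFn<KV<String, String>, String> {

  // Placeholder timeout; substitute your mergeTimeoutMs.
  private final Duration mergeTimeout = Duration.standardSeconds(30);

  // Per-key buffer holding the first element while we wait for its partner.
  @StateId("buffer")
  private final StateSpec<ValueState<String>> bufferSpec =
      StateSpecs.value(StringUtf8Coder.of());

  // Processing-time timer, analogous to registerProcessingTimeTimer in Flink.
  @TimerId("timeout")
  private final TimerSpec timeoutSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("buffer") ValueState<String> buffer,
      @TimerId("timeout") Timer timeout) {
    String stored = buffer.read();
    if (stored != null) {
      // Second element for this key: merge and emit immediately.
      c.output(stored + "|" + c.element().getValue()); // placeholder merge logic
      buffer.clear();
    } else {
      // First element for this key: buffer it and arm the timeout.
      buffer.write(c.element().getValue());
      timeout.offset(mergeTimeout).setRelative();
    }
  }

  @OnTimer("timeout")
  public void onTimeout(
      OnTimerContext c, @StateId("buffer") ValueState<String> buffer) {
    String stored = buffer.read();
    if (stored != null) {
      // No partner arrived in time: process the timed-out element alone.
      c.output(stored);
      buffer.clear();
    }
  }
}
```

You would apply this after flattening the two topic reads into one keyed PCollection, e.g. `merged.apply(ParDo.of(new MergeFn()))`. Note that state and timers are per key and window, and require a runner that supports them.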

On 2017-05-21 15:49 (+0800), yunfan zhang <[email protected]> wrote: 
> For example, two Kafka topics have a field named key, and I want to merge 
> the two topics in a Beam stream within a maximum timeout.
> For now, I implement this in Flink using Flink's state storage.
> The code looks like:
> 
> KeyedStream<T> keyedStream = topicA.connect(topicB).map(XXX).keyBy(XXX);
> keyedStream.process(new RichProcessFunction<Object, Object>() {
>     private ValueState<Object> state;
> 
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         state = getRuntimeContext().getState(new ValueStateDescriptor<Object>("name", XXXXXX));
>     }
> 
>     @Override
>     public void processElement(Object o, Context context, Collector<Object> collector) throws Exception {
>         if (state.value() != null) {
>             // merge the two values and emit here
>             state.update(null); // clear after merge
>         } else {
>             state.update(o);
>             // set the merge timeout
>             context.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + mergeTimeoutMs);
>         }
>     }
> 
>     @Override
>     public void onTimer(long l, OnTimerContext onTimerContext, Collector<Object> collector) throws Exception {
>         if (state.value() != null) {
>             // process the timed-out element
>         }
>     }
> });
> 
> With this code, most data is emitted immediately after it is merged; data 
> arriving with some latency is merged late, and data that cannot be merged 
> within the specified time is handled by the timeout function.
> How can I implement this in Beam? 
