Re: CoFlatMapFunction with more than two input streams

2018-08-15 Thread Averell
Thank you, Xingcan. Regarding the Either, I still see the need for type casting or case-class matching. Could you please take a look?

    val transformed = dog
      .union(cat)
      .connect(transformer)
      .keyBy(r => r.name, r2 => r2.name)
      .process(new Transfo
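A plain-Scala sketch (no Flink dependency, so it only models the idea) of where matching remains after wrapping: if the unioned stream carries `Either[Dog, Cat]` (an assumption, since the snippet above is cut off and the element types are not shown), the wrapper itself has no `name` field, so a key selector like `r => r.name` has to reach into whichever side is present. `Either.fold` does that without an explicit `match` and without casts. The fields of `Dog` and `Cat` are hypothetical.

```scala
// Hypothetical element types; only `name` matters for keying.
final case class Dog(name: String, age: Int)
final case class Cat(name: String, indoor: Boolean)

// Key extraction for a wrapped element: fold applies the first function to a
// Left and the second to a Right, so no asInstanceOf cast is needed.
def keyOf(element: Either[Dog, Cat]): String = element.fold(_.name, _.name)

val elements: Seq[Either[Dog, Cat]] = Seq(Left(Dog("rex", 3)), Right(Cat("tom", true)))
val keys = elements.map(keyOf)
println(keys) // List(rex, tom)
```

Some handling of both sides is still required, but the compiler checks it, which is the practical difference from casting.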

Re: CoFlatMapFunction with more than two input streams

2018-08-15 Thread Xingcan Cui
Hi Averell, With the CoProcessFunction, you get access to time-related services (such as timers), which may be useful when maintaining the elements of Stream_C, and you could avoid type casting by using the Either class. Best, Xingcan > On Aug 15, 2018, at 3:27 PM, Averell wrote: > > Thank you Vino

Re: CoFlatMapFunction with more than two input streams

2018-08-15 Thread vino yang
Hi Averell, What I mean is that if you store stream_C data in an RDBMS, you can query the RDBMS directly in the CoFlatMapFunction instead of using the Table API. This is somewhat similar to joining a stream with a dimension table. Of course, the premise of adopting this option is that the amount of data
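A plain-Scala sketch of the lookup approach, with no Flink or JDBC dependency: `NameMappingStore`, `InMemoryStore` and `CachedEnricher` are hypothetical names, and the in-memory map stands in for the RDBMS table holding stream_C's data. In a real rich function the connection would usually be opened in `open()`. The cache is an assumed addition, reasonable only because the mapping data changes rarely; this sketch never refreshes stale entries.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the RDBMS holding stream_C's mapping data.
trait NameMappingStore {
  def lookup(name: String): Option[String]
}

// In-memory implementation; `queries` counts round trips to show the cache's effect.
final class InMemoryStore(rows: Map[String, String]) extends NameMappingStore {
  var queries = 0
  def lookup(name: String): Option[String] = { queries += 1; rows.get(name) }
}

// Enriches each element by key, asking the store only on a cache miss.
// Misses (None) are cached too, so unknown keys are not re-queried.
final class CachedEnricher(store: NameMappingStore) {
  private val cache = mutable.Map.empty[String, Option[String]]
  def enrich(name: String): Option[String] =
    cache.getOrElseUpdate(name, store.lookup(name))
}

val store = new InMemoryStore(Map("rex" -> "Rex the Great"))
val enricher = new CachedEnricher(store)
val results = Seq("rex", "rex", "fido").map(enricher.enrich)
println(results)       // List(Some(Rex the Great), Some(Rex the Great), None)
println(store.queries) // 2
```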

Re: CoFlatMapFunction with more than two input streams

2018-08-15 Thread Averell
Thank you, Vino and Xingcan. @Vino: could you explain in more detail how to use the DBMS? Would that be through the Table API, or did you mean reading the DBMS data directly inside the ProcessFunction? @Xingcan: I don't know what the benefits of using CoProcess over RichCoFlatMap are in this case. Regarding usin

Re: CoFlatMapFunction with more than two input streams

2018-08-14 Thread Xingcan Cui
Hi Averell, I am also in favor of option 2. In addition, you could use a CoProcessFunction instead of a CoFlatMapFunction and wrap the elements of stream_A and stream_B in the `Either` class. Best, Xingcan > On Aug 15, 2018, at 2:24 PM, vino yang wrote: > > Hi Averell, > > As far as these two
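A plain-Scala sketch (sequences stand in for streams; no Flink dependency) of the wrapping suggested above: mapping stream_A's elements to `Left` and stream_B's to `Right` gives both a common type, so they can be unioned and fed into one input of a two-input function. `Dog`, `Cat` and `describe` are hypothetical.

```scala
// Hypothetical element types for stream_A and stream_B.
final case class Dog(name: String)
final case class Cat(name: String)

val streamA: Seq[Dog] = Seq(Dog("rex"), Dog("fido"))
val streamB: Seq[Cat] = Seq(Cat("tom"))

// Wrap each side so both sequences share the element type Either[Dog, Cat].
val unioned: Seq[Either[Dog, Cat]] =
  streamA.map(d => (Left(d): Either[Dog, Cat])) ++
    streamB.map(c => (Right(c): Either[Dog, Cat]))

// One handler for the combined input: the match on Left/Right is checked by
// the compiler, so no runtime casts are involved.
def describe(element: Either[Dog, Cat]): String = element match {
  case Left(dog)  => s"dog ${dog.name}"
  case Right(cat) => s"cat ${cat.name}"
}

val described = unioned.map(describe)
println(described) // List(dog rex, dog fido, cat tom)
```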

Re: CoFlatMapFunction with more than two input streams

2018-08-14 Thread vino yang
Hi Averell, As far as these two solutions are concerned, I think you can only choose option 2, because, as you have stated, the current Flink DataStream API does not support replacing one of the input stream types of a CoFlatMapFunction. Another option: 1. Split it into two separate jobs. B

CoFlatMapFunction with more than two input streams

2018-08-14 Thread Averell
Hi, I have stream_A of type "Dog", which needs to be transformed using data from stream_C of type "Name_Mapping". As stream_C is slow (its data is not updated frequently), to do the transformation I connect the two streams, do a keyBy, and then use a RichCoFlatMapFunction in which mapping data
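A plain-Scala model of the setup described above, with no Flink dependency: `DogTransformer` is a hypothetical class whose `flatMap1`/`flatMap2` methods mirror the two callbacks of a `CoFlatMapFunction`, and a mutable map keyed by name stands in for keyed state. The message is cut off before it says how a Dog arriving ahead of its mapping is handled, so this sketch simply drops it (an assumption).

```scala
import scala.collection.mutable

// Hypothetical element types: stream_A carries Dog, stream_C carries Name_Mapping.
final case class Dog(name: String, breed: String)
final case class NameMapping(name: String, displayName: String)

// Models a keyed two-input function; the map plays the role of per-key state.
final class DogTransformer {
  private val mappingByKey = mutable.Map.empty[String, String]

  // Fast stream: emit a transformed Dog only if a mapping for its key exists.
  def flatMap1(dog: Dog, out: String => Unit): Unit =
    mappingByKey.get(dog.name).foreach(display => out(s"$display (${dog.breed})"))

  // Slow stream: store (or overwrite) the latest mapping; nothing is emitted.
  def flatMap2(mapping: NameMapping, out: String => Unit): Unit =
    mappingByKey.update(mapping.name, mapping.displayName)
}

val emitted = mutable.ArrayBuffer.empty[String]
val emit: String => Unit = s => emitted += s

val fn = new DogTransformer
fn.flatMap1(Dog("rex", "beagle"), emit)                   // no mapping yet: dropped
fn.flatMap2(NameMapping("rex", "Rex the Great"), emit)
fn.flatMap1(Dog("rex", "beagle"), emit)
println(emitted.toList) // List(Rex the Great (beagle))
```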