It might help if you share what exactly you are hoping to improve upon. Do you want to avoid the separate sessionWindow transforms? Do it all with one SQL query? Or just have it be generally more concise/readable?
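If the goal is a single SQL query, one untested way to try it is to apply the same session window to every input before the SQL runs. Both joins and the pivot would then go into one SqlTransform, with the aggregation written as a subquery. The class and method names below are hypothetical. The column placement is also an assumption: the sample doesn't say which table owns col3/col6/col8/col10 or num, so here they are taken from TABLE2 and TABLE1 respectively. Whether Beam SQL accepts an aggregate over session windows feeding a further join is the part most likely to need adjusting.

```java
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

/** Hypothetical, untested sketch: both joins plus the pivot in one SqlTransform. */
final class SingleQuerySketch {

  // Same 20-second session gap as the original pipeline.
  static final Duration GAP = Duration.standardSeconds(20);

  // ASSUMPTION: col3/col6/col8/col10 come from TABLE2 and num from TABLE1;
  // the original sample does not say which table owns these columns.
  static final String QUERY =
      "SELECT DISTINCT d1.col1, d1.col2, d1.D_col3, d1.C_col3, "
          + "  d1.CP_col6, d1.DP_col6, d2.c4 "
          + "FROM ( "
          + "  SELECT t1.col1, t1.col2, t1.num, "
          + "    MAX(CASE WHEN t2.col3 = 'D' THEN t2.col8 END) AS D_col3, "
          + "    MAX(CASE WHEN t2.col3 = 'C' THEN t2.col8 END) AS C_col3, "
          + "    MAX(CASE WHEN t2.col6 = 'CP' THEN t2.col10 END) AS CP_col6, "
          + "    MAX(CASE WHEN t2.col6 = 'DP' THEN t2.col10 END) AS DP_col6 "
          + "  FROM TABLE1 t1 JOIN TABLE2 t2 ON t1.col5 = t2.col7 "
          + "  GROUP BY t1.col1, t1.col2, t1.num "
          + ") d1 "
          + "JOIN TABLE4 d2 ON d1.num = d2.tr_num";

  // Window every input identically before the SQL, so each join
  // sees the same non-global windowing on both sides.
  static PCollection<Row> sessions(String name, PCollection<Row> input) {
    return input.apply(name + "-window", Window.<Row>into(Sessions.withGapDuration(GAP)));
  }

  // rows1, rows2 and stream22 are the Row PCollections from the original code.
  static PCollection<Row> joinAll(
      PCollection<Row> rows1, PCollection<Row> rows2, PCollection<Row> stream22) {
    return PCollectionTuple
        .of(new TupleTag<>("TABLE1"), sessions("table1", rows1))
        .and(new TupleTag<>("TABLE2"), sessions("table2", rows2))
        .and(new TupleTag<>("TABLE4"), sessions("table4", stream22))
        .apply("join-all", SqlTransform.query(QUERY));
  }
}
```

The remaining topics would be added the same way: one more .and(...) tag on the tuple plus one more JOIN clause in the query.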
I'd think it would be possible to do this with a single SQL query, but I'm not sure; maybe the windowing would mess things up. Did you try joining all 5 streams with a single query and run into a problem?

Brian

On Tue, May 19, 2020 at 9:38 AM bharghavi vajrala <vbharghavi...@gmail.com> wrote:

> Hi All,
>
> I need your input on the scenario below:
>
> Source: Kafka (the actual source is an Oracle DB; its data is pushed to Kafka)
> SDK: Java
> Runner: Flink
> Problem: Subscribe to 5 topics (tables), join them on different keys, and group by a few columns.
> Existing solution: A 20-second session window, with a separate transform for every pair of queries, each consuming the previous result.
>
> Below is the sample code:
>
> Sessions sessionWindow =
>     Sessions.withGapDuration(Duration.standardSeconds(20));
>
> // Join TABLE1 with TABLE2, then re-apply the session window.
> PCollection<Row> stream1 =
>     PCollectionTuple
>         .of(new TupleTag<>("TABLE1"), rows1)
>         .and(new TupleTag<>("TABLE2"), rows2)
>         .apply("rows1-rows2", SqlTransform.query(
>             "select t1.col1, t1.col2, t2.col5 from " +
>             "TABLE1 t1 join TABLE2 t2 \n" +
>             "on t1.col5 = t2.col7 "))
>         .apply("window", Window.into(sessionWindow));
>
> // Pivot per (col1, col2). The tag is MERGE_TABLE rather than MERGE-TABLE,
> // because an unquoted hyphen is not valid in a SQL table name.
> PCollection<Row> mergedStream =
>     PCollectionTuple
>         .of(new TupleTag<>("MERGE_TABLE"), stream1)
>         .apply("merge", SqlTransform.query(
>             "select col1, col2, \n" +
>             "max(case when col3='D' then col8 end) as D_col3, \n" +
>             "max(case when col3='C' then col8 end) as C_col3, \n" +
>             "max(case when col6='CP' then col10 end) as CP_col6, \n" +
>             "max(case when col6='DP' then col10 end) as DP_col6 \n" +
>             "from MERGE_TABLE " +
>             "group by col1, col2 \n"))
>         .apply("merge-window", Window.into(sessionWindow));
>
> // Join the pivoted result with stream22, then re-apply the session window.
> PCollection<Row> stream2 =
>     PCollectionTuple
>         .of(new TupleTag<>("TABLE3"), mergedStream)
>         .and(new TupleTag<>("TABLE4"), stream22)
>         .apply("table3-table4", SqlTransform.query(
>             "select distinct c1, c2, c4 from " +
>             "TABLE3 d1 join TABLE4 d2 \n" +
>             "on d1.num = d2.tr_num "))
>         .apply("e-window", Window.into(sessionWindow));
>
> Is there any better approach?
>
> Looking forward to suggestions.
>
> Thanks!!