Hi Brian,

Yes, it would be easier if I could use a single SQL query.
I did try, but it didn't work. I don't remember the exact error, but it was something related to windows. I am unable to use more than 2 tables in one PTransform: each PTransform can contain only one join, and I can't combine a GROUP BY with it either.

On Fri, 22 May 2020, 01:24 Brian Hulette, <bhule...@google.com> wrote:

> It might help if you share what exactly you are hoping to improve upon. Do
> you want to avoid the separate sessionWindow transforms? Do it all with one
> SQL query? Or just have it be generally more concise/readable?
>
> I'd think it would be possible to do this with a single SQL query, but I'm
> not sure; maybe the windowing would mess things up. Did you try joining all
> 5 streams with a single query and run into a problem?
>
> Brian
>
> On Tue, May 19, 2020 at 9:38 AM bharghavi vajrala <vbharghavi...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I need your inputs on the scenario below:
>>
>> Source: Kafka (the actual source is an Oracle DB; the data is pushed to Kafka)
>> SDK: Java
>> Runner: Flink
>> Problem: Subscribe to 5 topics (tables), join them on different keys, and
>> group by a few columns.
>> Existing solution: A session window of 20 seconds, with a separate
>> transform for every 2 queries, each consuming the previous result.
>>
>> Below is the sample code:
>>
>> // 20-second session gap
>> Sessions sessionWindow =
>>     Sessions.withGapDuration(Duration.standardSeconds(20));
>>
>> // Join TABLE1 and TABLE2, then re-window the result into sessions.
>> PCollection<Row> stream1 =
>>     PCollectionTuple
>>         .of(new TupleTag<>("TABLE1"), rows1)
>>         .and(new TupleTag<>("TABLE2"), rows2)
>>         .apply("rows1-rows2", SqlTransform.query(
>>             "select t1.col1, t1.col2, t2.col5 from " +
>>             "TABLE1 t1 join TABLE2 t2 " +
>>             "on t1.col5 = t2.col7"))
>>         .apply("window", Window.into(sessionWindow));
>>
>> // Pivot per (col1, col2). The table is named MERGE_TABLE because a
>> // hyphenated name such as MERGE-TABLE is not a valid unquoted SQL identifier.
>> PCollection<Row> mergedStream =
>>     PCollectionTuple
>>         .of(new TupleTag<>("MERGE_TABLE"), stream1)
>>         .apply("merge", SqlTransform.query(
>>             "select col1, col2, " +
>>             "max(case when col3 = 'D' then col8 end) as D_col3, " +
>>             "max(case when col3 = 'C' then col8 end) as C_col3, " +
>>             "max(case when col6 = 'CP' then col10 end) as CP_col6, " +
>>             "max(case when col6 = 'DP' then col10 end) as DP_col6 " +
>>             "from MERGE_TABLE " +
>>             "group by col1, col2"))
>>         .apply("merge-window", Window.into(sessionWindow));
>>
>> // Join the pivoted stream with stream22.
>> PCollection<Row> stream2 =
>>     PCollectionTuple
>>         .of(new TupleTag<>("TABLE3"), mergedStream)
>>         .and(new TupleTag<>("TABLE4"), stream22)
>>         .apply(SqlTransform.query(
>>             "select distinct c1, c2, c4 from " +
>>             "TABLE3 d1 join TABLE4 d2 " +
>>             "on d1.num = d2.tr_num"))
>>         .apply("e-window", Window.into(sessionWindow));
>>
>> Is there any better approach?
>>
>> Looking forward to your suggestions.
>>
>> Thanks!!
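Whether windowing gets in the way of a single query depends on how the 20-second session windows are formed. The following is a simplified, stdlib-only Java model of a fixed session gap, the idea behind `Sessions.withGapDuration`. It assumes a single key and timestamps already held in memory. The class and method names (`SessionSketch`, `sessions`) are hypothetical, and a runner such as Flink merges windows per key and incrementally as elements arrive, not like this.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of session windowing with a fixed gap.
 * Illustration only: one key, all timestamps known up front.
 */
public class SessionSketch {

    /** A half-open session window [startMillis, endMillis). */
    public record Session(long startMillis, long endMillis) {}

    /**
     * Each event at time t gets a proto-window [t, t + gap); proto-windows
     * that overlap are merged into a single session.
     */
    public static List<Session> sessions(List<Long> timestampsMillis, long gapMillis) {
        List<Long> sorted = new ArrayList<>(timestampsMillis);
        sorted.sort(null); // natural (ascending) order
        List<Session> result = new ArrayList<>();
        long start = 0;
        long end = 0;
        boolean open = false;
        for (long t : sorted) {
            if (open && t < end) {
                // Event overlaps the current session: extend its end.
                end = Math.max(end, t + gapMillis);
            } else {
                // Gap exceeded (or first event): close the old session, start a new one.
                if (open) {
                    result.add(new Session(start, end));
                }
                start = t;
                end = t + gapMillis;
                open = true;
            }
        }
        if (open) {
            result.add(new Session(start, end));
        }
        return result;
    }

    public static void main(String[] args) {
        long gap = 20_000L; // 20-second gap, as in the thread
        List<Long> events = List.of(0L, 5_000L, 24_000L, 60_000L);
        System.out.println(sessions(events, gap));
    }
}
```

With events at 0 s, 5 s, 24 s and 60 s, the first three merge into one session spanning [0 s, 44 s), and the 60 s event starts a new session spanning [60 s, 80 s). Because each input is windowed separately in this way, inputs that are joined together generally need compatible windowing.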