It would help if you shared exactly what you are hoping to improve. Do you
want to avoid the separate sessionWindow transforms, do it all in one SQL
query, or just make the code more concise and readable?

I'd think it would be possible to do this with a single SQL query, but I'm
not sure; the windowing might get in the way. Did you try joining all 5
streams with a single query and run into a problem?
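
To make the single-query idea concrete, here is a rough, untested sketch of
what the combined query might look like, built as a plain Java string. The
class and method names (SingleQuerySketch, pivotedJoin, buildQuery) are made
up for illustration, and the column names are copied from your sample as-is,
so they line up across steps no better than they do there. I'm assuming the
intermediate mergedStream input (TABLE3) can simply be replaced by a subquery.

```java
public class SingleQuerySketch {

    // Join TABLE1 and TABLE2 and pivot with MAX(CASE ...) in one step,
    // standing in for the separate "rows1-rows2" and "merge" transforms.
    static String pivotedJoin() {
        return "SELECT t1.col1, t1.col2,\n"
             + "  MAX(CASE WHEN col3 = 'D' THEN col8 END) AS D_col3,\n"
             + "  MAX(CASE WHEN col3 = 'C' THEN col8 END) AS C_col3,\n"
             + "  MAX(CASE WHEN col6 = 'CP' THEN col10 END) AS CP_col6,\n"
             + "  MAX(CASE WHEN col6 = 'DP' THEN col10 END) AS DP_col6\n"
             + "FROM TABLE1 t1\n"
             + "JOIN TABLE2 t2 ON t1.col5 = t2.col7\n"
             + "GROUP BY t1.col1, t1.col2";
    }

    // Use the pivoted join as a subquery in place of TABLE3 and join it
    // with TABLE4, as the last transform in the sample does.
    static String buildQuery() {
        return "SELECT DISTINCT c1, c2, c4\n"
             + "FROM (\n" + pivotedJoin() + "\n) d1\n"
             + "JOIN TABLE4 d2 ON d1.num = d2.tr_num";
    }

    public static void main(String[] args) {
        // Print the combined query so it can be inspected or pasted elsewhere.
        System.out.println(buildQuery());
    }
}
```

The resulting string would go to SqlTransform.query(...) on a
PCollectionTuple tagged TABLE1, TABLE2 and TABLE4, with every input windowed
the same way beforehand. Whether Beam SQL on Flink accepts that with session
windows is exactly the part I'm unsure about.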

Brian

On Tue, May 19, 2020 at 9:38 AM bharghavi vajrala <vbharghavi...@gmail.com>
wrote:

> Hi All,
>
> I need your input on the scenario below:
>
> Source : Kafka (Actual source is oracle db, data is pushed to kafka)
> SDK :  Java
> Runner : Flink
> Problem: Subscribe to 5 topics (tables), join them on different keys, and
> group by a few columns.
> Existing solution: Using a session window of 20 seconds, with a different
> transform for every 2 queries, and using the result.
>
> Below is the sample code:
>
>         Sessions sessionWindow =
>                 Sessions.withGapDuration(Duration.standardSeconds(20));
>
>   PCollection<Row> stream1 =
>                 PCollectionTuple
>                         .of(new TupleTag<>("TABLE1"), rows1)
>                         .and(new TupleTag<>("TABLE2"), rows2)
>                         .apply("rows1-rows2", SqlTransform.query(
>                                 "select  t1.col1,t1.col2,t2.col5 from " +
>                                         "TABLE1 t1  join TABLE2 t2 \n" +
>                                         "on t1.col5 = t2.col7 "
>                                 )
>                         )
>                         .apply("window" , Window.into(sessionWindow));
>
>    PCollection<Row> mergedStream =
>                 PCollectionTuple
>                         .of(new TupleTag<>("MERGE_TABLE"), stream1)
>                         .apply("merge" , SqlTransform.query(
>                                 "select col1,col2, \n" +
>                                 "max(case when col3='D' then col8 end) as D_col3,\n" +
>                                 "max(case when col3='C' then col8 end) as C_col3,\n" +
>                                 "max(case when col6='CP' then col10 end) as CP_col6,\n" +
>                                 "max(case when col6='DP' then col10 end) as DP_col6\n" +
>                                 "from MERGE_TABLE " +
>                                 "group by  col1,col2\n "
>                         ))
>                         .apply("merge-window", Window.into(sessionWindow));
>
>  PCollection<Row> stream2 =
>                 PCollectionTuple
>                         .of(new TupleTag<>("TABLE3"), mergedStream)
>                         .and(new TupleTag<>("TABLE4"), stream22)
>                         .apply(SqlTransform.query(
>                                 "select distinct c1,c2,c4 from " +
>                                 "TABLE3 d1 join TABLE4 d2\n" +
>                                 " on  d1.num= d2.tr_num  "))
>                         .apply("e-window" , Window.into(sessionWindow));
>
>
> Is there any better approach?
>
> Looking forward to your suggestions.
>
> Thanks!!
>
>
