Hi Brian,

Yes, it would be easier if I could use a single SQL query.

I did try, but it didn't work. I don't remember the exact error, but it was
something related to windows. I am unable to use more than 2 tables in
one PTransform: each PTransform can contain only a single join, nothing
more, and not even a GROUP BY.
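
As background on the 20-second session windows used in the quoted code
below: a session window groups events whose timestamps fall within the gap
duration of each other. The following is a minimal plain-Java sketch of that
merging rule, not Beam code. The class SessionSketch and the method
mergeSessions are hypothetical names, and the rule that an event extends a
session only when it arrives strictly before the session's current end is an
assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; this is not the Beam Sessions implementation.
final class SessionSketch {

    // Groups event timestamps (in seconds) into sessions. Each event opens a
    // provisional window [t, t + gap); windows that overlap are merged.
    // Returns one {start, end} pair per session, with the end exclusive.
    static List<long[]> mergeSessions(long[] timestamps, long gapSeconds) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> sessions = new ArrayList<>();
        for (long t : sorted) {
            if (!sessions.isEmpty()) {
                long[] last = sessions.get(sessions.size() - 1);
                if (t < last[1]) {
                    // Event falls inside the open session: extend its end.
                    last[1] = Math.max(last[1], t + gapSeconds);
                    continue;
                }
            }
            // No open session covers this event: start a new one.
            sessions.add(new long[] {t, t + gapSeconds});
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Example with a 20-second gap, matching the gap in the quoted code.
        for (long[] s : mergeSessions(new long[] {0, 5, 18, 60, 70}, 20)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

Because session boundaries depend on the data, two streams windowed this way
only line up if they see events close enough together in time, which may be
relevant to the window-related error mentioned above.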

On Fri, 22 May 2020, 01:24 Brian Hulette, <bhule...@google.com> wrote:

> It might help if you share what exactly you are hoping to improve upon. Do
> you want to avoid the separate sessionWindow transforms? Do it all with one
> SQL query? Or just have it be generally more concise/readable?
>
> I'd think it would be possible to do this with a single SQL query but I'm
> not sure, maybe the windowing would mess things up. Did you try joining all
> 5 streams with a single query and run into a problem?
>
> Brian
>
> On Tue, May 19, 2020 at 9:38 AM bharghavi vajrala <vbharghavi...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I need your input on the scenario below:
>>
>> Source : Kafka (the actual source is an Oracle DB; data is pushed to Kafka)
>> SDK :  Java
>> Runner : Flink
>> Problem: Subscribe to 5 topics (tables), join them on different keys, and
>> group by a few columns.
>> Existing solution: A session window of 20 seconds, with a separate
>> transform for every 2 queries, using the result of each in the next.
>>
>> Below is the sample code:
>>
>>         Sessions sessionWindow =
>>                 Sessions.withGapDuration(Duration.standardSeconds(20));
>>
>>   PCollection<Row> stream1 =
>>                 PCollectionTuple
>>                         .of(new TupleTag<>("TABLE1"), rows1)
>>                         .and(new TupleTag<>("TABLE2"), rows2)
>>                         .apply("rows1-rows2", SqlTransform.query(
>>                                 "select  t1.col1,t1.col2,t2.col5 from " +
>>                                         "TABLE1 t1  join TABLE2 t2 \n" +
>>                                         "on t1.col5 = t2.col7 "
>>                                 )
>>                         )
>>                         .apply("window" , Window.into(sessionWindow));
>>
>>    PCollection<Row> mergedStream =
>>                 PCollectionTuple
>>                         .of(new TupleTag<>("MERGE-TABLE"), stream1)
>>                         .apply("merge" , SqlTransform.query(
>>                                 "select col1,col2, \n" +
>>                                 "max(case when col3='D' then col8   end) as D_col3,\n" +
>>                                 "max(case when col3='C' then col8   end) as C_col3,\n" +
>>                                 "max(case when col6='CP' then col10  end) as CP_col6,\n" +
>>                                 "max(case when col6='DP' then col10  end) as DP_col6\n" +
>>                                 "from MERGE-TABLE " +
>>                                 "group by  col1,col2\n "
>>                         ))
>>                         .apply("merge-window", Window.into(sessionWindow));
>>
>>  PCollection<Row> stream2 =
>>                 PCollectionTuple
>>                         .of(new TupleTag<>("TABLE3"), mergedStream)
>>                         .and(new TupleTag<>("TABLE4"), stream22)
>>                         .apply(SqlTransform.query(
>>                                 "select distinct c1,c2,c4 from   " +
>>                                 "TABLE3 d1 join TABLE4  d2\n" +
>>                                 " on  d1.num= d2.tr_num  "))
>>                         .apply("e-window" , Window.into(sessionWindow));
>>
>>
>> Is there any better approach?
>>
>> Looking forward to your suggestions.
>>
>> Thanks!!
>>
>>
