Hi,

The watermark of the join operator is the minimum of the watermarks of its
input streams.

```
JoinOperator.watermark = min(left.watermark, right.watermark);
```
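
To make that concrete, here is a minimal plain-Java sketch of the rule. It is
not Flink's actual operator code; the class and method names are made up for
illustration, and timestamps are assumed to be epoch milliseconds.

```java
// Hypothetical illustration only: models how a two-input operator could
// combine the watermarks of its inputs. No Flink classes are used.
public class JoinWatermarkSketch {
    // Latest watermark seen per input; Long.MIN_VALUE means "none received yet".
    private long leftWatermark = Long.MIN_VALUE;
    private long rightWatermark = Long.MIN_VALUE;

    /** Records a watermark from the left input and returns the combined watermark. */
    public long onLeftWatermark(long timestampMillis) {
        // Watermarks never move backwards, so keep the maximum seen so far.
        leftWatermark = Math.max(leftWatermark, timestampMillis);
        return currentWatermark();
    }

    /** Records a watermark from the right input and returns the combined watermark. */
    public long onRightWatermark(long timestampMillis) {
        rightWatermark = Math.max(rightWatermark, timestampMillis);
        return currentWatermark();
    }

    /** The operator's watermark is the minimum over both inputs. */
    public long currentWatermark() {
        return Math.min(leftWatermark, rightWatermark);
    }

    public static void main(String[] args) {
        JoinWatermarkSketch join = new JoinWatermarkSketch();
        // Right input has no watermark yet, so the result stays at Long.MIN_VALUE.
        System.out.println(join.onLeftWatermark(1_000L));
        System.out.println(join.onRightWatermark(400L));   // prints 400
        System.out.println(join.onRightWatermark(1_500L)); // prints 1000
    }
}
```

The point of the sketch is that the slower input holds back the joined stream's
watermark, so downstream event-time windows only fire once both sides have
progressed.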

I think that's enough for most cases. Could you share more details about the
logic in the UDF getEventTimeInNS?

I think a better solution than the intermediate table would be to define the
watermark on a VIEW, but Flink doesn't support that yet.

Best,
Shengkai




liuxiangcao <xiangcaohe...@gmail.com> wrote on Sat, Apr 16, 2022 at 03:07:

> Hi Flink community,
>
> *Here is the context: *
> Theoretically, I would like to write the following query, but it won't work
> since we can only define the WATERMARK in a table DDL:
>
> INSERT into tableC
> select tableA.field1,
>          SUM(1) as `count`,
>          time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
>          WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
> from tableA join tableB
> on tableA.joinCol = tableB.joinCol
> group by TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
> (note: getEventTimeInNS is a UDF that calculates event time using 
> tableA.timestamp and tableB.timestamp)
>
>
> So I have to define an intermediary table to store the join results, define
> the event time and watermark in its DDL, and then perform tumbling windowing
> on the intermediary table:
>
> CREATE TABLE IntermediaryTable (
>    field1,
>   `eventTimestampInNanoseconds`  BIGINT,
>    time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/1000000, 3),
>    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'IntermediaryTable',
>   'properties.bootstrap.servers' = 'xxxxxx',
>   'properties.group.id' = 'contextevent-streaming-sql',
>   'format' = 'avro'
> );
>
> INSERT INTO IntermediaryTable
> select tableA.field1,
>           tableB.field2,
>           getEventTimeInNS(tableA.timestamp, tableB.timestamp)
> from tableA join tableB
> on tableA.joinCol = tableB.joinCol;
>
> Then, I can perform tumbling window aggregation on the IntermediaryTable:
>
> INSERT INTO countTable
> (select event.field1,
>         SUM(1) as `count`
>  from IntermediaryTable event
>  GROUP BY
>   TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
>   event.field1
> );
>
>
> This is not convenient because the IntermediaryTable writes to another
> Kafka topic that is only used by the tumbling window aggregation. When I
> try to group the two INSERT INTO statements within "BEGIN STATEMENT SET;
> END;", the job fails, complaining that the topic does not exist. I either
> have to create this Kafka topic beforehand, or run a separate job to INSERT
> INTO IntermediaryTable.
>
> In the Java DataStream API, you can easily do this within the Flink topology
> without having to create a separate Kafka topic:
>
> final DataStream<xxx> joinedStream =
>                  StreamA.join(StreamB)
>                  .where(xxxx)
>                  .equalTo(xxxx)
>                  .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
>                  .apply(aggregation);
>
>
> *Question:*
> Does the Flink community have any suggestions on how to do this in
> FlinkSQL in a friendly way? Would it be a good idea for FlinkSQL to support
> defining event time and watermark on the fly without a table DDL? Would love
> to hear any suggestions. Thanks a lot in advance.
>
> --
> Best Wishes & Regards
> Shawn Xiangcao Liu
>
