I don't think it's a good idea to define a watermark on a view, because 
currently a view in Flink is only the text of a SQL query, and a query 
should not contain a watermark definition. You can see the discussion here: 
https://issues.apache.org/jira/browse/FLINK-22804
Maybe you can open a new Jira issue to discuss the behavior you expect.



At 2022-04-29 13:30:34, "liuxiangcao" <xiangcaohe...@gmail.com> wrote:

Hi Shengkai, 


Thank you for the reply. 


The UDF getEventTimeInNS uses timestamps of both streamA and streamB to 
calculate the true event time for streamB events. 


For illustration purposes, we can consider it to be like this: 
public Long eval(
        Long baseTimeStampFromA,
        Long timestampA,
        Long timestampB) {
    return baseTimeStampFromA + timestampB - timestampA;
}
Basically I need to redefine the event timestamp and watermark for the output 
stream of a join operator. 
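
To make the arithmetic concrete, here is a small plain-Java sketch of the same formula. It omits the Flink UDF wrapper, and the class name, method name, and sample values are hypothetical. It assumes one plausible reading of the inputs: timestampA and timestampB come from the same relative clock, and baseTimeStampFromA is the absolute time that corresponds to timestampA.

```java
// Hypothetical sketch of the formula above; not the actual UDF.
final class EventTimeSketch {

    // Shifts B's relative timestamp onto A's absolute time base.
    static long eventTimeInNs(long baseTimeStampFromA, long timestampA, long timestampB) {
        return baseTimeStampFromA + timestampB - timestampA;
    }

    public static void main(String[] args) {
        long baseTimeStampFromA = 1_650_000_000_000_000_000L; // made-up absolute time in ns
        long timestampA = 5_000_000L;                          // made-up relative reading for A
        long timestampB = 7_500_000L;                          // made-up relative reading for B

        // B happened 2,500,000 ns after A, so its event time is base + 2,500,000 ns.
        System.out.println(eventTimeInNs(baseTimeStampFromA, timestampA, timestampB));
    }
}
```

With these sample values the result is 1650000000002500000, that is, the base time plus the 2,500,000 ns gap between the two readings.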


You are right. Ideally, I hope FlinkSQL can support defining a watermark on a 
view. Do you know if this was discussed in the Flink community before? 
I am wondering whether this may be supported in the future. 



On Thu, Apr 21, 2022 at 2:44 AM Shengkai Fang <fskm...@gmail.com> wrote:

Hi, 


The watermark of the join operator is the minimum of the watermarks of its 
input streams. 


```
JoinOperator.watermark = min(left.watermark, right.watermark);
```
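
As a rough illustration of this rule, here is a minimal plain-Java sketch of how a two-input operator could track its combined watermark. It is not Flink's actual operator code, and the class and method names are hypothetical.

```java
// Hypothetical sketch of the min-of-inputs rule; not Flink's internal implementation.
final class JoinWatermarkSketch {
    private long leftWatermark = Long.MIN_VALUE;
    private long rightWatermark = Long.MIN_VALUE;

    // Records a watermark from the left input and returns the combined watermark.
    long onLeftWatermark(long watermark) {
        leftWatermark = Math.max(leftWatermark, watermark);
        return currentWatermark();
    }

    // Records a watermark from the right input and returns the combined watermark.
    long onRightWatermark(long watermark) {
        rightWatermark = Math.max(rightWatermark, watermark);
        return currentWatermark();
    }

    // The operator can only advance as far as its slowest input.
    long currentWatermark() {
        return Math.min(leftWatermark, rightWatermark);
    }

    public static void main(String[] args) {
        JoinWatermarkSketch op = new JoinWatermarkSketch();
        System.out.println(op.onLeftWatermark(1_000L));  // still Long.MIN_VALUE: right side has not reported
        System.out.println(op.onRightWatermark(400L));   // 400
        System.out.println(op.onLeftWatermark(1_500L));  // 400: held back by the right side
        System.out.println(op.onRightWatermark(2_000L)); // 1500
    }
}
```

The practical consequence is that a slow or idle input holds back the watermark of the whole join output.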


I think it's enough for most cases.  Could you share more details about the 
logic in the UDF getEventTimeInNS? 

I think a better solution than the intermediate table would be to define 
the watermark on the VIEW, but Flink doesn't support that now.


Best,
Shengkai








On Sat, Apr 16, 2022 at 3:07 AM liuxiangcao <xiangcaohe...@gmail.com> wrote:

Hi Flink community, 


Here is the context: 
Theoretically, I would like to write the following query, but it won't work since 
we can only define the WATERMARK in a table DDL:

INSERT into tableC
select tableA.field1,
         SUM(1) as `count`,
         time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
         WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
from tableA join tableB
on tableA.joinCol = tableB.joinCol
group by TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
(note: getEventTimeInNS is a UDF that calculates event time using 
tableA.timestamp and tableB.timestamp)


So I have to define an intermediary table to store the results of the join, 
define the event time and watermark in its table DDL, and then perform tumbling 
windowing on the intermediary table: 
CREATE TABLE IntermediaryTable (
   field1,
  `eventTimestampInNanoseconds`  BIGINT,
   time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/1000000, 3),
   WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
) WITH (
'connector' = 'kafka',
'topic' = 'IntermediaryTable',
'properties.bootstrap.servers' = 'xxxxxx',
'properties.group.id' = 'contextevent-streaming-sql',
'format' = 'avro'
);
INSERT INTO IntermediaryTable
select tableA.field1,
          tableB.field2,
          getEventTimeInNS(tableA.timestamp, tableB.timestamp)
from tableA join tableB
on tableA.joinCol = tableB.joinCol;
Then, I can perform tumbling window aggregation on the IntermediaryTable:
INSERT INTO countTable
(select event.field1,
  SUM(1) as `count`
from IntermediaryTable event
GROUP BY
  TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
  event.field1
);
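
For intuition about the 30-second TUMBLE grouping, here is a minimal plain-Java sketch of how an event timestamp maps to a tumbling window. It assumes windows aligned to the epoch with no offset and timestamps in milliseconds. The class and method names are hypothetical, and this is not Flink's own code.

```java
// Hypothetical sketch of tumbling-window assignment; assumes epoch-aligned windows.
final class TumbleWindowSketch {

    // Inclusive start of the window that contains the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Exclusive end of the window that contains the timestamp.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 30_000L; // 30 seconds
        System.out.println(windowStart(65_000L, size)); // 60000
        System.out.println(windowEnd(65_000L, size));   // 90000
        System.out.println(windowStart(90_000L, size)); // 90000: starts the next window
    }
}
```

A window's result can only be emitted once the watermark passes the window end, which is why the event time and watermark of the joined rows matter for this aggregation.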


This is inconvenient because the IntermediaryTable writes to another Kafka 
topic that is only used by the tumbling window aggregation. When I try to group 
the two INSERT INTO statements within "BEGIN STATEMENT SET; END;", the job fails, 
complaining that the topic does not exist. I have to either create this Kafka 
topic beforehand or run a separate job to INSERT INTO IntermediaryTable. 


In the Java DataStream API, you can easily do this within the Flink topology 
without having to create a separate Kafka topic: 
final DataStream<xxx> joinedStream =
                 StreamA.join(StreamB)
                 .where(xxxx)
                 .equalTo(xxxx)
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
                 .apply(aggregation);


Question:
Does the Flink community have any suggestions on how to do this in FlinkSQL in 
a friendly way? Would it be a good idea for FlinkSQL to support defining 
event time and watermark on the fly without a table DDL? Would love to hear any 
suggestions. Thanks a lot in advance. 


--

Best Wishes & Regards
Shawn Xiangcao Liu





