Hello,
We are creating two data streams in our Flink application, and each of them is
then converted into a Table. The first data stream has a watermark delay of
24 hours, while the second stream has a watermark delay of 60 minutes. The
watermark strategy used is BoundedOutOfOrderness, and it assigns watermarks
from an event_time field present within the records themselves.
For example,
DataStream<Row> fileStream = env.fromSource(
        fileSource,
        getWatermarkStrategy(86400000), // custom function; watermark delay of 24 hours in ms
        "fileSource");
Table firstTable = tableEnv.fromDataStream(fileStream, apiExpressions);
tableEnv.createTemporaryView("fileTable", firstTable);
DataStream<Row> kafkaStream = env.fromSource(
        kafkaSource,
        getWatermarkStrategy(3600000), // custom function; watermark delay of 60 minutes in ms
        "kafkaSource");
Table secondTable = tableEnv.fromDataStream(kafkaStream, apiExpressions);
tableEnv.createTemporaryView("kafkaTable", secondTable);
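For context, getWatermarkStrategy is essentially a thin wrapper over the built-in bounded-out-of-orderness strategy. A minimal sketch of it (assuming event_time is an epoch-millisecond long field on the Row; the field access and class name here are only illustrative):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.types.Row;

public class Watermarks {

    // Sketch: bounded-out-of-orderness watermarks with the given delay (ms),
    // using the record's own "event_time" field (epoch millis) as the
    // event-time timestamp. Field name and type are assumptions.
    public static WatermarkStrategy<Row> getWatermarkStrategy(long delayMs) {
        return WatermarkStrategy
                .<Row>forBoundedOutOfOrderness(Duration.ofMillis(delayMs))
                .withTimestampAssigner(
                        (row, previousTimestamp) -> (long) row.getField("event_time"));
    }
}
```

We call it as getWatermarkStrategy(86400000) for the 24-hour delay and getWatermarkStrategy(3600000) for the 60-minute one.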
Now, we want to write a continuous SQL query
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/dynamic_tables/#continuous-queries>
to join fileTable and kafkaTable with a tumbling window of 60 minutes:
"SELECT TUMBLE_START(fileTable.rowtime, INTERVAL '60' MINUTE) AS event_time, " +
"MAX(TIMESTAMPDIFF(MINUTE, fileTable.event_time, kafkaTable.event_time)) " +
"FROM fileTable, kafkaTable " +
"WHERE fileTable.id = kafkaTable.id " +
"GROUP BY TUMBLE(fileTable.rowtime, INTERVAL '60' MINUTE)"
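In case the mail client mangles the line wrapping, here is the same query assembled as a single Java string, with the execution call sketched in comments (the class name and job wiring are only illustrative):

```java
public class QuerySketch {

    // The continuous query as one string, exactly as we pass it to
    // tableEnv.sqlQuery(...).
    static final String QUERY =
            "SELECT TUMBLE_START(fileTable.rowtime, INTERVAL '60' MINUTE) AS event_time, "
          + "MAX(TIMESTAMPDIFF(MINUTE, fileTable.event_time, kafkaTable.event_time)) "
          + "FROM fileTable, kafkaTable "
          + "WHERE fileTable.id = kafkaTable.id "
          + "GROUP BY TUMBLE(fileTable.rowtime, INTERVAL '60' MINUTE)";

    public static void main(String[] args) {
        System.out.println(QUERY);
        // In the job itself (tableEnv/env from the snippets above):
        //   Table joined = tableEnv.sqlQuery(QUERY);
        //   tableEnv.toDataStream(joined).print(); // windowed aggregate is append-only
        //   env.execute("file-kafka-join");
    }
}
```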
What we want to know is: will join or aggregation queries work correctly
between the two tables? Is it the case that the contents of kafkaTable
will be purged immediately after 60 minutes, and hence a join/aggregation
might not give correct results?
Will there be data loss if tables with different watermark delays are
joined?
--
*Regards,*
*Meghajit*