kevinshin created FLINK-30546:
---------------------------------
Summary: WindowDeduplicate does not write results to the sink in a timely manner
Key: FLINK-30546
URL: https://issues.apache.org/jira/browse/FLINK-30546
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.15.3
Environment: Flink 1.15.3
Reporter: kevinshin
The StreamExecutionEnvironment triggers a checkpoint every 15 seconds.
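For reference, a minimal sketch of the assumed checkpoint setup; only the 15-second interval comes from the report, the exactly-once mode and class/variable names are assumptions:
{code:java}
// Minimal sketch of the assumed checkpoint configuration (not the reporter's exact code);
// only the 15-second interval comes from the report, exactly-once mode is an assumption.
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Trigger a checkpoint every 15 seconds.
        env.enableCheckpointing(15_000L, CheckpointingMode.EXACTLY_ONCE);
    }
}
{code}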
The data source is Kafka, built by:
*KafkaSource<MyKafkaRecord> source = KafkaSource.<MyKafkaRecord>builder()*
*.setBootstrapServers(kafkaServers)*
*.setTopicPattern(topic)*
*.setGroupId(consumerGroupId)*
*.setStartingOffsets(OffsetsInitializer.latest())*
*.setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserialization(true, true)))*
*.build();*
MyKafkaDeserialization reads ConsumerRecord.timestamp and sets it on MyKafkaRecord's timestamp field.
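For context, a minimal sketch of what MyKafkaDeserialization presumably looks like; the constructor flags and the MyKafkaRecord setters are assumptions, and only the ConsumerRecord.timestamp copy is taken from the description above:
{code:java}
// Hypothetical sketch of MyKafkaDeserialization (not the reporter's code): it implements
// KafkaDeserializationSchema so it can be wrapped by KafkaRecordDeserializationSchema.of(...).
// The constructor flags and the MyKafkaRecord setters are assumptions; the only behavior taken
// from the report is copying ConsumerRecord.timestamp into MyKafkaRecord's timestamp field.
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MyKafkaDeserialization implements KafkaDeserializationSchema<MyKafkaRecord> {

    private final boolean includeTopic;     // assumed meaning of the first constructor flag
    private final boolean includeTimestamp; // assumed meaning of the second constructor flag

    public MyKafkaDeserialization(boolean includeTopic, boolean includeTimestamp) {
        this.includeTopic = includeTopic;
        this.includeTimestamp = includeTimestamp;
    }

    @Override
    public boolean isEndOfStream(MyKafkaRecord nextElement) {
        return false; // unbounded Kafka stream
    }

    @Override
    public MyKafkaRecord deserialize(ConsumerRecord<byte[], byte[]> record) {
        MyKafkaRecord result = new MyKafkaRecord();
        // The raw Kafka value is a JSON string.
        result.setValue(new String(record.value(), StandardCharsets.UTF_8));
        if (includeTopic) {
            result.setTopic(record.topic()); // assumed setter
        }
        if (includeTimestamp) {
            // ConsumerRecord.timestamp -> MyKafkaRecord.timestamp (used later as event time).
            result.setTimestamp(record.timestamp()); // assumed setter
        }
        return result;
    }

    @Override
    public TypeInformation<MyKafkaRecord> getProducedType() {
        return TypeInformation.of(MyKafkaRecord.class);
    }
}
{code}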
Then a DataStream is built by:
*DataStream<MyKafkaRecord> stream2 = env.fromSource(source,*
*WatermarkStrategy.<MyKafkaRecord>forBoundedOutOfOrderness(Duration.ofSeconds(3))*
*.withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp()),*
*"Kafka Source");*
MyKafkaRecord contains the raw value of the Kafka record, which is a JSON string.
Then I parse *MyKafkaRecord* into a POJO named *APlog*; APlog has a field named
eventTime that holds MyKafkaRecord's timestamp field:
*DataStream<APlog> aplogStream = stream2.map(record -> {.......});*
*aplogStream.assignTimestampsAndWatermarks(WatermarkStrategy.<APlog>forBoundedOutOfOrderness(Duration.ofSeconds(3))*
*.withTimestampAssigner((event, recordTimestamp) -> event.getEventTime()));*
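For context, a minimal sketch of the assumed APlog POJO, inferred from the fields referenced in fromDataStream() below; the field types and accessor names (other than getEventTime()) are assumptions:
{code:java}
// Hypothetical sketch of the APlog POJO (not the reporter's code). The field list is taken
// from the fromDataStream() call below; the types (String/long) are assumptions.
public class APlog {
    private String userName;
    private String userMAC;
    private String bssid;
    private String ssid;
    private String apName;
    private String radioID;
    private String vlanid;
    private String action;
    private String ddate;
    private String dtime;
    private String rawValue;
    private String region;
    private long eventTime; // holds MyKafkaRecord's timestamp (epoch millis), used as event time

    public long getEventTime() {
        return eventTime;
    }

    public void setEventTime(long eventTime) {
        this.eventTime = eventTime;
    }

    // Getters and setters for the remaining fields omitted for brevity; a Flink POJO
    // needs them (or public fields) plus a public no-argument constructor.
}
{code}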
Then convert aplogStream to a table named aplog:
*Table aplog = tEnv.fromDataStream(aplogStream,*
*$("userName"), $("userMAC"), $("bssid"), $("ssid"), $("apName"), $("radioID"),*
*$("vlanid"), $("action"), $("ddate"), $("dtime"), $("rawValue"), $("region"),*
*$("eventTime"), $("eventTime_2").rowtime());*
Then register it as a temporary view:
*tEnv.createTemporaryView("aplog", aplog);*
Then try a window deduplication on aplog:
*TableResult result = tEnv.executeSql("select * from " +*
*"(select *, ROW_NUMBER() OVER (" +*
*"PARTITION BY window_start, window_end, userName ORDER BY eventTime_2 DESC" +*
*") as row_num from " +*
*"TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime_2), INTERVAL '10' SECONDS))" +*
*") where row_num <= 1");*
*result.print();*
The result is not printed to the console each time a checkpoint completes, and I
do not know when the result will be printed.
But when I try:
*TableResult result = tEnv.executeSql("select * from TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime_2), INTERVAL '10' SECONDS))");*
*result.print();*
I can see the result printed to the console every time a checkpoint completes.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)