????????????????
    
????????????????????????kafka??????????????kafka????????????????????????????????????????????????????????????????????????????????????????????????????????kafka????????????????????????????????????????????????????????????????Debug????????????????????????????????????????
    ??????????????????????AbstractFetcher.java 
?? emitRecordsWithTimestamps 
??????????????????????????????????????????????for??????????????????????????????????????????????????????????????????????
    ?????? AbstractFetcher 
???? emitRecordsWithTimestamps ??????????
protected void emitRecordsWithTimestamps(
        Queue<T&gt; records,
        KafkaTopicPartitionState<T, KPH&gt; partitionState,
        long offset,
        long kafkaEventTimestamp) {
    // emit the records, using the checkpoint lock to guarantee
    // atomicity of record emission and offset state update
    synchronized (checkpointLock) {
        T record;
        while ((record = records.poll()) != null) {
            long timestamp = partitionState.extractTimestamp(record, 
kafkaEventTimestamp);
            sourceContext.collectWithTimestamp(record, timestamp);

            // this might emit a watermark, so do it after emitting the record
            partitionState.onEvent(record, timestamp);
        }
        partitionState.setOffset(offset);
    }
}




&nbsp; &nbsp;??????????????

????

kafka??????????0??????5s????????
CREATE TABLE kafka_watermark (
 &nbsp;`f1` INTEGER,
 &nbsp;`f2` TIMESTAMP(3),
  WATERMARK FOR f2 AS f2 - INTERVAL '0' SECOND
) WITH (
 &nbsp;'connector' = 'kafka',
 &nbsp;'topic' = 'test_watermark',
 &nbsp;'properties.group.id' = 'groupId',
 &nbsp;'properties.max.poll.records' = '10',
 &nbsp;'properties.max.poll.interval.ms' = '1000',
 &nbsp;'properties.fetch.max.bytes' = '52428800',
 &nbsp;'properties.max.partition.fetch.bytes' = '1048576',
 &nbsp;'scan.startup.mode' = 'earliest-offset',
 &nbsp;'properties.enable.auto.commit' = 'false',
 &nbsp;'format' = 'json',
 &nbsp;'json.ignore-parse-errors' = 'false',
 &nbsp;'json.timestamp-format.standard' = 'SQL',
 &nbsp;'json.map-null-key.mode' = 'FAIL'
);
?6?7
SELECT
  f1,
  f2
FROM kafka_watermark
GROUP BY
  TUMBLE(f2, INTERVAL '5' SECOND),
  f1,f2


????????

????????????????

{"f1":1,"f2":"2022-02-28 21:00:01.000"}

??????????1??????????????????0-5??????????1??

????????????????

{"f1":4,"f2":"2022-02-28 21:00:04.000"}

??????????4??????????????????0-5??????????1??4??

????????????????

{"f1":5,"f2":"2022-02-28 21:00:05.000"}

??????????5??&gt;=5????????0-5????????????????????????1??4??????5-10??????????5??

??????????????1??4

{"f1":2,"f2":"2022-02-28 21:00:02.000"}

????????????5??0????????0-5????????????????2??????????????????????5-10??????????5??

??????????????????

{"f1":3,"f2":"2022-02-28 21:00:03.000"}

????????????5??0????????0-5????????????????3??????????????????????5-10??????????5??

??????????????????

{"f1":7,"f2":"2022-02-28 21:00:07.000"}

??????????7??????????????????5-10??????????5??7??

??????????????????

{"f1":6,"f2":"2022-02-28 21:00:06.000"}

????????????7??????????????????5-10??????????5??7??6??

??????????????????

{"f1":10,"f2":"2022-02-28 21:00:10.000"}

??????????10??&gt;=10????????5-10????????????????????????5??7??6??????10-15??????????10??

????????????????5??7??6??????????????1??4??5??7??6








??????????

??????????????

??1

??????????????
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
{"f1":2,"f2":"2022-02-28 21:00:02.000"}
{"f1":23,"f2":"2022-02-28 21:00:02.000"}
{"f1":2,"f2":"2022-02-28 21:00:02.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
??????????????????3??????????????????0-5??????????1??4??2??23??3??????5-10??????????5??

????????????????

??????????
{"f1":6,"f2":"2022-02-28 21:00:06.000"}
??????6?????????? 
&gt;=5????????0-5????????????????????????1??4??2??23??3??????5-10??????????6??5??

??????????????1??4??2??23??3
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":2,"f2":"2022-02-28 21:00:02.000"}
{"f1":23,"f2":"2022-02-28 21:00:02.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
??????????
{"f1":12,"f2":"2022-02-28 21:00:12.000"}
??????12?????????? 
&gt;=10????????5-10????????????????????????6??5??????10-15??????????12??

????????????????6??5??????????????1??4??2??23??3??6??5
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":2,"f2":"2022-02-28 21:00:02.000"}
{"f1":23,"f2":"2022-02-28 21:00:02.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
{"f1":6,"f2":"2022-02-28 21:00:06.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
??????????



??2

??????????????
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
{"f1":10,"f2":"2022-02-28 21:00:10.000"}
{"f1":9,"f2":"2022-02-28 21:00:09.000"}
{"f1":6,"f2":"2022-02-28 21:00:06.000"}
??????????????????6??&gt;=5????????0-5????????????????????????1??4??3??????5-10??????????9??6??5??????10-15??????????10??

??????????????1??4??3
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}


??????????
{"f1":7,"f2":"2022-02-28 21:00:07.000"}
{"f1":8,"f2":"2022-02-28 21:00:08.000"}
??????????8????????????????????5-10??????????9??6??8??7??5??????10-15??????????10??

??????????????1??4??3



??????????
{"f1":11,"f2":"2022-02-28 21:00:11.000"}
??????11?????????? 
&gt;=10????????5-10????????????????????????9??6??8??7??5??????10-15??????????10??11??

????????????????9??6??8??7??5??????????????1??4??3??9??6??8??7??5
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":3,"f2":"2022-02-28 21:00:03.000"}
{"f1":9,"f2":"2022-02-28 21:00:09.000"}
{"f1":6,"f2":"2022-02-28 21:00:06.000"}
{"f1":8,"f2":"2022-02-28 21:00:08.000"}
{"f1":7,"f2":"2022-02-28 21:00:07.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}


??3

??????????????
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}
{"f1":7,"f2":"2022-02-28 21:00:07.000"}
{"f1":10,"f2":"2022-02-28 21:00:10.000"}
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
??????????????????1??????????????????0-5??????????1??4??????5-10??????????7??5??????10-15??????????10??

????????????????



??????????
{"f1":10,"f2":"2022-02-28 21:00:10.000"}
??????10?????????? &gt;=5 ?? &gt;= 
10????????0-5??5-10??????????????0-5????????1??4??5-10????7??5??????10-15??????????10??

??????????????1??4??7??5
{"f1":1,"f2":"2022-02-28 21:00:01.000"}
{"f1":4,"f2":"2022-02-28 21:00:04.000"}
{"f1":7,"f2":"2022-02-28 21:00:07.000"}
{"f1":5,"f2":"2022-02-28 21:00:05.000"}

回复