hi: thanks for your attention and your reply.
Below is my analysis process.
1. First run: the SQL was started in sql-client.sh first (and kept running while the data was produced):
select  
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)


At this point, because the records are written one at a time with the kafka producer tool (kafka-console-producer.sh) and the kafka connector keeps consuming as they arrive, the records are consumed in the same order they were written by hand.


2. Second run: quit sql-client.sh, then run the SQL again:
select  
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
By now all the data is already in kafka. When the kafka connector consumes it, the topic's 3 partitions mean that the order of the consumed messages no longer matches the order in which they were written by hand with the kafka producer tool (kafka-console-producer.sh). Records with a later rowtime can be consumed first, producing a large watermark, so some of the records consumed after them are dropped as late.
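
To see the order in which the connector actually hands records to the query, you can select the raw rows (the attachment below does this with select * from iservVisit); a minimal variant that also shows the derived event time:

select type, uuid, clientTime, rowtime from iservVisit;
-- rows appear in consumption order, so any rowtime that goes backwards here
-- is out-of-orderness that the watermark delay has to absorb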


3. Enlarging the watermark delay in the table definition restores the first run's result. I am still weighing this approach (whether it is always effective):
create table iservVisit (
    type string comment 'event type',
    uuid string comment 'user id',
    clientTime string comment '10-digit unix timestamp (seconds)',
    rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: 10-digit unix timestamp to timestamp
    WATERMARK for rowtime as rowtime - INTERVAL '5' MINUTE -- watermark on rowtime; delay raised from 1 minute to 5 minutes
) with (
    'connector' = 'kafka-0.10',
    'topic' = 'message-json',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'consumer-rt',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)
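
As a quick sanity check of the computed rowtime column (a sketch; the expected value assumes the UTC+8 session time zone used throughout this thread):

-- parse the first sample record's clientTime exactly as the DDL does
select to_timestamp(from_unixtime(cast(substring(coalesce('1600391684', '0'), 1, 10) as bigint))) as rowtime;
-- expected: 2020-09-18 09:14:44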
4. Tentative conclusion: the open question is how to guarantee, by some means, that every partition is consumed at the same pace (one possible workaround is sketched after this list).
5. The attachment is easier to read with the Sublime sql/hql plugin.
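
On point 4: one workaround I have not ruled out (a sketch, untested here) is to write the test data to a single-partition topic. Kafka only guarantees ordering within a partition, so with one partition the consumption order is deterministic and the watermark stays monotone no matter when the job starts. Only the topic name changes (message-json-1p is a hypothetical topic created with a single partition):

create table iservVisitOrdered (
    type string comment 'event type',
    uuid string comment 'user id',
    clientTime string comment '10-digit unix timestamp (seconds)',
    rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))),
    WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE
) with (
    'connector' = 'kafka-0.10',
    'topic' = 'message-json-1p', -- hypothetical single-partition topic
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'consumer-rt',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)

The obvious trade-off is giving up kafka's parallelism, so this only fits small tests like this one.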


On 2020-09-18 14:42:42, "chengyanan1...@foxmail.com" <chengyanan1...@foxmail.com> wrote:
>Grabbing the first reply.
>Following the OP's write-up, I sent the data while running the SQL below to watch the results in real time:
>select  
>    tumble_start(rowtime, interval '2' MINUTE) as wStart,
>    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
>    count(1) as pv,
>    count(distinct uuid) as uv 
>from iservVisit
>group by tumble(rowtime, interval '2' MINUTE)
>The final result I got is this (different from the OP's):
>
>    wStart              wEnd                pv    uv
>    2020-09-18T09:14    2020-09-18T09:16     2     2
>    2020-09-18T09:16    2020-09-18T09:18     8     3
>    2020-09-18T09:18    2020-09-18T09:20     8     3
>    2020-09-18T09:20    2020-09-18T09:22     2     2
>
>After all the data had been sent, I quit sql-client and ran the query above again; the final result (same as the OP's):
>    wStart              wEnd                pv    uv
>    2020-09-18T09:14    2020-09-18T09:16     2     2
>    2020-09-18T09:16    2020-09-18T09:18     2     2
>    2020-09-18T09:18    2020-09-18T09:20     8     3
>    2020-09-18T09:20    2020-09-18T09:22     2     2
>
>
>
> 
>From: anonnius
>Sent: 2020-09-18 11:24
>To: user-zh
>Subject: In what order does Flink SQL consume from kafka? The second run of the SQL gives a different result from the first
>hi: [asking for help] I am consuming kafka data with flink-sql and computing pv/uv over windows. The first and second runs give different results, and I don't quite understand why.
>0> local environment on a mac
>1> flink 1.11.1
>2> kafka 0.10.2.2, topic message-json, 3 partitions, replication factor 1
>3> using the sql-client.sh environment
>4> first created the iservVisit table in sql-cli:
>create table iservVisit (
>    type string comment 'event type',
>    uuid string comment 'user id',
>    clientTime string comment '10-digit unix timestamp (seconds)',
>    rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: 10-digit unix timestamp to timestamp
>    WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- watermark on the computed rowtime column
>) with (
>    'connector' = 'kafka-0.10',
>    'topic' = 'message-json',
>    'properties.bootstrap.servers' = 'localhost:9092',
>    'properties.group.id' = 'consumer-rt',
>    'format' = 'json',
>    'json.ignore-parse-errors' = 'true',
>    'scan.startup.mode' = 'earliest-offset'
>)
>then ran this SQL in sql-cli:
>select  
>    tumble_start(rowtime, interval '2' MINUTE) as wStart,
>    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
>    count(1) as pv,
>    count(distinct uuid) as uv 
>from iservVisit
>group by tumble(rowtime, interval '2' MINUTE)
>5> sent the following json messages through the kafka producer, one by one:
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"} 
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"} 
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"} 
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"} 
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"} 
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"} 
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"} 
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"} 
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"} 
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"} 
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"} 
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"} 
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"} 
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"} 
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"} 
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"} 
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"} 
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"} 
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"} 
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"} 
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"} 
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"} 
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"} 
>{"type": "iservVisit", "uuid": "a", "clientTime": "1800392057"} 
>6> first result (the SQL in sql-cli kept running the whole time):
>    wStart              wEnd                pv    uv
>    2020-09-18T09:14    2020-09-18T09:16     5     3
>    2020-09-18T09:16    2020-09-18T09:18     8     3
>    2020-09-18T09:18    2020-09-18T09:20     8     3
>    2020-09-18T09:20    2020-09-18T09:22     2     2
>7> second result (quit the running SQL in sql-cli, then ran it again):
>    wStart              wEnd                pv    uv
>    2020-09-18T09:14    2020-09-18T09:16     2     2
>    2020-09-18T09:16    2020-09-18T09:18     2     2
>    2020-09-18T09:18    2020-09-18T09:20     8     3
>    2020-09-18T09:20    2020-09-18T09:22     2     2
>8> the detailed walkthrough has been put into the attached file
>
>
>
>
>
> 

--- attachment: detailed walkthrough ---

1. First run: the SQL was started in sql-client.sh first (and kept running while the data was produced):
select  
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)

At this point, because the records are written one at a time with the kafka producer tool (kafka-console-producer.sh) and the kafka connector keeps consuming as they arrive, the records are consumed in the same order they were written by hand.

kafka record                                                        rowtime               watermark (running)   window (all on 2020-09-18)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}     2020-09-18 09:14:44   2020-09-18 09:13:44   [09:14:00, 09:16:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"}     2020-09-18 09:14:23   2020-09-18 09:13:44   [09:14:00, 09:16:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"}     2020-09-18 09:14:50   2020-09-18 09:13:50   [09:14:00, 09:16:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"}     2020-09-18 09:15:09   2020-09-18 09:14:09   [09:14:00, 09:16:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"}     2020-09-18 09:15:48   2020-09-18 09:14:48   [09:14:00, 09:16:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"}     2020-09-18 09:16:22   2020-09-18 09:15:22   [09:16:00, 09:18:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"}     2020-09-18 09:16:21   2020-09-18 09:15:22   [09:16:00, 09:18:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"}     2020-09-18 09:17:03   2020-09-18 09:16:03   [09:16:00, 09:18:00)  -- watermark passes 09:16:00, firing every window that ends at or before it

-- fired:
                 wStart                wEnd    pv    uv
       2020-09-18T09:14    2020-09-18T09:16     5     3

{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"}     2020-09-18 09:17:02   2020-09-18 09:16:03   [09:16:00, 09:18:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"}     2020-09-18 09:16:55   2020-09-18 09:16:03   [09:16:00, 09:18:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"}     2020-09-18 09:17:37   2020-09-18 09:16:37   [09:16:00, 09:18:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"}     2020-09-18 09:17:50   2020-09-18 09:16:50   [09:16:00, 09:18:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"}     2020-09-18 09:17:31   2020-09-18 09:16:50   [09:16:00, 09:18:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"}     2020-09-18 09:18:23   2020-09-18 09:17:23   [09:18:00, 09:20:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"}     2020-09-18 09:18:09   2020-09-18 09:17:23   [09:18:00, 09:20:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"}     2020-09-18 09:19:05   2020-09-18 09:18:05   [09:18:00, 09:20:00)  -- watermark passes 09:18:00, firing every window that ends at or before it

-- fired:
                 wStart                wEnd    pv    uv
       2020-09-18T09:16    2020-09-18T09:18     8     3

{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"}     2020-09-18 09:18:58   2020-09-18 09:18:05   [09:18:00, 09:20:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"}     2020-09-18 09:19:11   2020-09-18 09:18:11   [09:18:00, 09:20:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"}     2020-09-18 09:18:56   2020-09-18 09:18:11   [09:18:00, 09:20:00)
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"}     2020-09-18 09:19:30   2020-09-18 09:18:30   [09:18:00, 09:20:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"}     2020-09-18 09:20:16   2020-09-18 09:19:16   [09:20:00, 09:22:00)
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"}     2020-09-18 09:19:53   2020-09-18 09:19:16   [09:18:00, 09:20:00)
{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"}     2020-09-18 09:20:57   2020-09-18 09:19:57   [09:20:00, 09:22:00)



2. Second run: quit sql-client.sh, then run the SQL again:
select  
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)

By now all the data is already in kafka. When the kafka connector consumes it, the topic's 3 partitions mean that the order of the consumed messages no longer matches the order in which they were written by hand with the kafka producer tool (kafka-console-producer.sh). Records with a later rowtime can be consumed first, producing a large watermark, so some of the records consumed after them are dropped as late.

Detail rows at this point (after all the data had been written to kafka, run select * from iservVisit):
type         uuid   clientTime    rowtime               watermark (hand-computed)   window (hand-computed, all on 2020-09-18; '-' = watermark unchanged)
iservVisit   a      1600391663    2020-09-18 09:14:23   2020-09-18 09:13:23         [09:14:00, 09:16:00) W1
iservVisit   b      1600391748    2020-09-18 09:15:48   2020-09-18 09:14:48         [09:14:00, 09:16:00) W1
iservVisit   b      1600391823    2020-09-18 09:17:03   2020-09-18 09:16:03         [09:16:00, 09:18:00) W2  -- fires W1 [09:14:00, 09:16:00)
iservVisit   a      1600391857    2020-09-18 09:17:37   2020-09-18 09:16:37         [09:16:00, 09:18:00) W2
iservVisit   c      1600391903    2020-09-18 09:18:23   2020-09-18 09:17:23         [09:18:00, 09:20:00) W3
iservVisit   b      1600391938    2020-09-18 09:18:58   2020-09-18 09:17:58         [09:18:00, 09:20:00) W3
iservVisit   b      1600391970    2020-09-18 09:19:30   2020-09-18 09:18:30         [09:18:00, 09:20:00) W3  -- fires W2 [09:16:00, 09:18:00)
iservVisit   a      1600392057    2020-09-18 09:20:57   2020-09-18 09:19:57         [09:20:00, 09:22:00) W4
iservVisit   c      1600391684    2020-09-18 09:14:44   -                           [09:14:00, 09:16:00) dropped (window already fired)
iservVisit   c      1600391709    2020-09-18 09:15:09   -                           [09:14:00, 09:16:00) dropped (window already fired)
iservVisit   b      1600391781    2020-09-18 09:16:21   -                           [09:16:00, 09:18:00) dropped (window already fired)
iservVisit   a      1600391815    2020-09-18 09:16:55   -                           [09:16:00, 09:18:00) dropped (window already fired)
iservVisit   b      1600391851    2020-09-18 09:17:31   -                           [09:16:00, 09:18:00) dropped (window already fired)
iservVisit   a      1600391945    2020-09-18 09:19:05   -                           [09:18:00, 09:20:00) W3
iservVisit   c      1600391936    2020-09-18 09:18:56   -                           [09:18:00, 09:20:00) W3
iservVisit   c      1600391993    2020-09-18 09:19:53   -                           [09:18:00, 09:20:00) W3
iservVisit   a      1600391690    2020-09-18 09:14:50   -                           [09:14:00, 09:16:00) dropped (window already fired)
iservVisit   c      1600391782    2020-09-18 09:16:22   -                           [09:16:00, 09:18:00) dropped (window already fired)
iservVisit   b      1600391822    2020-09-18 09:17:02   -                           [09:16:00, 09:18:00) dropped (window already fired)
iservVisit   a      1600391870    2020-09-18 09:17:50   -                           [09:16:00, 09:18:00) dropped (window already fired)
iservVisit   a      1600391889    2020-09-18 09:18:09   -                           [09:18:00, 09:20:00) W3
iservVisit   b      1600391951    2020-09-18 09:19:11   -                           [09:18:00, 09:20:00) W3
iservVisit   c      1600392016    2020-09-18 09:20:16   -                           [09:20:00, 09:22:00) W4
iservVisit   a      1800392057    2027-01-20 04:54:17   2027-01-20 04:53:17         -- watermark jumps to 2027, firing every window still pending (W3 and W4)
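
The last row deserves a second look: one malformed clientTime is enough to poison the watermark. Checking the parse directly (a sketch; the output assumes the UTC+8 session time zone):

select to_timestamp(from_unixtime(cast(substring(coalesce('1800392057', '0'), 1, 10) as bigint))) as rowtime;
-- expected: 2027-01-20 04:54:17, seven years in the future, so the watermark
-- jumps ahead and every window still pending (W3 and W4) fires at once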


3. Enlarging the watermark delay in the table definition restores the first run's result. I am still weighing this approach (whether it is always effective):
create table iservVisit (
    type string comment 'event type',
    uuid string comment 'user id',
    clientTime string comment '10-digit unix timestamp (seconds)',
    rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: 10-digit unix timestamp to timestamp
    WATERMARK for rowtime as rowtime - INTERVAL '5' MINUTE -- watermark on rowtime; delay raised from 1 minute to 5 minutes
) with (
    'connector' = 'kafka-0.10',
    'topic' = 'message-json',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'consumer-rt',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)

4. The question now is how to guarantee, by some means, that every partition consumes data at the same pace.


