我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint ,源码CheckpointCoordinator#triggerCheckpoint也有说明
| | Sun.Zhu | | 邮箱:17626017...@163.com | Signature is customized by Netease Mail Master 在2020年04月20日 10:37,Benchao Li 写道: 应该是不会的。分配不到partition的source会标记为idle状态。 Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道: > Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > > > > | | > Sun.Zhu > | > | > 邮箱:17626017...@163.com > | > > Signature is customized by Netease Mail Master > > 在2020年04月19日 22:43,人生若只如初见 写道: > 嗯嗯,十分感谢 > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Benchao Li"<libenc...@gmail.com>; > 发送时间: 2020年4月19日(星期天) 晚上9:25 > 收件人: "user-zh"<user-zh@flink.apache.org>; > > 主题: Re: 问题请教-flinksql的kafkasource方面 > > > > 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > Jark Wu <imj...@gmail.com> 于2020年4月19日周日 下午8:22写道: > > > Hi, > > > > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > > 根据你的 Java 代码,数据的 event time > 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > > 能容忍 5s 乱序). > > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 > partition > > 进度快很多的现象, > > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > > > 完美的解决方案还需要等 FLIP-27 的完成。 > > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > > > Best, > > Jark > > > > > > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <cai...@qq.com> wrote: > > > > > 你好 > > > > > > > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > > > > > > > > > > > 附: > > > userbehavior建表语句 > > > CREATE TABLE user_behavior ( > > > &nbsp; &nbsp; user_id BIGINT, > > > &nbsp; &nbsp; item_id BIGINT, > > > &nbsp; &nbsp; category_id BIGINT, > > > &nbsp; &nbsp; behavior STRING, > > > &nbsp; &nbsp; ts TIMESTAMP(3), > > > &nbsp; &nbsp; proctime as PROCTIME(), &nbsp; -- > 通过计算列产生一个处理时间列 > > > &nbsp; &nbsp; WATERMARK FOR ts as ts - INTERVAL '5' > SECOND &nbsp;-- > > > 在ts上定义watermark,ts成为事件时间列 > > > ) WITH ( > > > &nbsp; &nbsp; 'connector.type' = 'kafka', &nbsp;-- > 使用 kafka connector > > > &nbsp; &nbsp; 'connector.version' = 'universal', > &nbsp;-- kafka > > > 版本,universal 支持 0.11 以上的版本 > > > &nbsp; &nbsp; 'connector.topic' = 'user_behavior', > &nbsp;-- kafka topic > > > &nbsp; &nbsp; 'connector.startup-mode' = > 'earliest-offset', &nbsp;-- 从起始 > > > offset 开始读取 > > > &nbsp; &nbsp; 'connector.properties.zookeeper.connect' = > ' > > > 192.168.0.150:2181', &nbsp;-- zookeeper 地址 > > > &nbsp; &nbsp; 'connector.properties.bootstrap.servers' = > ' > > > 192.168.0.150:9092', &nbsp;-- kafka broker 地址 > > > &nbsp; &nbsp; 'format.type' = 'json' &nbsp;-- 数据源格式为 > json > > > ) > > > > > > 每小时购买数建表语句 > > > CREATE TABLE buy_cnt_per_hour (&nbsp; > > > &nbsp; &nbsp; hour_of_day BIGINT, > > > &nbsp; &nbsp; buy_cnt BIGINT > > > ) WITH ( > > > &nbsp; &nbsp; 'connector.type' = 'elasticsearch', -- 使用 > elasticsearch > > > connector > > > &nbsp; &nbsp; 'connector.version' = '6', &nbsp;-- > elasticsearch 版本,6 能支持 > > > es 6+ 以及 7+ 的版本 > > > &nbsp; &nbsp; 'connector.hosts' = ' > http://192.168.0.150:9200', &nbsp;-- > > > elasticsearch 地址 > > > &nbsp; &nbsp; 'connector.index' = 'buy_cnt_per_hour', > &nbsp;-- > > > elasticsearch 索引名,相当于数据库的表名 > > > &nbsp; &nbsp; 'connector.document-type' = > 'user_behavior', -- > > > elasticsearch 的 type,相当于数据库的库名 > > > &nbsp; &nbsp; 'connector.bulk-flush.max-actions' = '1', > &nbsp;-- 每条数据都刷新 > > > &nbsp; &nbsp; 'format.type' = 'json', &nbsp;-- > 输出数据格式 json > > > &nbsp; &nbsp; 'update-mode' = 'append' > > > ) > > > > > > 插入语句 > > > INSERT INTO buy_cnt_per_hour&nbsp; > > > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' > HOUR)),COUNT(*)&nbsp; > > > FROM user_behavior > > > WHERE behavior = 'buy' > > > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > > > > > kafka数据发送代码 > > > > > > import com.alibaba.fastjson.JSONObject; > > > import org.apache.kafka.clients.producer.KafkaProducer; > > > import org.apache.kafka.clients.producer.ProducerRecord; > > > > > > import java.text.SimpleDateFormat; > > > import java.util.*; > > > > > > > > > public class UserBehaviorProducer { > > > public static final String brokerList = " > 192.168.0.150:9092"; > > > > > > // public static final > String topic="user_behavior"; > > > public static final String topic = > "user_behavior"; > > > > > > public static void main(String args[]) { > > > > > > //配置生产者客户端参数 > > > //将配置序列化 > > > Properties > properties = new Properties(); > > > > properties.put("key.serializer", > > > "org.apache.kafka.common.serialization.StringSerializer"); > > > > properties.put("value.serializer", > > > "org.apache.kafka.common.serialization.StringSerializer"); > > > > properties.put("bootstrap.servers", brokerList); > > > > //创建KafkaProducer 实例 > > > > KafkaProducer<String, String&gt; producer = new > > > KafkaProducer<&gt;(properties); > > > //构建待发送的消息 > > > //{"user_id": > "952483", "item_id":"310884", "category_id": > > > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > > //{"user_id": > "794777", "item_id":"5119439", "category_id": > > > "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > > String[] > behaviors = {"pv", "buy", "coll", > > "cart"};//浏览,购买,收藏,加入购物车 > > > JSONObject > jsonObject = new JSONObject(); > > > HashMap<String, > String&gt; info = new HashMap<&gt;(); > > > Random random = > new Random(); > > > SimpleDateFormat > format = new > > > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); > > > long > date_long=getDate(); > > > while (true) { > > > > > jsonObject.put("user_id", random.nextInt(900000) + 100000 + > > > ""); > > > > > jsonObject.put("item_id", random.nextInt(900000) + 100000 + > > > ""); > > > > > jsonObject.put("category_id", random.nextInt(1000) + ""); > > > > > jsonObject.put("behavior", behaviors[random.nextInt(4)]); > > > > > jsonObject.put("ts", format.format(new Date(date_long))); > > > > > String msg = jsonObject.toString(); > > > > > System.out.println(msg); > > > > > ProducerRecord<String, String&gt; record = new > > > ProducerRecord<&gt;(topic, msg); > > > > > producer.send(record); > > > > // > date_long +=500+random.nextGaussian()*1000; > > > > > date_long +=800+random.nextGaussian()*1500; > > > > > try { > > > > > Thread.sleep(60); > > > > > } catch (InterruptedException e) { > > > > > e.printStackTrace(); > > > > > } > > > } > > > > > > } > > > > > > private static long getDate() { > > > Date date = new > Date(); > > > Calendar c = > Calendar.getInstance(); > > > c.setTime(date); > > > > //设置为1号,当前日期既为本月第一天 > > > > c.set(Calendar.DAY_OF_MONTH, 1); > > > //将小时至0 > > > > c.set(Calendar.HOUR_OF_DAY, 0); > > > //将分钟至0 > > > > c.set(Calendar.MINUTE, 0); > > > //将秒至0 > > > > c.set(Calendar.SECOND,0); > > > //将毫秒至0 > > > > c.set(Calendar.MILLISECOND, 0); > > > // > 本月第一天的时间戳转换为字符串 > > > return > c.getTimeInMillis(); > > > } > > > } > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > > 发件人:&nbsp;"Jark Wu"<imj...@gmail.com&gt;; > > > 发送时间:&nbsp;2020年4月18日(星期六) 晚上10:08 > > > 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;; > > > > > > 主题:&nbsp;Re: 问题请教-flinksql的kafkasource方面 > > > > > > > > > > > > Hi, > > > > > > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, > TableEnvironment 上还未支持。 > > > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下? > > > > > > Best, > > > Jark > > > > > > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <cai...@qq.com&gt; > wrote: > > > > > > &gt; 大佬好: > > > &gt; &amp;nbsp; &amp;nbsp; > > > &gt; > > > &amp;nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > > > &gt; &amp;nbsp; &amp;nbsp; > > > &gt; > > > > > > &amp;nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > > > &gt; &amp;nbsp;&amp;nbsp; > > > &gt; &amp;nbsp; &amp;nbsp; > > > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > > > &gt; view as ...")却会报错。报错如下: > > > &gt; Exception in thread "main" > > org.apache.flink.table.api.TableException: > > > &gt; Unsupported query: CREATE VIEW rich_user_behavior AS > > > &gt; SELECT U.user_id, U.item_id, U.behavior,&amp;nbsp; > > > &gt; &amp;nbsp; CASE C.parent_category_id > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 1 THEN '服饰鞋包' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 2 THEN '家装家饰' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 3 THEN '家电' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 4 THEN '美妆' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 5 THEN '母婴' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 6 THEN '3C数码' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 7 THEN '运动户外' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 8 THEN '食品' > > > &gt; &amp;nbsp; &amp;nbsp; ELSE '其他' > > > &gt; &amp;nbsp; END AS category_name > > > &gt; FROM user_behavior AS U LEFT JOIN category_dim FOR > SYSTEM_TIME AS OF > > > &gt; U.proctime AS C > > > &gt; ON U.category_id = C.sub_category_id > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > > > &gt; Source) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > java.util.Optional.orElseThrow(Optional.java:290) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > > &gt; > > > &gt; > > > &gt; > > > &gt; > > > &gt; > > > &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; > &amp;nbsp; 望解答,十分感谢! > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: libenc...@gmail.com; libenc...@pku.edu.cn -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn