我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。

祝尚 <17626017...@163.com> 于2020年4月20日周一 下午10:29写道:

> 我们是1.8版本,但是这段源码应该是没变把
> // check if all tasks that we need to trigger are running.
> // if not, abort the checkpoint
> Execution[] executions = new Execution[tasksToTrigger.length];
> for (int i = 0; i < tasksToTrigger.length; i++) {
>    Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
>    if (ee == null) {
>       LOG.info("Checkpoint triggering task {} of job {} is not being
> executed at the moment. Aborting checkpoint.",
>             tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
>             job);
>       throw new
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
>    } else if (ee.getState() == ExecutionState.RUNNING) {
>       executions[i] = ee;
>    } else {
>       LOG.info("Checkpoint triggering task {} of job {} is not in state {}
> but {} instead. Aborting checkpoint.",
>             tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
>             job,
>             ExecutionState.RUNNING,
>             ee.getState());
>       throw new
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
>    }
> }
> 还是我理解的不对
>
> > 2020年4月20日 下午6:21,Benchao Li <libenc...@gmail.com> 写道:
> >
> > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。
> >
> > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道:
> >
> >>
> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
> >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明
> >>
> >>
> >>
> >>
> >>
> >> | |
> >> Sun.Zhu
> >> |
> >> |
> >> 邮箱:17626017...@163.com
> >> |
> >>
> >> Signature is customized by Netease Mail Master
> >>
> >> 在2020年04月20日 10:37,Benchao Li 写道:
> >> 应该是不会的。分配不到partition的source会标记为idle状态。
> >>
> >> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:
> >>
> >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
> >>>
> >>>
> >>>
> >>>
> >>> | |
> >>> Sun.Zhu
> >>> |
> >>> |
> >>> 邮箱:17626017...@163.com
> >>> |
> >>>
> >>> Signature is customized by Netease Mail Master
> >>>
> >>> 在2020年04月19日 22:43,人生若只如初见 写道:
> >>> 嗯嗯,十分感谢
> >>>
> >>>
> >>>
> >>>
> >>> ------------------&nbsp;原始邮件&nbsp;------------------
> >>> 发件人:&nbsp;"Benchao Li"<libenc...@gmail.com&gt;;
> >>> 发送时间:&nbsp;2020年4月19日(星期天) 晚上9:25
> >>> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
> >>>
> >>> 主题:&nbsp;Re: 问题请教-flinksql的kafkasource方面
> >>>
> >>>
> >>>
> >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
> >>>
> >>> Jark Wu <imj...@gmail.com&gt; 于2020年4月19日周日 下午8:22写道:
> >>>
> >>> &gt; Hi,
> >>> &gt;
> >>> &gt; 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
> >>> &gt; 根据你的 Java 代码,数据的 event time
> >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
> >>> &gt; 能容忍 5s 乱序).
> >>> &gt; 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些
> >>> partition
> >>> &gt; 进度快很多的现象,
> >>> &gt; 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> >>> &gt;
> >>> &gt; 完美的解决方案还需要等 FLIP-27 的完成。
> >>> &gt; 当前可以通过增加 watermark delay来增大迟到数据的容忍。
> >>> &gt;
> >>> &gt; Best,
> >>> &gt; Jark
> >>> &gt;
> >>> &gt;
> >>> &gt; On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <cai...@qq.com&gt; wrote:
> >>> &gt;
> >>> &gt; &gt; 你好
> >>> &gt; &gt;
> >>> &gt; &gt;
> >>> &gt;
> >>>
> >>
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
> >>> &gt; &gt;
> >>> &gt; &gt;
> >>> &gt; &gt;
> >>> &gt; &gt; 附:
> >>> &gt; &gt; userbehavior建表语句
> >>> &gt; &gt; CREATE TABLE user_behavior (
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; user_id BIGINT,
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; item_id BIGINT,
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; category_id BIGINT,
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; behavior STRING,
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; ts TIMESTAMP(3),
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; proctime as PROCTIME(), &amp;nbsp; --
> >>> 通过计算列产生一个处理时间列
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
> >>> SECOND &amp;nbsp;--
> >>> &gt; &gt; 在ts上定义watermark,ts成为事件时间列
> >>> &gt; &gt; ) WITH (
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.type' = 'kafka',
> &amp;nbsp;--
> >>> 使用 kafka connector
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.version' = 'universal',
> >>> &amp;nbsp;-- kafka
> >>> &gt; &gt; 版本,universal 支持 0.11 以上的版本
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.topic' = 'user_behavior',
> >>> &amp;nbsp;-- kafka topic
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.startup-mode' =
> >>> 'earliest-offset', &amp;nbsp;-- 从起始
> >>> &gt; &gt; offset 开始读取
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp;
> 'connector.properties.zookeeper.connect'
> >> =
> >>> '
> >>> &gt; &gt; 192.168.0.150:2181', &amp;nbsp;-- zookeeper 地址
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp;
> 'connector.properties.bootstrap.servers'
> >> =
> >>> '
> >>> &gt; &gt; 192.168.0.150:9092', &amp;nbsp;-- kafka broker 地址
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'format.type' = 'json' &amp;nbsp;--
> >> 数据源格式为
> >>> json
> >>> &gt; &gt; )
> >>> &gt; &gt;
> >>> &gt; &gt; 每小时购买数建表语句
> >>> &gt; &gt; CREATE TABLE buy_cnt_per_hour (&amp;nbsp;
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; hour_of_day BIGINT,
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; buy_cnt BIGINT
> >>> &gt; &gt; ) WITH (
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.type' = 'elasticsearch', --
> 使用
> >>> elasticsearch
> >>> &gt; &gt; connector
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.version' = '6', &amp;nbsp;--
> >>> elasticsearch 版本,6 能支持
> >>> &gt; &gt; es 6+ 以及 7+ 的版本
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.hosts' = '
> >>> http://192.168.0.150:9200', &amp;nbsp;--
> >>> &gt; &gt; elasticsearch 地址
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.index' = 'buy_cnt_per_hour',
> >>> &amp;nbsp;--
> >>> &gt; &gt; elasticsearch 索引名,相当于数据库的表名
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.document-type' =
> >>> 'user_behavior', --
> >>> &gt; &gt; elasticsearch 的 type,相当于数据库的库名
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.bulk-flush.max-actions' =
> '1',
> >>> &amp;nbsp;-- 每条数据都刷新
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'format.type' = 'json', &amp;nbsp;--
> >>> 输出数据格式 json
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'update-mode' = 'append'
> >>> &gt; &gt; )
> >>> &gt; &gt;
> >>> &gt; &gt; 插入语句
> >>> &gt; &gt; INSERT INTO buy_cnt_per_hour&amp;nbsp;
> >>> &gt; &gt; SELECT HOUR(TUMBLE_START(ts,INTERVAL '1'
> >>> HOUR)),COUNT(*)&amp;nbsp;
> >>> &gt; &gt; FROM user_behavior
> >>> &gt; &gt; WHERE behavior = 'buy'
> >>> &gt; &gt; GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
> >>> &gt; &gt;
> >>> &gt; &gt; kafka数据发送代码
> >>> &gt; &gt;
> >>> &gt; &gt; import com.alibaba.fastjson.JSONObject;
> >>> &gt; &gt; import org.apache.kafka.clients.producer.KafkaProducer;
> >>> &gt; &gt; import org.apache.kafka.clients.producer.ProducerRecord;
> >>> &gt; &gt;
> >>> &gt; &gt; import java.text.SimpleDateFormat;
> >>> &gt; &gt; import java.util.*;
> >>> &gt; &gt;
> >>> &gt; &gt;
> >>> &gt; &gt; public class UserBehaviorProducer {
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; public static final String brokerList
> >> = "
> >>> 192.168.0.150:9092";
> >>> &gt; &gt;
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp;&nbsp;&nbsp; public static
> >> final
> >>> String topic="user_behavior";
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; public static final String topic =
> >>> "user_behavior";
> >>> &gt; &gt;
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; public static void main(String
> args[])
> >> {
> >>> &gt; &gt;
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //配置生产者客户端参数
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //将配置序列化
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Properties
> >>> properties = new Properties();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> properties.put("key.serializer",
> >>> &gt; &gt; "org.apache.kafka.common.serialization.StringSerializer");
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> properties.put("value.serializer",
> >>> &gt; &gt; "org.apache.kafka.common.serialization.StringSerializer");
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> properties.put("bootstrap.servers", brokerList);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> //创建KafkaProducer 实例
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> KafkaProducer<String, String&amp;gt; producer = new
> >>> &gt; &gt; KafkaProducer<&amp;gt;(properties);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //构建待发送的消息
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //{"user_id":
> >>> "952483", "item_id":"310884", "category_id":
> >>> &gt; &gt; "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //{"user_id":
> >>> "794777", "item_id":"5119439", "category_id":
> >>> &gt; &gt; "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String[]
> >>> behaviors = {"pv", "buy", "coll",
> >>> &gt; "cart"};//浏览,购买,收藏,加入购物车
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; JSONObject
> >>> jsonObject = new JSONObject();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> HashMap<String,
> >>> String&amp;gt; info = new HashMap<&amp;gt;();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Random
> random =
> >>> new Random();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >> SimpleDateFormat
> >>> format = new
> >>> &gt; &gt; SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; long
> >>> date_long=getDate();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; while (true)
> {
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> jsonObject.put("user_id", random.nextInt(900000) + 100000 +
> >>> &gt; &gt; "");
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> jsonObject.put("item_id", random.nextInt(900000) + 100000 +
> >>> &gt; &gt; "");
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> jsonObject.put("category_id", random.nextInt(1000) + "");
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> jsonObject.put("behavior", behaviors[random.nextInt(4)]);
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> jsonObject.put("ts", format.format(new Date(date_long)));
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> String msg = jsonObject.toString();
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> System.out.println(msg);
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> ProducerRecord<String, String&amp;gt; record = new
> >>> &gt; &gt; ProducerRecord<&amp;gt;(topic, msg);
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> producer.send(record);
> >>> &gt; &gt;
> >>> //&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> date_long +=500+random.nextGaussian()*1000;
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> date_long +=800+random.nextGaussian()*1500;
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> try {
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> Thread.sleep(60);
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> } catch (InterruptedException e) {
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> e.printStackTrace();
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> }
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
> >>> &gt; &gt;
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; }
> >>> &gt; &gt;
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; private static long getDate() {
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Date date =
> new
> >>> Date();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Calendar c =
> >>> Calendar.getInstance();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >> c.setTime(date);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> //设置为1号,当前日期既为本月第一天
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> c.set(Calendar.DAY_OF_MONTH, 1);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //将小时至0
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> c.set(Calendar.HOUR_OF_DAY, 0);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //将分钟至0
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> c.set(Calendar.MINUTE, 0);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //将秒至0
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> c.set(Calendar.SECOND,0);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //将毫秒至0
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> c.set(Calendar.MILLISECOND, 0);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //
> >>> 本月第一天的时间戳转换为字符串
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return
> >>> c.getTimeInMillis();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; }
> >>> &gt; &gt; }
> >>> &gt; &gt;
> >>> &gt; &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> >>> &gt; &gt; 发件人:&amp;nbsp;"Jark Wu"<imj...@gmail.com&amp;gt;;
> >>> &gt; &gt; 发送时间:&amp;nbsp;2020年4月18日(星期六) 晚上10:08
> >>> &gt; &gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
> >>> &gt; &gt;
> >>> &gt; &gt; 主题:&amp;nbsp;Re: 问题请教-flinksql的kafkasource方面
> >>> &gt; &gt;
> >>> &gt; &gt;
> >>> &gt; &gt;
> >>> &gt; &gt; Hi,
> >>> &gt; &gt;
> >>> &gt; &gt; 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法,
> >>> TableEnvironment 上还未支持。
> >>> &gt; &gt; 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了"
> >> 是什么意思呢?要不举个例子看下?
> >>> &gt; &gt;
> >>> &gt; &gt; Best,
> >>> &gt; &gt; Jark
> >>> &gt; &gt;
> >>> &gt; &gt; On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <cai...@qq.com&amp;gt;
> >>> wrote:
> >>> &gt; &gt;
> >>> &gt; &gt; &amp;gt; 大佬好:
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;
> >>> &gt; &gt; &amp;gt;
> >>> &gt;
> >>>
> >>
> &amp;amp;nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt;
> >>> &gt;
> >>>
> >>
> &amp;amp;nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;
> >>> &gt; &gt; 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create
> >>> &gt; &gt; &amp;gt; view as ...")却会报错。报错如下:
> >>> &gt; &gt; &amp;gt; Exception in thread "main"
> >>> &gt; org.apache.flink.table.api.TableException:
> >>> &gt; &gt; &amp;gt; Unsupported query: CREATE VIEW rich_user_behavior AS
> >>> &gt; &gt; &amp;gt; SELECT U.user_id, U.item_id,
> U.behavior,&amp;amp;nbsp;
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; CASE C.parent_category_id
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 1 THEN '服饰鞋包'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 2 THEN '家装家饰'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 3 THEN '家电'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 4 THEN '美妆'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 5 THEN '母婴'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 6 THEN '3C数码'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 7 THEN '运动户外'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 8 THEN '食品'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; ELSE '其他'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; END AS category_name
> >>> &gt; &gt; &amp;gt; FROM user_behavior AS U LEFT JOIN category_dim FOR
> >>> SYSTEM_TIME AS OF
> >>> &gt; &gt; &amp;gt; U.proctime AS C
> >>> &gt; &gt; &amp;gt; ON U.category_id = C.sub_category_id
> >>> &gt; &gt;
> >>>
> >>
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> >>> at
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt;
> >>> &gt;
> >>>
> >>
> org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67)
> >>> &gt; &gt;
> >>>
> >>
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> >>> at
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt;
> >>> &gt;
> >>>
> >>
> org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown
> >>> &gt; &gt; &amp;gt; Source)
> >>> &gt; &gt;
> >>>
> >>
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> >>> at
> >>> &gt; &gt; java.util.Optional.orElseThrow(Optional.java:290)
> >>> &gt; &gt;
> >>>
> >>
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> >>> at
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt;
> >>> &gt;
> >>>
> >>
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67)
> >>> &gt; &gt;
> >>>
> >>
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> >>> at
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt;
> >>> &gt;
> >>>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> >>> &gt; &gt;
> >>>
> >>
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> >>> at
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt;
> >>> &gt;
> >>>
> >>
> com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51)
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> >>> &amp;amp;nbsp; 望解答,十分感谢!
> >>> &gt;
> >>>
> >>>
> >>> --
> >>>
> >>> Benchao Li
> >>> School of Electronics Engineering and Computer Science, Peking
> University
> >>> Tel:+86-15650713730
> >>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >>
> >>
> >>
> >> --
> >>
> >> Benchao Li
> >> School of Electronics Engineering and Computer Science, Peking
> University
> >> Tel:+86-15650713730
> >> Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >>
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn

回复