回复:问题请教-flinksql的kafkasource方面

2020-04-20 文章 Sun.Zhu
嗯是的,都设置成小于等于partition数




| |
Sun.Zhu
|
|
邮箱:17626017...@163.com
|

Signature is customized by Netease Mail Master

在2020年04月21日 00:28,Jark Wu 写道:
Hi,

你可以将 kafka 并行度设置成等于 kafka partition 个数。这个方式肯定能 work,且不浪费 task 资源。

Best,
Jark

On Mon, 20 Apr 2020 at 22:33, Benchao Li  wrote:

> 我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。
>
> 祝尚 <17626017...@163.com> 于2020年4月20日周一 下午10:29写道:
>
> > 我们是1.8版本,但是这段源码应该是没变把
> > // check if all tasks that we need to trigger are running.
> > // if not, abort the checkpoint
> > Execution[] executions = new Execution[tasksToTrigger.length];
> > for (int i = 0; i < tasksToTrigger.length; i++) {
> >Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
> >if (ee == null) {
> >   LOG.info("Checkpoint triggering task {} of job {} is not being
> > executed at the moment. Aborting checkpoint.",
> > tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> > job);
> >   throw new
> >
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
> >} else if (ee.getState() == ExecutionState.RUNNING) {
> >   executions[i] = ee;
> >} else {
> >   LOG.info("Checkpoint triggering task {} of job {} is not in state
> {}
> > but {} instead. Aborting checkpoint.",
> > tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> > job,
> > ExecutionState.RUNNING,
> > ee.getState());
> >   throw new
> >
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
> >}
> > }
> > 还是我理解的不对
> >
> > > 2020年4月20日 下午6:21,Benchao Li  写道:
> > >
> > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。
> > >
> > > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道:
> > >
> > >>
> >
> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
> > >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> | |
> > >> Sun.Zhu
> > >> |
> > >> |
> > >> 邮箱:17626017...@163.com
> > >> |
> > >>
> > >> Signature is customized by Netease Mail Master
> > >>
> > >> 在2020年04月20日 10:37,Benchao Li 写道:
> > >> 应该是不会的。分配不到partition的source会标记为idle状态。
> > >>
> > >> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:
> > >>
> > >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> | |
> > >>> Sun.Zhu
> > >>> |
> > >>> |
> > >>> 邮箱:17626017...@163.com
> > >>> |
> > >>>
> > >>> Signature is customized by Netease Mail Master
> > >>>
> > >>> 在2020年04月19日 22:43,人生若只如初见 写道:
> > >>> 嗯嗯,十分感谢
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> -- 原始邮件 --
> > >>> 发件人: "Benchao Li" > >>> 发送时间: 2020年4月19日(星期天) 晚上9:25
> > >>> 收件人: "user-zh" > >>>
> > >>> 主题: Re: 问题请教-flinksql的kafkasource方面
> > >>>
> > >>>
> > >>>
> > >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
> > >>>
> > >>> Jark Wu  > >>>
> > >>> > Hi,
> > >>> >
> > >>> > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
> > >>> > 根据你的 Java 代码,数据的 event time
> > >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
> > >>> > 能容忍 5s 乱序).
> > >>> > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition
> 进度比某些
> > >>> partition
> > >>> > 进度快很多的现象,
> > >>> >
> 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> > >>> >
> > >>> > 完美的解决方案还需要等 FLIP-27 的完成。
> > >>> > 当前可以通过增加 watermark delay来增大迟到数据的容忍。
> > >>> >
> > >>> > Best,
> > >>> > Jark
> > >>> >
> > >>> >
> > >>> > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  > >>> >
> > >>> > > 你好
> > >>> > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> >
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
> > >>> > >
> > >>> > >
> > >>> > >
> > >>> > > 附:
> > >>> > > userbehavior建表语句
> > >>> > > CREATE TABLE user_behavior (
> > >>> > >     user_id BIGINT,
> > >>> > >     item_id BIGINT,
> > >>> > >     category_id BIGINT,
> > >>> > >     behavior STRING,
> > >>> > >     ts TIMESTAMP(3),
> > >>> > >     proctime as PROCTIME(),   --
> > >>> 通过计算列产生一个处理时间列
> > >>> > >     WATERMARK FOR ts as ts - INTERVAL '5'
> > >>> SECOND  --
> > >>> > > 在ts上定义watermark,ts成为事件时间列
> > >>> > > ) WITH (
> > >>> > >     'connector.type' = 'kafka',
> >  --
> > >>> 使用 kafka connector
> > >>> > >     'connector.version' = 'universal',
> > >>>  -- kafka
> > >>> > > 版本,universal 支持 0.11 以上的版本
> > >>> > >     'connector.topic' = 'user_behavior',
> > >>>  -- kafka topic
> > >>> > >     'connector.startup-mode' =
> > >>> 'earliest-offset',  -- 从起始
> > >>> > > offset 开始读取
> > >>> > >    
> > 'connector.properties.zookeeper.connect'
> > >> =
> > >>> '
> > >>> > > 192.168.0.150:2181',  -- zookeeper 地址
> > >>> > >    
> > 'connector.properties.bootstrap.servers'
> > >> =
> > >>> '
> > >>> > > 192.168.0.150:9092',  -- kafka broker 地址
> > >>> > >     'format.type' = 'json'  --
> > >>

回复:问题请教-flinksql的kafkasource方面

2020-04-20 文章 Sun.Zhu
我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
 ,源码CheckpointCoordinator#triggerCheckpoint也有说明





| |
Sun.Zhu
|
|
邮箱:17626017...@163.com
|

Signature is customized by Netease Mail Master

在2020年04月20日 10:37,Benchao Li 写道:
应该是不会的。分配不到partition的source会标记为idle状态。

Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:

> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
>
>
>
>
> | |
> Sun.Zhu
> |
> |
> 邮箱:17626017...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年04月19日 22:43,人生若只如初见 写道:
> 嗯嗯,十分感谢
>
>
>
>
> -- 原始邮件 --
>  发件人: "Benchao Li" 发送时间: 2020年4月19日(星期天) 晚上9:25
> 收件人: "user-zh"
> 主题: Re: 问题请教-flinksql的kafkasource方面
>
>
>
> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
>
> Jark Wu 
> > Hi,
> >
> > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
> > 根据你的 Java 代码,数据的 event time
> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
> > 能容忍 5s 乱序).
> > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些
> partition
> > 进度快很多的现象,
> > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> >
> > 完美的解决方案还需要等 FLIP-27 的完成。
> > 当前可以通过增加 watermark delay来增大迟到数据的容忍。
> >
> > Best,
> > Jark
> >
> >
> > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  >
> > > 你好
> > >
> > >
> >
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
> > >
> > >
> > >
> > > 附:
> > > userbehavior建表语句
> > > CREATE TABLE user_behavior (
> > >     user_id BIGINT,
> > >     item_id BIGINT,
> > >     category_id BIGINT,
> > >     behavior STRING,
> > >     ts TIMESTAMP(3),
> > >     proctime as PROCTIME(),   --
> 通过计算列产生一个处理时间列
> > >     WATERMARK FOR ts as ts - INTERVAL '5'
> SECOND  --
> > > 在ts上定义watermark,ts成为事件时间列
> > > ) WITH (
> > >     'connector.type' = 'kafka',  --
> 使用 kafka connector
> > >     'connector.version' = 'universal',
>  -- kafka
> > > 版本,universal 支持 0.11 以上的版本
> > >     'connector.topic' = 'user_behavior',
>  -- kafka topic
> > >     'connector.startup-mode' =
> 'earliest-offset',  -- 从起始
> > > offset 开始读取
> > >     'connector.properties.zookeeper.connect' =
> '
> > > 192.168.0.150:2181',  -- zookeeper 地址
> > >     'connector.properties.bootstrap.servers' =
> '
> > > 192.168.0.150:9092',  -- kafka broker 地址
> > >     'format.type' = 'json'  -- 数据源格式为
> json
> > > )
> > >
> > > 每小时购买数建表语句
> > > CREATE TABLE buy_cnt_per_hour ( 
> > >     hour_of_day BIGINT,
> > >     buy_cnt BIGINT
> > > ) WITH (
> > >     'connector.type' = 'elasticsearch', -- 使用
> elasticsearch
> > > connector
> > >     'connector.version' = '6',  --
> elasticsearch 版本,6 能支持
> > > es 6+ 以及 7+ 的版本
> > >     'connector.hosts' = '
> http://192.168.0.150:9200',  --
> > > elasticsearch 地址
> > >     'connector.index' = 'buy_cnt_per_hour',
>  --
> > > elasticsearch 索引名,相当于数据库的表名
> > >     'connector.document-type' =
> 'user_behavior', --
> > > elasticsearch 的 type,相当于数据库的库名
> > >     'connector.bulk-flush.max-actions' = '1',
>  -- 每条数据都刷新
> > >     'format.type' = 'json',  --
> 输出数据格式 json
> > >     'update-mode' = 'append'
> > > )
> > >
> > > 插入语句
> > > INSERT INTO buy_cnt_per_hour 
> > > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1'
> HOUR)),COUNT(*) 
> > > FROM user_behavior
> > > WHERE behavior = 'buy'
> > > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
> > >
> > > kafka数据发送代码
> > >
> > > import com.alibaba.fastjson.JSONObject;
> > > import org.apache.kafka.clients.producer.KafkaProducer;
> > > import org.apache.kafka.clients.producer.ProducerRecord;
> > >
> > > import java.text.SimpleDateFormat;
> > > import java.util.*;
> > >
> > >
> > > public class UserBehaviorProducer {
> > > public static final String brokerList = "
> 192.168.0.150:9092";
> > >
> > > //    public static final
> String topic="user_behavior";
> > > public static final String topic =
> "user_behavior";
> > >
> > > public static void main(String args[]) {
> > >
> > > //配置生产者客户端参数
> > > //将配置序列化
> > > Properties
> properties = new Properties();
> > >
> properties.put("key.serializer",
> > > "org.apache.kafka.common.serialization.StringSerializer");
> > >
> properties.put("value.serializer",
> > > "org.apache.kafka.common.serialization.StringSerializer");
> > >
> properties.put("bootstrap.servers", brokerList);
> > >
> //创建KafkaProducer 实例
> > >
> KafkaProducer > > KafkaProducer<>(properties);
> > > //构建待发送的消息
> > > //{"user_id":
> "952483", "item_id":"310884", "category_id":
> > > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
> > > //{"user_id":
> "794777", "item_id":"5119439", "category_id":