回复:问题请教-flinksql的kafkasource方面
嗯是的,都设置成小于等于partition数 | | Sun.Zhu | | 邮箱:17626017...@163.com | Signature is customized by Netease Mail Master 在2020年04月21日 00:28,Jark Wu 写道: Hi, 你可以将 kafka 并行度设置成等于 kafka partition 个数。这个方式肯定能 work,且不浪费 task 资源。 Best, Jark On Mon, 20 Apr 2020 at 22:33, Benchao Li wrote: > 我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。 > > 祝尚 <17626017...@163.com> 于2020年4月20日周一 下午10:29写道: > > > 我们是1.8版本,但是这段源码应该是没变把 > > // check if all tasks that we need to trigger are running. > > // if not, abort the checkpoint > > Execution[] executions = new Execution[tasksToTrigger.length]; > > for (int i = 0; i < tasksToTrigger.length; i++) { > >Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); > >if (ee == null) { > > LOG.info("Checkpoint triggering task {} of job {} is not being > > executed at the moment. Aborting checkpoint.", > > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > > job); > > throw new > > > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); > >} else if (ee.getState() == ExecutionState.RUNNING) { > > executions[i] = ee; > >} else { > > LOG.info("Checkpoint triggering task {} of job {} is not in state > {} > > but {} instead. Aborting checkpoint.", > > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > > job, > > ExecutionState.RUNNING, > > ee.getState()); > > throw new > > > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); > >} > > } > > 还是我理解的不对 > > > > > 2020年4月20日 下午6:21,Benchao Li 写道: > > > > > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。 > > > > > > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道: > > > > > >> > > > 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint > > >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明 > > >> > > >> > > >> > > >> > > >> > > >> | | > > >> Sun.Zhu > > >> | > > >> | > > >> 邮箱:17626017...@163.com > > >> | > > >> > > >> Signature is customized by Netease Mail Master > > >> > > >> 在2020年04月20日 10:37,Benchao Li 写道: > > >> 应该是不会的。分配不到partition的source会标记为idle状态。 > > >> > > >> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道: > > >> > > >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > >>> > > >>> > > >>> > > >>> > > >>> | | > > >>> Sun.Zhu > > >>> | > > >>> | > > >>> 邮箱:17626017...@163.com > > >>> | > > >>> > > >>> Signature is customized by Netease Mail Master > > >>> > > >>> 在2020年04月19日 22:43,人生若只如初见 写道: > > >>> 嗯嗯,十分感谢 > > >>> > > >>> > > >>> > > >>> > > >>> -- 原始邮件 -- > > >>> 发件人: "Benchao Li" > >>> 发送时间: 2020年4月19日(星期天) 晚上9:25 > > >>> 收件人: "user-zh" > >>> > > >>> 主题: Re: 问题请教-flinksql的kafkasource方面 > > >>> > > >>> > > >>> > > >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > >>> > > >>> Jark Wu > >>> > > >>> > Hi, > > >>> > > > >>> > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > > >>> > 根据你的 Java 代码,数据的 event time > > >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > > >>> > 能容忍 5s 乱序). > > >>> > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition > 进度比某些 > > >>> partition > > >>> > 进度快很多的现象, > > >>> > > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > >>> > > > >>> > 完美的解决方案还需要等 FLIP-27 的完成。 > > >>> > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > >>> > > > >>> > Best, > > >>> > Jark > > >>> > > > >>> > > > >>> > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 > >>> > > > >>> > > 你好 > > >>> > > > > >>> > > > > >>> > > > >>> > > >> > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > >>> > > > > >>> > > > > >>> > > > > >>> > > 附: > > >>> > > userbehavior建表语句 > > >>> > > CREATE TABLE user_behavior ( > > >>> > > user_id BIGINT, > > >>> > > item_id BIGINT, > > >>> > > category_id BIGINT, > > >>> > > behavior STRING, > > >>> > > ts TIMESTAMP(3), > > >>> > > proctime as PROCTIME(), -- > > >>> 通过计算列产生一个处理时间列 > > >>> > > WATERMARK FOR ts as ts - INTERVAL '5' > > >>> SECOND -- > > >>> > > 在ts上定义watermark,ts成为事件时间列 > > >>> > > ) WITH ( > > >>> > > 'connector.type' = 'kafka', > > -- > > >>> 使用 kafka connector > > >>> > > 'connector.version' = 'universal', > > >>> -- kafka > > >>> > > 版本,universal 支持 0.11 以上的版本 > > >>> > > 'connector.topic' = 'user_behavior', > > >>> -- kafka topic > > >>> > > 'connector.startup-mode' = > > >>> 'earliest-offset', -- 从起始 > > >>> > > offset 开始读取 > > >>> > > > > 'connector.properties.zookeeper.connect' > > >> = > > >>> ' > > >>> > > 192.168.0.150:2181', -- zookeeper 地址 > > >>> > > > > 'connector.properties.bootstrap.servers' > > >> = > > >>> ' > > >>> > > 192.168.0.150:9092', -- kafka broker 地址 > > >>> > > 'format.type' = 'json' -- > > >>
回复:问题请教-flinksql的kafkasource方面
我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint ,源码CheckpointCoordinator#triggerCheckpoint也有说明 | | Sun.Zhu | | 邮箱:17626017...@163.com | Signature is customized by Netease Mail Master 在2020年04月20日 10:37,Benchao Li 写道: 应该是不会的。分配不到partition的source会标记为idle状态。 Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道: > Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > > > > | | > Sun.Zhu > | > | > 邮箱:17626017...@163.com > | > > Signature is customized by Netease Mail Master > > 在2020年04月19日 22:43,人生若只如初见 写道: > 嗯嗯,十分感谢 > > > > > -- 原始邮件 -- > 发件人: "Benchao Li" 发送时间: 2020年4月19日(星期天) 晚上9:25 > 收件人: "user-zh" > 主题: Re: 问题请教-flinksql的kafkasource方面 > > > > 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > Jark Wu > > Hi, > > > > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > > 根据你的 Java 代码,数据的 event time > 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > > 能容忍 5s 乱序). > > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 > partition > > 进度快很多的现象, > > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > > > 完美的解决方案还需要等 FLIP-27 的完成。 > > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > > > Best, > > Jark > > > > > > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 > > > > 你好 > > > > > > > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > > > > > > > > > > > 附: > > > userbehavior建表语句 > > > CREATE TABLE user_behavior ( > > > user_id BIGINT, > > > item_id BIGINT, > > > category_id BIGINT, > > > behavior STRING, > > > ts TIMESTAMP(3), > > > proctime as PROCTIME(), -- > 通过计算列产生一个处理时间列 > > > WATERMARK FOR ts as ts - INTERVAL '5' > SECOND -- > > > 在ts上定义watermark,ts成为事件时间列 > > > ) WITH ( > > > 'connector.type' = 'kafka', -- > 使用 kafka connector > > > 'connector.version' = 'universal', > -- kafka > > > 版本,universal 支持 0.11 以上的版本 > > > 'connector.topic' = 'user_behavior', > -- kafka topic > > > 'connector.startup-mode' = > 'earliest-offset', -- 从起始 > > > offset 开始读取 > > > 'connector.properties.zookeeper.connect' = > ' > > > 192.168.0.150:2181', -- zookeeper 地址 > > > 'connector.properties.bootstrap.servers' = > ' > > > 192.168.0.150:9092', -- kafka broker 地址 > > > 'format.type' = 'json' -- 数据源格式为 > json > > > ) > > > > > > 每小时购买数建表语句 > > > CREATE TABLE buy_cnt_per_hour ( > > > hour_of_day BIGINT, > > > buy_cnt BIGINT > > > ) WITH ( > > > 'connector.type' = 'elasticsearch', -- 使用 > elasticsearch > > > connector > > > 'connector.version' = '6', -- > elasticsearch 版本,6 能支持 > > > es 6+ 以及 7+ 的版本 > > > 'connector.hosts' = ' > http://192.168.0.150:9200', -- > > > elasticsearch 地址 > > > 'connector.index' = 'buy_cnt_per_hour', > -- > > > elasticsearch 索引名,相当于数据库的表名 > > > 'connector.document-type' = > 'user_behavior', -- > > > elasticsearch 的 type,相当于数据库的库名 > > > 'connector.bulk-flush.max-actions' = '1', > -- 每条数据都刷新 > > > 'format.type' = 'json', -- > 输出数据格式 json > > > 'update-mode' = 'append' > > > ) > > > > > > 插入语句 > > > INSERT INTO buy_cnt_per_hour > > > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' > HOUR)),COUNT(*) > > > FROM user_behavior > > > WHERE behavior = 'buy' > > > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > > > > > kafka数据发送代码 > > > > > > import com.alibaba.fastjson.JSONObject; > > > import org.apache.kafka.clients.producer.KafkaProducer; > > > import org.apache.kafka.clients.producer.ProducerRecord; > > > > > > import java.text.SimpleDateFormat; > > > import java.util.*; > > > > > > > > > public class UserBehaviorProducer { > > > public static final String brokerList = " > 192.168.0.150:9092"; > > > > > > // public static final > String topic="user_behavior"; > > > public static final String topic = > "user_behavior"; > > > > > > public static void main(String args[]) { > > > > > > //配置生产者客户端参数 > > > //将配置序列化 > > > Properties > properties = new Properties(); > > > > properties.put("key.serializer", > > > "org.apache.kafka.common.serialization.StringSerializer"); > > > > properties.put("value.serializer", > > > "org.apache.kafka.common.serialization.StringSerializer"); > > > > properties.put("bootstrap.servers", brokerList); > > > > //创建KafkaProducer 实例 > > > > KafkaProducer > > KafkaProducer<>(properties); > > > //构建待发送的消息 > > > //{"user_id": > "952483", "item_id":"310884", "category_id": > > > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > > //{"user_id": > "794777", "item_id":"5119439", "category_id":