回复:问题请教-flinksql的kafkasource方面
嗯是的,都设置成小于等于partition数 | | Sun.Zhu | | 邮箱:17626017...@163.com | Signature is customized by Netease Mail Master 在2020年04月21日 00:28,Jark Wu 写道: Hi, 你可以将 kafka 并行度设置成等于 kafka partition 个数。这个方式肯定能 work,且不浪费 task 资源。 Best, Jark On Mon, 20 Apr 2020 at 22:33, Benchao Li wrote: > 我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。 > > 祝尚 <17626017...@163.com> 于2020年4月20日周一 下午10:29写道: > > > 我们是1.8版本,但是这段源码应该是没变把 > > // check if all tasks that we need to trigger are running. > > // if not, abort the checkpoint > > Execution[] executions = new Execution[tasksToTrigger.length]; > > for (int i = 0; i < tasksToTrigger.length; i++) { > >Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); > >if (ee == null) { > > LOG.info("Checkpoint triggering task {} of job {} is not being > > executed at the moment. Aborting checkpoint.", > > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > > job); > > throw new > > > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); > >} else if (ee.getState() == ExecutionState.RUNNING) { > > executions[i] = ee; > >} else { > > LOG.info("Checkpoint triggering task {} of job {} is not in state > {} > > but {} instead. Aborting checkpoint.", > > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > > job, > > ExecutionState.RUNNING, > > ee.getState()); > > throw new > > > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); > >} > > } > > 还是我理解的不对 > > > > > 2020年4月20日 下午6:21,Benchao Li 写道: > > > > > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。 > > > > > > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道: > > > > > >> > > > 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint > > >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明 > > >> > > >> > > >> > > >> > > >> > > >> | | > > >> Sun.Zhu > > >> | > > >> | > > >> 邮箱:17626017...@163.com > > >> | > > >> > > >> Signature is customized by Netease Mail Master > > >> > > >> 在2020年04月20日 10:37,Benchao Li 写道: > > >> 应该是不会的。分配不到partition的source会标记为idle状态。 > > >> > > >> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道: > > >> > > >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > >>> > > >>> > > >>> > > >>> > > >>> | | > > >>> Sun.Zhu > > >>> | > > >>> | > > >>> 邮箱:17626017...@163.com > > >>> | > > >>> > > >>> Signature is customized by Netease Mail Master > > >>> > > >>> 在2020年04月19日 22:43,人生若只如初见 写道: > > >>> 嗯嗯,十分感谢 > > >>> > > >>> > > >>> > > >>> > > >>> --原始邮件-- > > >>> 发件人:"Benchao Li" > >>> 发送时间:2020年4月19日(星期天) 晚上9:25 > > >>> 收件人:"user-zh" > >>> > > >>> 主题:Re: 问题请教-flinksql的kafkasource方面 > > >>> > > >>> > > >>> > > >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > >>> > > >>> Jark Wu > >>> > > >>> Hi, > > >>> > > >>> 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > > >>> 根据你的 Java 代码,数据的 event time > > >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > > >>> 能容忍 5s 乱序). > > >>> 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition > 进度比某些 > > >>> partition > > >>> 进度快很多的现象, > > >>> > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > >>> > > >>> 完美的解决方案还需要等 FLIP-27 的完成。 > > >>> 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > >>> > > >>> Best, > > >>> Jark > > >>> > > >>> > > >>> On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 > >>> > > >>> 你好 > > >>> > > >>> > > >>> > > >>> > > >> > > > 感谢解答,第一个问题就是当我用插入语句时,如果
Re: 问题请教-flinksql的kafkasource方面
Hi, 你可以将 kafka 并行度设置成等于 kafka partition 个数。这个方式肯定能 work,且不浪费 task 资源。 Best, Jark On Mon, 20 Apr 2020 at 22:33, Benchao Li wrote: > 我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。 > > 祝尚 <17626017...@163.com> 于2020年4月20日周一 下午10:29写道: > > > 我们是1.8版本,但是这段源码应该是没变把 > > // check if all tasks that we need to trigger are running. > > // if not, abort the checkpoint > > Execution[] executions = new Execution[tasksToTrigger.length]; > > for (int i = 0; i < tasksToTrigger.length; i++) { > >Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); > >if (ee == null) { > > LOG.info("Checkpoint triggering task {} of job {} is not being > > executed at the moment. Aborting checkpoint.", > > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > > job); > > throw new > > > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); > >} else if (ee.getState() == ExecutionState.RUNNING) { > > executions[i] = ee; > >} else { > > LOG.info("Checkpoint triggering task {} of job {} is not in state > {} > > but {} instead. Aborting checkpoint.", > > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > > job, > > ExecutionState.RUNNING, > > ee.getState()); > > throw new > > > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); > >} > > } > > 还是我理解的不对 > > > > > 2020年4月20日 下午6:21,Benchao Li 写道: > > > > > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。 > > > > > > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道: > > > > > >> > > > 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint > > >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明 > > >> > > >> > > >> > > >> > > >> > > >> | | > > >> Sun.Zhu > > >> | > > >> | > > >> 邮箱:17626017...@163.com > > >> | > > >> > > >> Signature is customized by Netease Mail Master > > >> > > >> 在2020年04月20日 10:37,Benchao Li 写道: > > >> 应该是不会的。分配不到partition的source会标记为idle状态。 > > >> > > >> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道: > > >> > > >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > >>> > > >>> > > >>> > > >>> > > >>> | | > > >>> Sun.Zhu > > >>> | > > >>> | > > >>> 邮箱:17626017...@163.com > > >>> | > > >>> > > >>> Signature is customized by Netease Mail Master > > >>> > > >>> 在2020年04月19日 22:43,人生若只如初见 写道: > > >>> 嗯嗯,十分感谢 > > >>> > > >>> > > >>> > > >>> > > >>> --原始邮件-- > > >>> 发件人:"Benchao Li" > >>> 发送时间:2020年4月19日(星期天) 晚上9:25 > > >>> 收件人:"user-zh" > >>> > > >>> 主题:Re: 问题请教-flinksql的kafkasource方面 > > >>> > > >>> > > >>> > > >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > >>> > > >>> Jark Wu > >>> > > >>> Hi, > > >>> > > >>> 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > > >>> 根据你的 Java 代码,数据的 event time > > >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > > >>> 能容忍 5s 乱序). > > >>> 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition > 进度比某些 > > >>> partition > > >>> 进度快很多的现象, > > >>> > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > >>> > > >>> 完美的解决方案还需要等 FLIP-27 的完成。 > > >>> 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > >>> > > >>> Best, > > >>> Jark > > >>> > > >>> > > >>> On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 > >>> > > >>> 你好 > > >>> > > >>> > > >>> > > >>> > > >> > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > >>> > > >>> > > >>>
Re: 问题请教-flinksql的kafkasource方面
我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。 祝尚 <17626017...@163.com> 于2020年4月20日周一 下午10:29写道: > 我们是1.8版本,但是这段源码应该是没变把 > // check if all tasks that we need to trigger are running. > // if not, abort the checkpoint > Execution[] executions = new Execution[tasksToTrigger.length]; > for (int i = 0; i < tasksToTrigger.length; i++) { >Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); >if (ee == null) { > LOG.info("Checkpoint triggering task {} of job {} is not being > executed at the moment. Aborting checkpoint.", > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > job); > throw new > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); >} else if (ee.getState() == ExecutionState.RUNNING) { > executions[i] = ee; >} else { > LOG.info("Checkpoint triggering task {} of job {} is not in state {} > but {} instead. Aborting checkpoint.", > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > job, > ExecutionState.RUNNING, > ee.getState()); > throw new > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); >} > } > 还是我理解的不对 > > > 2020年4月20日 下午6:21,Benchao Li 写道: > > > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。 > > > > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道: > > > >> > 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint > >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明 > >> > >> > >> > >> > >> > >> | | > >> Sun.Zhu > >> | > >> | > >> 邮箱:17626017...@163.com > >> | > >> > >> Signature is customized by Netease Mail Master > >> > >> 在2020年04月20日 10:37,Benchao Li 写道: > >> 应该是不会的。分配不到partition的source会标记为idle状态。 > >> > >> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道: > >> > >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > >>> > >>> > >>> > >>> > >>> | | > >>> Sun.Zhu > >>> | > >>> | > >>> 邮箱:17626017...@163.com > >>> | > >>> > >>> Signature is customized by Netease Mail Master > >>> > >>> 在2020年04月19日 22:43,人生若只如初见 写道: > >>> 嗯嗯,十分感谢 > >>> > >>> > >>> > >>> > >>> --原始邮件-- > >>> 发件人:"Benchao Li" >>> 发送时间:2020年4月19日(星期天) 晚上9:25 > >>> 收件人:"user-zh" >>> > >>> 主题:Re: 问题请教-flinksql的kafkasource方面 > >>> > >>> > >>> > >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > >>> > >>> Jark Wu >>> > >>> Hi, > >>> > >>> 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > >>> 根据你的 Java 代码,数据的 event time > >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > >>> 能容忍 5s 乱序). > >>> 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 > >>> partition > >>> 进度快很多的现象, > >>> 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > >>> > >>> 完美的解决方案还需要等 FLIP-27 的完成。 > >>> 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > >>> > >>> Best, > >>> Jark > >>> > >>> > >>> On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 >>> > >>> 你好 > >>> > >>> > >>> > >>> > >> > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > >>> > >>> > >>> > >>> 附: > >>> userbehavior建表语句 > >>> CREATE TABLE user_behavior ( > >>> nbsp; nbsp; user_id BIGINT, > >>> nbsp; nbsp; item_id BIGINT, > >>> nbsp; nbsp; category_id BIGINT, > >>> nbsp; nbsp; behavior STRING, > >>> nbsp; nbsp; ts TIMESTAMP(3), > >>> nbsp; nbsp; proctime as PROCTIME(), nbsp; -- > >>> 通过计算列产生一个处理时间列 > >>> nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5' > >>> SECOND nbsp;-- > >>> 在ts上定义watermark,ts成为事件时间列 > >>> ) WITH ( > >>> nbsp; nbsp; 'connector.type' = 'kafka', > nbsp;-- > >>> 使用 kafka connector > >>> nbsp; nbsp; 'connector.v
Re: 问题请教-flinksql的kafkasource方面
我们是1.8版本,但是这段源码应该是没变把 // check if all tasks that we need to trigger are running. // if not, abort the checkpoint Execution[] executions = new Execution[tasksToTrigger.length]; for (int i = 0; i < tasksToTrigger.length; i++) { Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); if (ee == null) { LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.", tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job); throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } else if (ee.getState() == ExecutionState.RUNNING) { executions[i] = ee; } else { LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.", tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job, ExecutionState.RUNNING, ee.getState()); throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } } 还是我理解的不对 > 2020年4月20日 下午6:21,Benchao Li 写道: > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。 > > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道: > >> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明 >> >> >> >> >> >> | | >> Sun.Zhu >> | >> | >> 邮箱:17626017...@163.com >> | >> >> Signature is customized by Netease Mail Master >> >> 在2020年04月20日 10:37,Benchao Li 写道: >> 应该是不会的。分配不到partition的source会标记为idle状态。 >> >> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道: >> >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 >>> >>> >>> >>> >>> | | >>> Sun.Zhu >>> | >>> | >>> 邮箱:17626017...@163.com >>> | >>> >>> Signature is customized by Netease Mail Master >>> >>> 在2020年04月19日 22:43,人生若只如初见 写道: >>> 嗯嗯,十分感谢 >>> >>> >>> >>> >>> --原始邮件-- >>> 发件人:"Benchao Li">> 发送时间:2020年4月19日(星期天) 晚上9:25 >>> 收件人:"user-zh">> >>> 主题:Re: 问题请教-flinksql的kafkasource方面 >>> >>> >>> >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 >>> >>> Jark Wu >> >>> Hi, >>> >>> 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 >>> 根据你的 Java 代码,数据的 event time >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark >>> 能容忍 5s 乱序). >>> 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 >>> partition >>> 进度快很多的现象, >>> 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 >>> >>> 完美的解决方案还需要等 FLIP-27 的完成。 >>> 当前可以通过增加 watermark delay来增大迟到数据的容忍。 >>> >>> Best, >>> Jark >>> >>> >>> On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 >> >>> 你好 >>> >>> >>> >>> >> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 >>> >>> >>> >>> 附: >>> userbehavior建表语句 >>> CREATE TABLE user_behavior ( >>> nbsp; nbsp; user_id BIGINT, >>> nbsp; nbsp; item_id BIGINT, >>> nbsp; nbsp; category_id BIGINT, >>> nbsp; nbsp; behavior STRING, >>> nbsp; nbsp; ts TIMESTAMP(3), >>> nbsp; nbsp; proctime as PROCTIME(), nbsp; -- >>> 通过计算列产生一个处理时间列 >>> nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5' >>> SECOND nbsp;-- >>> 在ts上定义watermark,ts成为事件时间列 >>> ) WITH ( >>> nbsp; nbsp; 'connector.type' = 'kafka', nbsp;-- >>> 使用 kafka connector >>> nbsp; nbsp; 'connector.version' = 'universal', >>> nbsp;-- kafka >>> 版本,universal 支持 0.11 以上的版本 >>> nbsp; nbsp; 'connector.topic' = 'user_behavior', >>> nbsp;-- kafka topic >>> nbsp; nbsp; 'connector.startup-mode' = >>> 'earliest-offset', nbsp;-- 从起始 >>> offset 开始读取 >>> nbsp; nbsp; 'connector.properties.zookeeper.connect' >> = >>> ' >>> 192.168.0.150:2181', nbsp;-- zookeeper 地址 >>> nbsp; nbsp; 'connector.properties.bootstrap.servers' >> = >>> ' >>> 192.168.0.150:9092', nbsp;-- kafka broker 地址 >>> nbsp; nbsp; 'format.type' = 'json' nbsp;-- >> 数据源格式为 >>> json >>> ) >>> >>>
Re: 问题请教-flinksql的kafkasource方面
你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。 Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道: > 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint > ,源码CheckpointCoordinator#triggerCheckpoint也有说明 > > > > > > | | > Sun.Zhu > | > | > 邮箱:17626017...@163.com > | > > Signature is customized by Netease Mail Master > > 在2020年04月20日 10:37,Benchao Li 写道: > 应该是不会的。分配不到partition的source会标记为idle状态。 > > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道: > > > Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > > > > > > > > > | | > > Sun.Zhu > > | > > | > > 邮箱:17626017...@163.com > > | > > > > Signature is customized by Netease Mail Master > > > > 在2020年04月19日 22:43,人生若只如初见 写道: > > 嗯嗯,十分感谢 > > > > > > > > > > --原始邮件-- > > 发件人:"Benchao Li" > 发送时间:2020年4月19日(星期天) 晚上9:25 > > 收件人:"user-zh" > > > 主题:Re: 问题请教-flinksql的kafkasource方面 > > > > > > > > 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > > > Jark Wu > > > Hi, > > > > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > > 根据你的 Java 代码,数据的 event time > > 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > > 能容忍 5s 乱序). > > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 > > partition > > 进度快很多的现象, > > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > > > 完美的解决方案还需要等 FLIP-27 的完成。 > > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > > > Best, > > Jark > > > > > > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 > > > 你好 > > > > > > > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > > > > > > > 附: > > userbehavior建表语句 > > CREATE TABLE user_behavior ( > > nbsp; nbsp; user_id BIGINT, > > nbsp; nbsp; item_id BIGINT, > > nbsp; nbsp; category_id BIGINT, > > nbsp; nbsp; behavior STRING, > > nbsp; nbsp; ts TIMESTAMP(3), > > nbsp; nbsp; proctime as PROCTIME(), nbsp; -- > > 通过计算列产生一个处理时间列 > > nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5' > > SECOND nbsp;-- > > 在ts上定义watermark,ts成为事件时间列 > > ) WITH ( > > nbsp; nbsp; 'connector.type' = 'kafka', nbsp;-- > > 使用 kafka connector > > nbsp; nbsp; 'connector.version' = 'universal', > > nbsp;-- kafka > > 版本,universal 支持 0.11 以上的版本 > > nbsp; nbsp; 'connector.topic' = 'user_behavior', > > nbsp;-- kafka topic > > nbsp; nbsp; 'connector.startup-mode' = > > 'earliest-offset', nbsp;-- 从起始 > > offset 开始读取 > > nbsp; nbsp; 'connector.properties.zookeeper.connect' > = > > ' > > 192.168.0.150:2181', nbsp;-- zookeeper 地址 > > nbsp; nbsp; 'connector.properties.bootstrap.servers' > = > > ' > > 192.168.0.150:9092', nbsp;-- kafka broker 地址 > > nbsp; nbsp; 'format.type' = 'json' nbsp;-- > 数据源格式为 > > json > > ) > > > > 每小时购买数建表语句 > > CREATE TABLE buy_cnt_per_hour (nbsp; > > nbsp; nbsp; hour_of_day BIGINT, > > nbsp; nbsp; buy_cnt BIGINT > > ) WITH ( > > nbsp; nbsp; 'connector.type' = 'elasticsearch', -- 使用 > > elasticsearch > > connector > > nbsp; nbsp; 'connector.version' = '6', nbsp;-- > > elasticsearch 版本,6 能支持 > > es 6+ 以及 7+ 的版本 > > nbsp; nbsp; 'connector.hosts' = ' > > http://192.168.0.150:9200', nbsp;-- > > elasticsearch 地址 > > nbsp; nbsp; 'connector.index' = 'buy_cnt_per_hour', > > nbsp;-- > > elasticsearch 索引名,相当于数据库的表名 > > nbsp; nbsp; 'connector.document-type' = > > 'user_behavior', -- > > elasticsearch 的 type,相当于数据库的库名 > > nbsp; nbsp; 'connector.bulk-flush.max-actions' = '1', > > nbsp;-- 每条数据都刷新 > > nbsp; nbsp; 'format.type' = 'json', nbsp;-- > > 输出数据格式 json > > nbsp; nbsp; 'update-mode' = 'append' > > ) > > > > 插入语句 > > INSERT INTO buy_cnt_per_hournbsp; > > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' > > HOUR)),COUNT(*)nbsp; > > FROM user_behavior > > WHERE behavior = 'buy' > > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > > > kafka数据发送代码 > > > > import com.alibaba.fastjson.JSONObject; > > import org.apache.kafka.clients.producer.KafkaProducer; > > import org.apache.kafka.clients.producer.ProducerRecord; > > > > import java.text.SimpleDateForma
回复:问题请教-flinksql的kafkasource方面
我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint ,源码CheckpointCoordinator#triggerCheckpoint也有说明 | | Sun.Zhu | | 邮箱:17626017...@163.com | Signature is customized by Netease Mail Master 在2020年04月20日 10:37,Benchao Li 写道: 应该是不会的。分配不到partition的source会标记为idle状态。 Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道: > Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > > > > | | > Sun.Zhu > | > | > 邮箱:17626017...@163.com > | > > Signature is customized by Netease Mail Master > > 在2020年04月19日 22:43,人生若只如初见 写道: > 嗯嗯,十分感谢 > > > > > --原始邮件-- > 发件人:"Benchao Li" 发送时间:2020年4月19日(星期天) 晚上9:25 > 收件人:"user-zh" > 主题:Re: 问题请教-flinksql的kafkasource方面 > > > > 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > Jark Wu > Hi, > > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > 根据你的 Java 代码,数据的 event time > 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > 能容忍 5s 乱序). > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 > partition > 进度快很多的现象, > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > 完美的解决方案还需要等 FLIP-27 的完成。 > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > Best, > Jark > > > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 > 你好 > > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > > > 附: > userbehavior建表语句 > CREATE TABLE user_behavior ( > nbsp; nbsp; user_id BIGINT, > nbsp; nbsp; item_id BIGINT, > nbsp; nbsp; category_id BIGINT, > nbsp; nbsp; behavior STRING, > nbsp; nbsp; ts TIMESTAMP(3), > nbsp; nbsp; proctime as PROCTIME(), nbsp; -- > 通过计算列产生一个处理时间列 > nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5' > SECOND nbsp;-- > 在ts上定义watermark,ts成为事件时间列 > ) WITH ( > nbsp; nbsp; 'connector.type' = 'kafka', nbsp;-- > 使用 kafka connector > nbsp; nbsp; 'connector.version' = 'universal', > nbsp;-- kafka > 版本,universal 支持 0.11 以上的版本 > nbsp; nbsp; 'connector.topic' = 'user_behavior', > nbsp;-- kafka topic > nbsp; nbsp; 'connector.startup-mode' = > 'earliest-offset', nbsp;-- 从起始 > offset 开始读取 > nbsp; nbsp; 'connector.properties.zookeeper.connect' = > ' > 192.168.0.150:2181', nbsp;-- zookeeper 地址 > nbsp; nbsp; 'connector.properties.bootstrap.servers' = > ' > 192.168.0.150:9092', nbsp;-- kafka broker 地址 > nbsp; nbsp; 'format.type' = 'json' nbsp;-- 数据源格式为 > json > ) > > 每小时购买数建表语句 > CREATE TABLE buy_cnt_per_hour (nbsp; > nbsp; nbsp; hour_of_day BIGINT, > nbsp; nbsp; buy_cnt BIGINT > ) WITH ( > nbsp; nbsp; 'connector.type' = 'elasticsearch', -- 使用 > elasticsearch > connector > nbsp; nbsp; 'connector.version' = '6', nbsp;-- > elasticsearch 版本,6 能支持 > es 6+ 以及 7+ 的版本 > nbsp; nbsp; 'connector.hosts' = ' > http://192.168.0.150:9200', nbsp;-- > elasticsearch 地址 > nbsp; nbsp; 'connector.index' = 'buy_cnt_per_hour', > nbsp;-- > elasticsearch 索引名,相当于数据库的表名 > nbsp; nbsp; 'connector.document-type' = > 'user_behavior', -- > elasticsearch 的 type,相当于数据库的库名 > nbsp; nbsp; 'connector.bulk-flush.max-actions' = '1', > nbsp;-- 每条数据都刷新 > nbsp; nbsp; 'format.type' = 'json', nbsp;-- > 输出数据格式 json > nbsp; nbsp; 'update-mode' = 'append' > ) > > 插入语句 > INSERT INTO buy_cnt_per_hournbsp; > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' > HOUR)),COUNT(*)nbsp; > FROM user_behavior > WHERE behavior = 'buy' > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > kafka数据发送代码 > > import com.alibaba.fastjson.JSONObject; > import org.apache.kafka.clients.producer.KafkaProducer; > import org.apache.kafka.clients.producer.ProducerRecord; > > import java.text.SimpleDateFormat; > import java.util.*; > > > public class UserBehaviorProducer { > public static final String brokerList = " > 192.168.0.150:9092"; > > // public static final > String topic="user_behavior"; > public static final String topic = > "user_behavior"; > > public static void main(String args[]) { > > //配置生产者客户端参数 > //将配置序列化 > Properties > properties = new Properties(); > > properties.put("key.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > > properties.put("value.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > > properties.put("bootstrap.servers", brokerList); > > //创建KafkaProducer 实例 > > KafkaProducer KafkaProducer<gt;(p
Re: 问题请教-flinksql的kafkasource方面
应该是不会的。分配不到partition的source会标记为idle状态。 Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道: > Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > > > > | | > Sun.Zhu > | > | > 邮箱:17626017...@163.com > | > > Signature is customized by Netease Mail Master > > 在2020年04月19日 22:43,人生若只如初见 写道: > 嗯嗯,十分感谢 > > > > > --原始邮件-- > 发件人:"Benchao Li" 发送时间:2020年4月19日(星期天) 晚上9:25 > 收件人:"user-zh" > 主题:Re: 问题请教-flinksql的kafkasource方面 > > > > 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > Jark Wu > Hi, > > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > 根据你的 Java 代码,数据的 event time > 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > 能容忍 5s 乱序). > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 > partition > 进度快很多的现象, > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > 完美的解决方案还需要等 FLIP-27 的完成。 > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > Best, > Jark > > > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 > 你好 > > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > > > 附: > userbehavior建表语句 > CREATE TABLE user_behavior ( > nbsp; nbsp; user_id BIGINT, > nbsp; nbsp; item_id BIGINT, > nbsp; nbsp; category_id BIGINT, > nbsp; nbsp; behavior STRING, > nbsp; nbsp; ts TIMESTAMP(3), > nbsp; nbsp; proctime as PROCTIME(), nbsp; -- > 通过计算列产生一个处理时间列 > nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5' > SECOND nbsp;-- > 在ts上定义watermark,ts成为事件时间列 > ) WITH ( > nbsp; nbsp; 'connector.type' = 'kafka', nbsp;-- > 使用 kafka connector > nbsp; nbsp; 'connector.version' = 'universal', > nbsp;-- kafka > 版本,universal 支持 0.11 以上的版本 > nbsp; nbsp; 'connector.topic' = 'user_behavior', > nbsp;-- kafka topic > nbsp; nbsp; 'connector.startup-mode' = > 'earliest-offset', nbsp;-- 从起始 > offset 开始读取 > nbsp; nbsp; 'connector.properties.zookeeper.connect' = > ' > 192.168.0.150:2181', nbsp;-- zookeeper 地址 > nbsp; nbsp; 'connector.properties.bootstrap.servers' = > ' > 192.168.0.150:9092', nbsp;-- kafka broker 地址 > nbsp; nbsp; 'format.type' = 'json' nbsp;-- 数据源格式为 > json > ) > > 每小时购买数建表语句 > CREATE TABLE buy_cnt_per_hour (nbsp; > nbsp; nbsp; hour_of_day BIGINT, > nbsp; nbsp; buy_cnt BIGINT > ) WITH ( > nbsp; nbsp; 'connector.type' = 'elasticsearch', -- 使用 > elasticsearch > connector > nbsp; nbsp; 'connector.version' = '6', nbsp;-- > elasticsearch 版本,6 能支持 > es 6+ 以及 7+ 的版本 > nbsp; nbsp; 'connector.hosts' = ' > http://192.168.0.150:9200', nbsp;-- > elasticsearch 地址 > nbsp; nbsp; 'connector.index' = 'buy_cnt_per_hour', > nbsp;-- > elasticsearch 索引名,相当于数据库的表名 > nbsp; nbsp; 'connector.document-type' = > 'user_behavior', -- > elasticsearch 的 type,相当于数据库的库名 > nbsp; nbsp; 'connector.bulk-flush.max-actions' = '1', > nbsp;-- 每条数据都刷新 > nbsp; nbsp; 'format.type' = 'json', nbsp;-- > 输出数据格式 json > nbsp; nbsp; 'update-mode' = 'append' > ) > > 插入语句 > INSERT INTO buy_cnt_per_hournbsp; > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' > HOUR)),COUNT(*)nbsp; > FROM user_behavior > WHERE behavior = 'buy' > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > kafka数据发送代码 > > import com.alibaba.fastjson.JSONObject; > import org.apache.kafka.clients.producer.KafkaProducer; > import org.apache.kafka.clients.producer.ProducerRecord; > > import java.text.SimpleDateFormat; > import java.util.*; > > > public class UserBehaviorProducer { > public static final String brokerList = " > 192.168.0.150:9092"; > > // public static final > String topic="user_behavior"; > public static final String topic = > "user_behavior"; > > public static void main(String args[]) { > > //配置生产者客户端参数 > //将配置序列化 > Properties > properties = new Properties(); > > properties.put("key.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > > properties.put("value.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > > properties.put("bootstrap.servers", brokerList); > > //创建KafkaProducer 实例 > > KafkaProducer KafkaProducer<gt;(properties); > //构建待发送的消息 > //{"user_id": > "952483", "item_id":"310884", "category_id": > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > //{"
Re: 问题请教-flinksql的kafkasource方面
e) { > > jsonObject.put("user_id", random.nextInt(90) + 10 + > > ""); > > jsonObject.put("item_id", random.nextInt(90) + 10 + > > ""); > > jsonObject.put("category_id", random.nextInt(1000) + ""); > > jsonObject.put("behavior", behaviors[random.nextInt(4)]); > > jsonObject.put("ts", format.format(new Date(date_long))); > > String msg = jsonObject.toString(); > > System.out.println(msg); > > ProducerRecord > ProducerRecord<(topic, msg); > > producer.send(record); > > //date_long +=500+random.nextGaussian()*1000; > > date_long +=800+random.nextGaussian()*1500; > > try { > > Thread.sleep(60); > > } catch (InterruptedException e) { > > e.printStackTrace(); > > } > > } > > > > } > > > > private static long getDate() { > > Date date = new Date(); > > Calendar c = Calendar.getInstance(); > > c.setTime(date); > > //设置为1号,当前日期既为本月第一天 > > c.set(Calendar.DAY_OF_MONTH, 1); > > //将小时至0 > > c.set(Calendar.HOUR_OF_DAY, 0); > > //将分钟至0 > > c.set(Calendar.MINUTE, 0); > > //将秒至0 > > c.set(Calendar.SECOND,0); > > //将毫秒至0 > > c.set(Calendar.MILLISECOND, 0); > > // 本月第一天的时间戳转换为字符串 > > return c.getTimeInMillis(); > > } > > } > > > > --原始邮件-- > > 发件人:"Jark Wu" > 发送时间:2020年4月18日(星期六) 晚上10:08 > > 收件人:"user-zh" > > > 主题:Re: 问题请教-flinksql的kafkasource方面 > > > > > > > > Hi, > > > > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, TableEnvironment 上还未支持。 > > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下? > > > > Best, > > Jark > > > > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 > > > 大佬好: > > nbsp; nbsp; > > > nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > > nbsp; nbsp; > > > > > nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > > nbsp;nbsp; > > nbsp; nbsp; > > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > > view as ...")却会报错。报错如下: > > Exception in thread "main" > org.apache.flink.table.api.TableException: > > Unsupported query: CREATE VIEW rich_user_behavior AS > > SELECT U.user_id, U.item_id, U.behavior,nbsp; > > nbsp; CASE C.parent_category_id > > nbsp; nbsp; WHEN 1 THEN '服饰鞋包' > > nbsp; nbsp; WHEN 2 THEN '家装家饰' > > nbsp; nbsp; WHEN 3 THEN '家电' > > nbsp; nbsp; WHEN 4 THEN '美妆' > > nbsp; nbsp; WHEN 5 THEN '母婴' > > nbsp; nbsp; WHEN 6 THEN '3C数码' > > nbsp; nbsp; WHEN 7 THEN '运动户外' > > nbsp; nbsp; WHEN 8 THEN '食品' > > nbsp; nbsp; ELSE '其他' > > nbsp; END AS category_name > > FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF > > U.proctime AS C > > ON U.category_id = C.sub_category_id > > at > > > > > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > > at > > > > > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > > Source) > > at > > java.util.Optional.orElseThrow(Optional.java:290) > > at > > > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > > at > > > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > at > > > > > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > > > > > > > > > > > nbsp; nbsp; nbsp; nbsp; 望解答,十分感谢! > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
Re: 问题请教-flinksql的kafkasource方面
; //date_long +=500+random.nextGaussian()*1000; > date_long +=800+random.nextGaussian()*1500; > try { > Thread.sleep(60); > } catch (InterruptedException e) { > e.printStackTrace(); > } > } > > } > > private static long getDate() { > Date date = new Date(); > Calendar c = Calendar.getInstance(); > c.setTime(date); > //设置为1号,当前日期既为本月第一天 > c.set(Calendar.DAY_OF_MONTH, 1); > //将小时至0 > c.set(Calendar.HOUR_OF_DAY, 0); > //将分钟至0 > c.set(Calendar.MINUTE, 0); > //将秒至0 > c.set(Calendar.SECOND,0); > //将毫秒至0 > c.set(Calendar.MILLISECOND, 0); > // 本月第一天的时间戳转换为字符串 > return c.getTimeInMillis(); > } > } > > --原始邮件-- > 发件人:"Jark Wu" 发送时间:2020年4月18日(星期六) 晚上10:08 > 收件人:"user-zh" > 主题:Re: 问题请教-flinksql的kafkasource方面 > > > > Hi, > > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, TableEnvironment 上还未支持。 > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下? > > Best, > Jark > > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 > 大佬好: > nbsp; nbsp; > nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > nbsp; nbsp; > > nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > nbsp;nbsp; > nbsp; nbsp; > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > view as ...")却会报错。报错如下: > Exception in thread "main" org.apache.flink.table.api.TableException: > Unsupported query: CREATE VIEW rich_user_behavior AS > SELECT U.user_id, U.item_id, U.behavior,nbsp; > nbsp; CASE C.parent_category_id > nbsp; nbsp; WHEN 1 THEN '服饰鞋包' > nbsp; nbsp; WHEN 2 THEN '家装家饰' > nbsp; nbsp; WHEN 3 THEN '家电' > nbsp; nbsp; WHEN 4 THEN '美妆' > nbsp; nbsp; WHEN 5 THEN '母婴' > nbsp; nbsp; WHEN 6 THEN '3C数码' > nbsp; nbsp; WHEN 7 THEN '运动户外' > nbsp; nbsp; WHEN 8 THEN '食品' > nbsp; nbsp; ELSE '其他' > nbsp; END AS category_name > FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF > U.proctime AS C > ON U.category_id = C.sub_category_id > at > > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > at > > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > Source) > at > java.util.Optional.orElseThrow(Optional.java:290) > at > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > at > > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > > > > > nbsp; nbsp; nbsp; nbsp; 望解答,十分感谢!
Re: 问题请教-flinksql的kafkasource方面
Hi, 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, TableEnvironment 上还未支持。 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下? Best, Jark On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 wrote: > 大佬好: > > 请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > > 问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > view as ...")却会报错。报错如下: > Exception in thread "main" org.apache.flink.table.api.TableException: > Unsupported query: CREATE VIEW rich_user_behavior AS > SELECT U.user_id, U.item_id, U.behavior, > CASE C.parent_category_id > WHEN 1 THEN '服饰鞋包' > WHEN 2 THEN '家装家饰' > WHEN 3 THEN '家电' > WHEN 4 THEN '美妆' > WHEN 5 THEN '母婴' > WHEN 6 THEN '3C数码' > WHEN 7 THEN '运动户外' > WHEN 8 THEN '食品' > ELSE '其他' > END AS category_name > FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF > U.proctime AS C > ON U.category_id = C.sub_category_id > at > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > at > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > Source) > at java.util.Optional.orElseThrow(Optional.java:290) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > at > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > > > > > 望解答,十分感谢!