回复:问题请教-flinksql的kafkasource方面

2020-04-20 文章 Sun.Zhu
嗯是的,都设置成小于等于partition数




| |
Sun.Zhu
|
|
邮箱:17626017...@163.com
|

Signature is customized by Netease Mail Master

在2020年04月21日 00:28,Jark Wu 写道:
Hi,

你可以将 kafka 并行度设置成等于 kafka partition 个数。这个方式肯定能 work,且不浪费 task 资源。

Best,
Jark

On Mon, 20 Apr 2020 at 22:33, Benchao Li  wrote:

> 我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。
>
> 祝尚 <17626017...@163.com> 于2020年4月20日周一 下午10:29写道:
>
> > 我们是1.8版本,但是这段源码应该是没变把
> > // check if all tasks that we need to trigger are running.
> > // if not, abort the checkpoint
> > Execution[] executions = new Execution[tasksToTrigger.length];
> > for (int i = 0; i < tasksToTrigger.length; i++) {
> >Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
> >if (ee == null) {
> >   LOG.info("Checkpoint triggering task {} of job {} is not being
> > executed at the moment. Aborting checkpoint.",
> > tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> > job);
> >   throw new
> >
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
> >} else if (ee.getState() == ExecutionState.RUNNING) {
> >   executions[i] = ee;
> >} else {
> >   LOG.info("Checkpoint triggering task {} of job {} is not in state
> {}
> > but {} instead. Aborting checkpoint.",
> > tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> > job,
> > ExecutionState.RUNNING,
> > ee.getState());
> >   throw new
> >
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
> >}
> > }
> > 还是我理解的不对
> >
> > > 2020年4月20日 下午6:21,Benchao Li  写道:
> > >
> > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。
> > >
> > > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道:
> > >
> > >>
> >
> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
> > >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> | |
> > >> Sun.Zhu
> > >> |
> > >> |
> > >> 邮箱:17626017...@163.com
> > >> |
> > >>
> > >> Signature is customized by Netease Mail Master
> > >>
> > >> 在2020年04月20日 10:37,Benchao Li 写道:
> > >> 应该是不会的。分配不到partition的source会标记为idle状态。
> > >>
> > >> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:
> > >>
> > >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> | |
> > >>> Sun.Zhu
> > >>> |
> > >>> |
> > >>> 邮箱:17626017...@163.com
> > >>> |
> > >>>
> > >>> Signature is customized by Netease Mail Master
> > >>>
> > >>> 在2020年04月19日 22:43,人生若只如初见 写道:
> > >>> 嗯嗯,十分感谢
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> --原始邮件--
> > >>> 发件人:"Benchao Li" > >>> 发送时间:2020年4月19日(星期天) 晚上9:25
> > >>> 收件人:"user-zh" > >>>
> > >>> 主题:Re: 问题请教-flinksql的kafkasource方面
> > >>>
> > >>>
> > >>>
> > >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
> > >>>
> > >>> Jark Wu  > >>>
> > >>>  Hi,
> > >>> 
> > >>>  根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
> > >>>  根据你的 Java 代码,数据的 event time
> > >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
> > >>>  能容忍 5s 乱序).
> > >>>  但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition
> 进度比某些
> > >>> partition
> > >>>  进度快很多的现象,
> > >>> 
> 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> > >>> 
> > >>>  完美的解决方案还需要等 FLIP-27 的完成。
> > >>>  当前可以通过增加 watermark delay来增大迟到数据的容忍。
> > >>> 
> > >>>  Best,
> > >>>  Jark
> > >>> 
> > >>> 
> > >>>  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  > >>> 
> > >>>   你好
> > >>>  
> > >>>  
> > >>> 
> > >>>
> > >>
> >
> 感谢解答,第一个问题就是当我用插入语句时,如果

Re: 问题请教-flinksql的kafkasource方面

2020-04-20 文章 Jark Wu
Hi,

你可以将 kafka 并行度设置成等于 kafka partition 个数。这个方式肯定能 work,且不浪费 task 资源。

Best,
Jark

On Mon, 20 Apr 2020 at 22:33, Benchao Li  wrote:

> 我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。
>
> 祝尚 <17626017...@163.com> 于2020年4月20日周一 下午10:29写道:
>
> > 我们是1.8版本,但是这段源码应该是没变把
> > // check if all tasks that we need to trigger are running.
> > // if not, abort the checkpoint
> > Execution[] executions = new Execution[tasksToTrigger.length];
> > for (int i = 0; i < tasksToTrigger.length; i++) {
> >Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
> >if (ee == null) {
> >   LOG.info("Checkpoint triggering task {} of job {} is not being
> > executed at the moment. Aborting checkpoint.",
> > tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> > job);
> >   throw new
> >
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
> >} else if (ee.getState() == ExecutionState.RUNNING) {
> >   executions[i] = ee;
> >} else {
> >   LOG.info("Checkpoint triggering task {} of job {} is not in state
> {}
> > but {} instead. Aborting checkpoint.",
> > tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> > job,
> > ExecutionState.RUNNING,
> > ee.getState());
> >   throw new
> >
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
> >}
> > }
> > 还是我理解的不对
> >
> > > 2020年4月20日 下午6:21,Benchao Li  写道:
> > >
> > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。
> > >
> > > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道:
> > >
> > >>
> >
> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
> > >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> | |
> > >> Sun.Zhu
> > >> |
> > >> |
> > >> 邮箱:17626017...@163.com
> > >> |
> > >>
> > >> Signature is customized by Netease Mail Master
> > >>
> > >> 在2020年04月20日 10:37,Benchao Li 写道:
> > >> 应该是不会的。分配不到partition的source会标记为idle状态。
> > >>
> > >> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:
> > >>
> > >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> | |
> > >>> Sun.Zhu
> > >>> |
> > >>> |
> > >>> 邮箱:17626017...@163.com
> > >>> |
> > >>>
> > >>> Signature is customized by Netease Mail Master
> > >>>
> > >>> 在2020年04月19日 22:43,人生若只如初见 写道:
> > >>> 嗯嗯,十分感谢
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> --原始邮件--
> > >>> 发件人:"Benchao Li" > >>> 发送时间:2020年4月19日(星期天) 晚上9:25
> > >>> 收件人:"user-zh" > >>>
> > >>> 主题:Re: 问题请教-flinksql的kafkasource方面
> > >>>
> > >>>
> > >>>
> > >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
> > >>>
> > >>> Jark Wu  > >>>
> > >>>  Hi,
> > >>> 
> > >>>  根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
> > >>>  根据你的 Java 代码,数据的 event time
> > >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
> > >>>  能容忍 5s 乱序).
> > >>>  但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition
> 进度比某些
> > >>> partition
> > >>>  进度快很多的现象,
> > >>> 
> 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> > >>> 
> > >>>  完美的解决方案还需要等 FLIP-27 的完成。
> > >>>  当前可以通过增加 watermark delay来增大迟到数据的容忍。
> > >>> 
> > >>>  Best,
> > >>>  Jark
> > >>> 
> > >>> 
> > >>>  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  > >>> 
> > >>>   你好
> > >>>  
> > >>>  
> > >>> 
> > >>>
> > >>
> >
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
> > >>>  
> > >>>  
> > >>>  

Re: 问题请教-flinksql的kafkasource方面

2020-04-20 文章 Benchao Li
我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。

祝尚 <17626017...@163.com> 于2020年4月20日周一 下午10:29写道:

> 我们是1.8版本,但是这段源码应该是没变把
> // check if all tasks that we need to trigger are running.
> // if not, abort the checkpoint
> Execution[] executions = new Execution[tasksToTrigger.length];
> for (int i = 0; i < tasksToTrigger.length; i++) {
>Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
>if (ee == null) {
>   LOG.info("Checkpoint triggering task {} of job {} is not being
> executed at the moment. Aborting checkpoint.",
> tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> job);
>   throw new
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
>} else if (ee.getState() == ExecutionState.RUNNING) {
>   executions[i] = ee;
>} else {
>   LOG.info("Checkpoint triggering task {} of job {} is not in state {}
> but {} instead. Aborting checkpoint.",
> tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> job,
> ExecutionState.RUNNING,
> ee.getState());
>   throw new
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
>}
> }
> 还是我理解的不对
>
> > 2020年4月20日 下午6:21,Benchao Li  写道:
> >
> > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。
> >
> > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道:
> >
> >>
> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
> >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明
> >>
> >>
> >>
> >>
> >>
> >> | |
> >> Sun.Zhu
> >> |
> >> |
> >> 邮箱:17626017...@163.com
> >> |
> >>
> >> Signature is customized by Netease Mail Master
> >>
> >> 在2020年04月20日 10:37,Benchao Li 写道:
> >> 应该是不会的。分配不到partition的source会标记为idle状态。
> >>
> >> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:
> >>
> >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
> >>>
> >>>
> >>>
> >>>
> >>> | |
> >>> Sun.Zhu
> >>> |
> >>> |
> >>> 邮箱:17626017...@163.com
> >>> |
> >>>
> >>> Signature is customized by Netease Mail Master
> >>>
> >>> 在2020年04月19日 22:43,人生若只如初见 写道:
> >>> 嗯嗯,十分感谢
> >>>
> >>>
> >>>
> >>>
> >>> --原始邮件--
> >>> 发件人:"Benchao Li" >>> 发送时间:2020年4月19日(星期天) 晚上9:25
> >>> 收件人:"user-zh" >>>
> >>> 主题:Re: 问题请教-flinksql的kafkasource方面
> >>>
> >>>
> >>>
> >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
> >>>
> >>> Jark Wu  >>>
> >>>  Hi,
> >>> 
> >>>  根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
> >>>  根据你的 Java 代码,数据的 event time
> >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
> >>>  能容忍 5s 乱序).
> >>>  但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些
> >>> partition
> >>>  进度快很多的现象,
> >>>  导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> >>> 
> >>>  完美的解决方案还需要等 FLIP-27 的完成。
> >>>  当前可以通过增加 watermark delay来增大迟到数据的容忍。
> >>> 
> >>>  Best,
> >>>  Jark
> >>> 
> >>> 
> >>>  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  >>> 
> >>>   你好
> >>>  
> >>>  
> >>> 
> >>>
> >>
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
> >>>  
> >>>  
> >>>  
> >>>   附:
> >>>   userbehavior建表语句
> >>>   CREATE TABLE user_behavior (
> >>>   nbsp; nbsp; user_id BIGINT,
> >>>   nbsp; nbsp; item_id BIGINT,
> >>>   nbsp; nbsp; category_id BIGINT,
> >>>   nbsp; nbsp; behavior STRING,
> >>>   nbsp; nbsp; ts TIMESTAMP(3),
> >>>   nbsp; nbsp; proctime as PROCTIME(), nbsp; --
> >>> 通过计算列产生一个处理时间列
> >>>   nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
> >>> SECOND nbsp;--
> >>>   在ts上定义watermark,ts成为事件时间列
> >>>   ) WITH (
> >>>   nbsp; nbsp; 'connector.type' = 'kafka',
> nbsp;--
> >>> 使用 kafka connector
> >>>   nbsp; nbsp; 'connector.v

Re: 问题请教-flinksql的kafkasource方面

2020-04-20 文章 祝尚
我们是1.8版本,但是这段源码应该是没变把
// check if all tasks that we need to trigger are running.
// if not, abort the checkpoint
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
   Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
   if (ee == null) {
  LOG.info("Checkpoint triggering task {} of job {} is not being executed 
at the moment. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job);
  throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
   } else if (ee.getState() == ExecutionState.RUNNING) {
  executions[i] = ee;
   } else {
  LOG.info("Checkpoint triggering task {} of job {} is not in state {} but 
{} instead. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job,
ExecutionState.RUNNING,
ee.getState());
  throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
   }
}
还是我理解的不对

> 2020年4月20日 下午6:21,Benchao Li  写道:
> 
> 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。
> 
> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道:
> 
>> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
>> ,源码CheckpointCoordinator#triggerCheckpoint也有说明
>> 
>> 
>> 
>> 
>> 
>> | |
>> Sun.Zhu
>> |
>> |
>> 邮箱:17626017...@163.com
>> |
>> 
>> Signature is customized by Netease Mail Master
>> 
>> 在2020年04月20日 10:37,Benchao Li 写道:
>> 应该是不会的。分配不到partition的source会标记为idle状态。
>> 
>> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:
>> 
>>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
>>> 
>>> 
>>> 
>>> 
>>> | |
>>> Sun.Zhu
>>> |
>>> |
>>> 邮箱:17626017...@163.com
>>> |
>>> 
>>> Signature is customized by Netease Mail Master
>>> 
>>> 在2020年04月19日 22:43,人生若只如初见 写道:
>>> 嗯嗯,十分感谢
>>> 
>>> 
>>> 
>>> 
>>> --原始邮件--
>>> 发件人:"Benchao Li">> 发送时间:2020年4月19日(星期天) 晚上9:25
>>> 收件人:"user-zh">> 
>>> 主题:Re: 问题请教-flinksql的kafkasource方面
>>> 
>>> 
>>> 
>>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
>>> 
>>> Jark Wu >> 
>>>  Hi,
>>> 
>>>  根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
>>>  根据你的 Java 代码,数据的 event time
>>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
>>>  能容忍 5s 乱序).
>>>  但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些
>>> partition
>>>  进度快很多的现象,
>>>  导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
>>> 
>>>  完美的解决方案还需要等 FLIP-27 的完成。
>>>  当前可以通过增加 watermark delay来增大迟到数据的容忍。
>>> 
>>>  Best,
>>>  Jark
>>> 
>>> 
>>>  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 >> 
>>>   你好
>>>  
>>>  
>>> 
>>> 
>> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
>>>  
>>>  
>>>  
>>>   附:
>>>   userbehavior建表语句
>>>   CREATE TABLE user_behavior (
>>>   nbsp; nbsp; user_id BIGINT,
>>>   nbsp; nbsp; item_id BIGINT,
>>>   nbsp; nbsp; category_id BIGINT,
>>>   nbsp; nbsp; behavior STRING,
>>>   nbsp; nbsp; ts TIMESTAMP(3),
>>>   nbsp; nbsp; proctime as PROCTIME(), nbsp; --
>>> 通过计算列产生一个处理时间列
>>>   nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
>>> SECOND nbsp;--
>>>   在ts上定义watermark,ts成为事件时间列
>>>   ) WITH (
>>>   nbsp; nbsp; 'connector.type' = 'kafka', nbsp;--
>>> 使用 kafka connector
>>>   nbsp; nbsp; 'connector.version' = 'universal',
>>> nbsp;-- kafka
>>>   版本,universal 支持 0.11 以上的版本
>>>   nbsp; nbsp; 'connector.topic' = 'user_behavior',
>>> nbsp;-- kafka topic
>>>   nbsp; nbsp; 'connector.startup-mode' =
>>> 'earliest-offset', nbsp;-- 从起始
>>>   offset 开始读取
>>>   nbsp; nbsp; 'connector.properties.zookeeper.connect'
>> =
>>> '
>>>   192.168.0.150:2181', nbsp;-- zookeeper 地址
>>>   nbsp; nbsp; 'connector.properties.bootstrap.servers'
>> =
>>> '
>>>   192.168.0.150:9092', nbsp;-- kafka broker 地址
>>>   nbsp; nbsp; 'format.type' = 'json' nbsp;--
>> 数据源格式为
>>> json
>>>   )
>>>  
>>>

Re: 问题请教-flinksql的kafkasource方面

2020-04-20 文章 Benchao Li
你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。

Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道:

> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
> ,源码CheckpointCoordinator#triggerCheckpoint也有说明
>
>
>
>
>
> | |
> Sun.Zhu
> |
> |
> 邮箱:17626017...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年04月20日 10:37,Benchao Li 写道:
> 应该是不会的。分配不到partition的source会标记为idle状态。
>
> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:
>
> > Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
> >
> >
> >
> >
> > | |
> > Sun.Zhu
> > |
> > |
> > 邮箱:17626017...@163.com
> > |
> >
> > Signature is customized by Netease Mail Master
> >
> > 在2020年04月19日 22:43,人生若只如初见 写道:
> > 嗯嗯,十分感谢
> >
> >
> >
> >
> > --原始邮件--
> >  发件人:"Benchao Li" > 发送时间:2020年4月19日(星期天) 晚上9:25
> > 收件人:"user-zh" >
> > 主题:Re: 问题请教-flinksql的kafkasource方面
> >
> >
> >
> > 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
> >
> > Jark Wu  >
> >  Hi,
> > 
> >  根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
> >  根据你的 Java 代码,数据的 event time
> > 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
> >  能容忍 5s 乱序).
> >  但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些
> > partition
> >  进度快很多的现象,
> >  导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> > 
> >  完美的解决方案还需要等 FLIP-27 的完成。
> >  当前可以通过增加 watermark delay来增大迟到数据的容忍。
> > 
> >  Best,
> >  Jark
> > 
> > 
> >  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  > 
> >   你好
> >  
> >  
> > 
> >
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
> >  
> >  
> >  
> >   附:
> >   userbehavior建表语句
> >   CREATE TABLE user_behavior (
> >   nbsp; nbsp; user_id BIGINT,
> >   nbsp; nbsp; item_id BIGINT,
> >   nbsp; nbsp; category_id BIGINT,
> >   nbsp; nbsp; behavior STRING,
> >   nbsp; nbsp; ts TIMESTAMP(3),
> >   nbsp; nbsp; proctime as PROCTIME(), nbsp; --
> > 通过计算列产生一个处理时间列
> >   nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
> > SECOND nbsp;--
> >   在ts上定义watermark,ts成为事件时间列
> >   ) WITH (
> >   nbsp; nbsp; 'connector.type' = 'kafka', nbsp;--
> > 使用 kafka connector
> >   nbsp; nbsp; 'connector.version' = 'universal',
> > nbsp;-- kafka
> >   版本,universal 支持 0.11 以上的版本
> >   nbsp; nbsp; 'connector.topic' = 'user_behavior',
> > nbsp;-- kafka topic
> >   nbsp; nbsp; 'connector.startup-mode' =
> > 'earliest-offset', nbsp;-- 从起始
> >   offset 开始读取
> >   nbsp; nbsp; 'connector.properties.zookeeper.connect'
> =
> > '
> >   192.168.0.150:2181', nbsp;-- zookeeper 地址
> >   nbsp; nbsp; 'connector.properties.bootstrap.servers'
> =
> > '
> >   192.168.0.150:9092', nbsp;-- kafka broker 地址
> >   nbsp; nbsp; 'format.type' = 'json' nbsp;--
> 数据源格式为
> > json
> >   )
> >  
> >   每小时购买数建表语句
> >   CREATE TABLE buy_cnt_per_hour (nbsp;
> >   nbsp; nbsp; hour_of_day BIGINT,
> >   nbsp; nbsp; buy_cnt BIGINT
> >   ) WITH (
> >   nbsp; nbsp; 'connector.type' = 'elasticsearch', -- 使用
> > elasticsearch
> >   connector
> >   nbsp; nbsp; 'connector.version' = '6', nbsp;--
> > elasticsearch 版本,6 能支持
> >   es 6+ 以及 7+ 的版本
> >   nbsp; nbsp; 'connector.hosts' = '
> > http://192.168.0.150:9200', nbsp;--
> >   elasticsearch 地址
> >   nbsp; nbsp; 'connector.index' = 'buy_cnt_per_hour',
> > nbsp;--
> >   elasticsearch 索引名,相当于数据库的表名
> >   nbsp; nbsp; 'connector.document-type' =
> > 'user_behavior', --
> >   elasticsearch 的 type,相当于数据库的库名
> >   nbsp; nbsp; 'connector.bulk-flush.max-actions' = '1',
> > nbsp;-- 每条数据都刷新
> >   nbsp; nbsp; 'format.type' = 'json', nbsp;--
> > 输出数据格式 json
> >   nbsp; nbsp; 'update-mode' = 'append'
> >   )
> >  
> >   插入语句
> >   INSERT INTO buy_cnt_per_hournbsp;
> >   SELECT HOUR(TUMBLE_START(ts,INTERVAL '1'
> > HOUR)),COUNT(*)nbsp;
> >   FROM user_behavior
> >   WHERE behavior = 'buy'
> >   GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
> >  
> >   kafka数据发送代码
> >  
> >   import com.alibaba.fastjson.JSONObject;
> >   import org.apache.kafka.clients.producer.KafkaProducer;
> >   import org.apache.kafka.clients.producer.ProducerRecord;
> >  
> >   import java.text.SimpleDateForma

回复:问题请教-flinksql的kafkasource方面

2020-04-20 文章 Sun.Zhu
我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
 ,源码CheckpointCoordinator#triggerCheckpoint也有说明





| |
Sun.Zhu
|
|
邮箱:17626017...@163.com
|

Signature is customized by Netease Mail Master

在2020年04月20日 10:37,Benchao Li 写道:
应该是不会的。分配不到partition的source会标记为idle状态。

Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:

> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
>
>
>
>
> | |
> Sun.Zhu
> |
> |
> 邮箱:17626017...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年04月19日 22:43,人生若只如初见 写道:
> 嗯嗯,十分感谢
>
>
>
>
> --原始邮件--
>  发件人:"Benchao Li" 发送时间:2020年4月19日(星期天) 晚上9:25
> 收件人:"user-zh"
> 主题:Re: 问题请教-flinksql的kafkasource方面
>
>
>
> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
>
> Jark Wu 
>  Hi,
> 
>  根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
>  根据你的 Java 代码,数据的 event time
> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
>  能容忍 5s 乱序).
>  但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些
> partition
>  进度快很多的现象,
>  导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> 
>  完美的解决方案还需要等 FLIP-27 的完成。
>  当前可以通过增加 watermark delay来增大迟到数据的容忍。
> 
>  Best,
>  Jark
> 
> 
>  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  
>   你好
>  
>  
> 
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
>  
>  
>  
>   附:
>   userbehavior建表语句
>   CREATE TABLE user_behavior (
>   nbsp; nbsp; user_id BIGINT,
>   nbsp; nbsp; item_id BIGINT,
>   nbsp; nbsp; category_id BIGINT,
>   nbsp; nbsp; behavior STRING,
>   nbsp; nbsp; ts TIMESTAMP(3),
>   nbsp; nbsp; proctime as PROCTIME(), nbsp; --
> 通过计算列产生一个处理时间列
>   nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
> SECOND nbsp;--
>   在ts上定义watermark,ts成为事件时间列
>   ) WITH (
>   nbsp; nbsp; 'connector.type' = 'kafka', nbsp;--
> 使用 kafka connector
>   nbsp; nbsp; 'connector.version' = 'universal',
> nbsp;-- kafka
>   版本,universal 支持 0.11 以上的版本
>   nbsp; nbsp; 'connector.topic' = 'user_behavior',
> nbsp;-- kafka topic
>   nbsp; nbsp; 'connector.startup-mode' =
> 'earliest-offset', nbsp;-- 从起始
>   offset 开始读取
>   nbsp; nbsp; 'connector.properties.zookeeper.connect' =
> '
>   192.168.0.150:2181', nbsp;-- zookeeper 地址
>   nbsp; nbsp; 'connector.properties.bootstrap.servers' =
> '
>   192.168.0.150:9092', nbsp;-- kafka broker 地址
>   nbsp; nbsp; 'format.type' = 'json' nbsp;-- 数据源格式为
> json
>   )
>  
>   每小时购买数建表语句
>   CREATE TABLE buy_cnt_per_hour (nbsp;
>   nbsp; nbsp; hour_of_day BIGINT,
>   nbsp; nbsp; buy_cnt BIGINT
>   ) WITH (
>   nbsp; nbsp; 'connector.type' = 'elasticsearch', -- 使用
> elasticsearch
>   connector
>   nbsp; nbsp; 'connector.version' = '6', nbsp;--
> elasticsearch 版本,6 能支持
>   es 6+ 以及 7+ 的版本
>   nbsp; nbsp; 'connector.hosts' = '
> http://192.168.0.150:9200', nbsp;--
>   elasticsearch 地址
>   nbsp; nbsp; 'connector.index' = 'buy_cnt_per_hour',
> nbsp;--
>   elasticsearch 索引名,相当于数据库的表名
>   nbsp; nbsp; 'connector.document-type' =
> 'user_behavior', --
>   elasticsearch 的 type,相当于数据库的库名
>   nbsp; nbsp; 'connector.bulk-flush.max-actions' = '1',
> nbsp;-- 每条数据都刷新
>   nbsp; nbsp; 'format.type' = 'json', nbsp;--
> 输出数据格式 json
>   nbsp; nbsp; 'update-mode' = 'append'
>   )
>  
>   插入语句
>   INSERT INTO buy_cnt_per_hournbsp;
>   SELECT HOUR(TUMBLE_START(ts,INTERVAL '1'
> HOUR)),COUNT(*)nbsp;
>   FROM user_behavior
>   WHERE behavior = 'buy'
>   GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
>  
>   kafka数据发送代码
>  
>   import com.alibaba.fastjson.JSONObject;
>   import org.apache.kafka.clients.producer.KafkaProducer;
>   import org.apache.kafka.clients.producer.ProducerRecord;
>  
>   import java.text.SimpleDateFormat;
>   import java.util.*;
>  
>  
>   public class UserBehaviorProducer {
>   public static final String brokerList = "
> 192.168.0.150:9092";
>  
>   // public static final
> String topic="user_behavior";
>   public static final String topic =
> "user_behavior";
>  
>   public static void main(String args[]) {
>  
>   //配置生产者客户端参数
>   //将配置序列化
>   Properties
> properties = new Properties();
>  
> properties.put("key.serializer",
>   "org.apache.kafka.common.serialization.StringSerializer");
>  
> properties.put("value.serializer",
>   "org.apache.kafka.common.serialization.StringSerializer");
>  
> properties.put("bootstrap.servers", brokerList);
>  
> //创建KafkaProducer 实例
>  
> KafkaProducer   KafkaProducer<gt;(p

Re: 问题请教-flinksql的kafkasource方面

2020-04-19 文章 Benchao Li
应该是不会的。分配不到partition的source会标记为idle状态。

Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:

> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
>
>
>
>
> | |
> Sun.Zhu
> |
> |
> 邮箱:17626017...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年04月19日 22:43,人生若只如初见 写道:
> 嗯嗯,十分感谢
>
>
>
>
> --原始邮件--
>  发件人:"Benchao Li" 发送时间:2020年4月19日(星期天) 晚上9:25
> 收件人:"user-zh"
> 主题:Re: 问题请教-flinksql的kafkasource方面
>
>
>
> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
>
> Jark Wu 
>  Hi,
> 
>  根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
>  根据你的 Java 代码,数据的 event time
> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
>  能容忍 5s 乱序).
>  但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些
> partition
>  进度快很多的现象,
>  导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> 
>  完美的解决方案还需要等 FLIP-27 的完成。
>  当前可以通过增加 watermark delay来增大迟到数据的容忍。
> 
>  Best,
>  Jark
> 
> 
>  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  
>   你好
>  
>  
> 
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
>  
>  
>  
>   附:
>   userbehavior建表语句
>   CREATE TABLE user_behavior (
>   nbsp; nbsp; user_id BIGINT,
>   nbsp; nbsp; item_id BIGINT,
>   nbsp; nbsp; category_id BIGINT,
>   nbsp; nbsp; behavior STRING,
>   nbsp; nbsp; ts TIMESTAMP(3),
>   nbsp; nbsp; proctime as PROCTIME(), nbsp; --
> 通过计算列产生一个处理时间列
>   nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
> SECOND nbsp;--
>   在ts上定义watermark,ts成为事件时间列
>   ) WITH (
>   nbsp; nbsp; 'connector.type' = 'kafka', nbsp;--
> 使用 kafka connector
>   nbsp; nbsp; 'connector.version' = 'universal',
> nbsp;-- kafka
>   版本,universal 支持 0.11 以上的版本
>   nbsp; nbsp; 'connector.topic' = 'user_behavior',
> nbsp;-- kafka topic
>   nbsp; nbsp; 'connector.startup-mode' =
> 'earliest-offset', nbsp;-- 从起始
>   offset 开始读取
>   nbsp; nbsp; 'connector.properties.zookeeper.connect' =
> '
>   192.168.0.150:2181', nbsp;-- zookeeper 地址
>   nbsp; nbsp; 'connector.properties.bootstrap.servers' =
> '
>   192.168.0.150:9092', nbsp;-- kafka broker 地址
>   nbsp; nbsp; 'format.type' = 'json' nbsp;-- 数据源格式为
> json
>   )
>  
>   每小时购买数建表语句
>   CREATE TABLE buy_cnt_per_hour (nbsp;
>   nbsp; nbsp; hour_of_day BIGINT,
>   nbsp; nbsp; buy_cnt BIGINT
>   ) WITH (
>   nbsp; nbsp; 'connector.type' = 'elasticsearch', -- 使用
> elasticsearch
>   connector
>   nbsp; nbsp; 'connector.version' = '6', nbsp;--
> elasticsearch 版本,6 能支持
>   es 6+ 以及 7+ 的版本
>   nbsp; nbsp; 'connector.hosts' = '
> http://192.168.0.150:9200', nbsp;--
>   elasticsearch 地址
>   nbsp; nbsp; 'connector.index' = 'buy_cnt_per_hour',
> nbsp;--
>   elasticsearch 索引名,相当于数据库的表名
>   nbsp; nbsp; 'connector.document-type' =
> 'user_behavior', --
>   elasticsearch 的 type,相当于数据库的库名
>   nbsp; nbsp; 'connector.bulk-flush.max-actions' = '1',
> nbsp;-- 每条数据都刷新
>   nbsp; nbsp; 'format.type' = 'json', nbsp;--
> 输出数据格式 json
>   nbsp; nbsp; 'update-mode' = 'append'
>   )
>  
>   插入语句
>   INSERT INTO buy_cnt_per_hournbsp;
>   SELECT HOUR(TUMBLE_START(ts,INTERVAL '1'
> HOUR)),COUNT(*)nbsp;
>   FROM user_behavior
>   WHERE behavior = 'buy'
>   GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
>  
>   kafka数据发送代码
>  
>   import com.alibaba.fastjson.JSONObject;
>   import org.apache.kafka.clients.producer.KafkaProducer;
>   import org.apache.kafka.clients.producer.ProducerRecord;
>  
>   import java.text.SimpleDateFormat;
>   import java.util.*;
>  
>  
>   public class UserBehaviorProducer {
>   public static final String brokerList = "
> 192.168.0.150:9092";
>  
>   // public static final
> String topic="user_behavior";
>   public static final String topic =
> "user_behavior";
>  
>   public static void main(String args[]) {
>  
>   //配置生产者客户端参数
>   //将配置序列化
>   Properties
> properties = new Properties();
>  
> properties.put("key.serializer",
>   "org.apache.kafka.common.serialization.StringSerializer");
>  
> properties.put("value.serializer",
>   "org.apache.kafka.common.serialization.StringSerializer");
>  
> properties.put("bootstrap.servers", brokerList);
>  
> //创建KafkaProducer 实例
>  
> KafkaProducer   KafkaProducer<gt;(properties);
>   //构建待发送的消息
>   //{"user_id":
> "952483", "item_id":"310884", "category_id":
>   "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
>   //{"

Re: 问题请教-flinksql的kafkasource方面

2020-04-19 文章 Benchao Li
e) {
> > jsonObject.put("user_id", random.nextInt(90) + 10 +
> > "");
> > jsonObject.put("item_id", random.nextInt(90) + 10 +
> > "");
> > jsonObject.put("category_id", random.nextInt(1000) + "");
> > jsonObject.put("behavior", behaviors[random.nextInt(4)]);
> > jsonObject.put("ts", format.format(new Date(date_long)));
> > String msg = jsonObject.toString();
> > System.out.println(msg);
> >     ProducerRecord > ProducerRecord<(topic, msg);
> > producer.send(record);
> > //date_long +=500+random.nextGaussian()*1000;
> > date_long +=800+random.nextGaussian()*1500;
> > try {
> > Thread.sleep(60);
> > } catch (InterruptedException e) {
> > e.printStackTrace();
> > }
> > }
> >
> > }
> >
> > private static long getDate() {
> > Date date = new Date();
> > Calendar c = Calendar.getInstance();
> > c.setTime(date);
> > //设置为1号,当前日期既为本月第一天
> > c.set(Calendar.DAY_OF_MONTH, 1);
> > //将小时至0
> > c.set(Calendar.HOUR_OF_DAY, 0);
> > //将分钟至0
> > c.set(Calendar.MINUTE, 0);
> > //将秒至0
> > c.set(Calendar.SECOND,0);
> > //将毫秒至0
> > c.set(Calendar.MILLISECOND, 0);
> > // 本月第一天的时间戳转换为字符串
> > return c.getTimeInMillis();
> > }
> > }
> >
> > --原始邮件--
> > 发件人:"Jark Wu" > 发送时间:2020年4月18日(星期六) 晚上10:08
> > 收件人:"user-zh" >
> > 主题:Re: 问题请教-flinksql的kafkasource方面
> >
> >
> >
> > Hi,
> >
> > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, TableEnvironment 上还未支持。
> > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下?
> >
> > Best,
> > Jark
> >
> > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见  >
> >  大佬好:
> >  nbsp; nbsp;
> > 
> nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。
> >  nbsp; nbsp;
> > 
> >
> nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。
> >  nbsp;nbsp;
> >  nbsp; nbsp;
> > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create
> >  view as ...")却会报错。报错如下:
> >  Exception in thread "main"
> org.apache.flink.table.api.TableException:
> >  Unsupported query: CREATE VIEW rich_user_behavior AS
> >  SELECT U.user_id, U.item_id, U.behavior,nbsp;
> >  nbsp; CASE C.parent_category_id
> >  nbsp; nbsp; WHEN 1 THEN '服饰鞋包'
> >  nbsp; nbsp; WHEN 2 THEN '家装家饰'
> >  nbsp; nbsp; WHEN 3 THEN '家电'
> >  nbsp; nbsp; WHEN 4 THEN '美妆'
> >  nbsp; nbsp; WHEN 5 THEN '母婴'
> >  nbsp; nbsp; WHEN 6 THEN '3C数码'
> >  nbsp; nbsp; WHEN 7 THEN '运动户外'
> >  nbsp; nbsp; WHEN 8 THEN '食品'
> >  nbsp; nbsp; ELSE '其他'
> >  nbsp; END AS category_name
> >  FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF
> >  U.proctime AS C
> >  ON U.category_id = C.sub_category_id
> >  at
> > 
> >
> org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67)
> >  at
> > 
> >
> org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown
> >  Source)
> >  at
> > java.util.Optional.orElseThrow(Optional.java:290)
> >  at
> > 
> >
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67)
> >  at
> > 
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> >  at
> > 
> >
> com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51)
> > 
> > 
> > 
> > 
> > 
> >  nbsp; nbsp; nbsp; nbsp; 望解答,十分感谢!
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: 问题请教-flinksql的kafkasource方面

2020-04-19 文章 Jark Wu
; //date_long +=500+random.nextGaussian()*1000;
> date_long +=800+random.nextGaussian()*1500;
> try {
> Thread.sleep(60);
>     } catch (InterruptedException e) {
> e.printStackTrace();
> }
> }
>
> }
>
> private static long getDate() {
> Date date = new Date();
> Calendar c = Calendar.getInstance();
> c.setTime(date);
> //设置为1号,当前日期既为本月第一天
> c.set(Calendar.DAY_OF_MONTH, 1);
> //将小时至0
> c.set(Calendar.HOUR_OF_DAY, 0);
> //将分钟至0
> c.set(Calendar.MINUTE, 0);
> //将秒至0
> c.set(Calendar.SECOND,0);
> //将毫秒至0
> c.set(Calendar.MILLISECOND, 0);
> // 本月第一天的时间戳转换为字符串
> return c.getTimeInMillis();
> }
> }
>
> --原始邮件--
> 发件人:"Jark Wu" 发送时间:2020年4月18日(星期六) 晚上10:08
> 收件人:"user-zh"
> 主题:Re: 问题请教-flinksql的kafkasource方面
>
>
>
> Hi,
>
> 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, TableEnvironment 上还未支持。
> 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下?
>
> Best,
> Jark
>
> On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 
>  大佬好:
>  nbsp; nbsp;
>  nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。
>  nbsp; nbsp;
> 
> nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。
>  nbsp;nbsp;
>  nbsp; nbsp;
> 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create
>  view as ...")却会报错。报错如下:
>  Exception in thread "main" org.apache.flink.table.api.TableException:
>  Unsupported query: CREATE VIEW rich_user_behavior AS
>  SELECT U.user_id, U.item_id, U.behavior,nbsp;
>  nbsp; CASE C.parent_category_id
>  nbsp; nbsp; WHEN 1 THEN '服饰鞋包'
>  nbsp; nbsp; WHEN 2 THEN '家装家饰'
>  nbsp; nbsp; WHEN 3 THEN '家电'
>  nbsp; nbsp; WHEN 4 THEN '美妆'
>  nbsp; nbsp; WHEN 5 THEN '母婴'
>  nbsp; nbsp; WHEN 6 THEN '3C数码'
>  nbsp; nbsp; WHEN 7 THEN '运动户外'
>  nbsp; nbsp; WHEN 8 THEN '食品'
>  nbsp; nbsp; ELSE '其他'
>  nbsp; END AS category_name
>  FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF
>  U.proctime AS C
>  ON U.category_id = C.sub_category_id
>  at
> 
> org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67)
>  at
> 
> org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown
>  Source)
>  at
> java.util.Optional.orElseThrow(Optional.java:290)
>  at
> 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67)
>  at
> 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>  at
> 
> com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51)
> 
> 
> 
> 
> 
>  nbsp; nbsp; nbsp; nbsp; 望解答,十分感谢!


Re: 问题请教-flinksql的kafkasource方面

2020-04-18 文章 Jark Wu
Hi,

关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, TableEnvironment 上还未支持。
关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下?

Best,
Jark

On Sat, 18 Apr 2020 at 18:38, 人生若只如初见  wrote:

> 大佬好:
>  
> 请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。
>  
> 问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。
> 
>   问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create
> view as ...")却会报错。报错如下:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Unsupported query: CREATE VIEW rich_user_behavior AS
> SELECT U.user_id, U.item_id, U.behavior,
>  CASE C.parent_category_id
>   WHEN 1 THEN '服饰鞋包'
>   WHEN 2 THEN '家装家饰'
>   WHEN 3 THEN '家电'
>   WHEN 4 THEN '美妆'
>   WHEN 5 THEN '母婴'
>   WHEN 6 THEN '3C数码'
>   WHEN 7 THEN '运动户外'
>   WHEN 8 THEN '食品'
>   ELSE '其他'
>  END AS category_name
> FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF
> U.proctime AS C
> ON U.category_id = C.sub_category_id
> at
> org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67)
> at
> org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown
> Source)
> at java.util.Optional.orElseThrow(Optional.java:290)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> at
> com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51)
>
>
>
>
>
> 望解答,十分感谢!