Re: source并行度不同导致任务没有数据落地

2019-12-25 文章 JingsongLee
Hi zhaorui,

你是不是指定了Rowtime的列?
如果指定了,Kafka是否有的通道一直没有数据或者数据没有前进?

Window的输出触发是需要watermark前进的,这也就需要你的每个通道都有数据在时间上前进,也就是说每个Kafka的通道都需要有最新时间点的数据源源不断的来。

你设置成一个并发,那就只要一个通道有数据就可以了,所以绕过了这个问题。

Best,
Jingsong Lee


--
From:zhaorui_9...@163.com 
Send Time:2019年12月26日(星期四) 12:03
To:user-zh 
Subject:source并行度不同导致任务没有数据落地

hi all:
   
最近碰到一个很头疼的事情,两个任务相同的sql语句不同的source,任务的并行度为8,一个source是kafka一个source是rabbitmq,kafka和rabbitmq中加载相同的数据后,source为rabbitmq的任务有数据落地,source为kafka的任务运行好几次都不见有数据落地。因为sql中涉及到了窗口,所以考虑过kafka多partition对数据读取顺序的影响,将所有数据都加载到kafka的同一个partition中重启任务后发现还是没有数据落地。考虑到这两个任务唯一的不同点就是源为rabbitmq的任务source算子的并行度为1,所以将源为kafka的任务的source并行度也设为1,运行任务后发现有数据落地了。source并行度的改变应该只是改变了一下source与其它算子之间的数据传递方式,这种改变会对最终的结果造成影响吗?有没有大佬碰到过相同的问题?
flink版本1.9.1
sql:select count(ps_comment) col1,ceil(stddev_pop(ps_availqty)) col2,
   tumble_start(over_time,interval '72' hour) col3,
   tumble_end(over_time,interval '72' hour) col4,
   ps_date
from cirrostream_kafka_ck_source_03_8x3
where ps_availqty <= 489
  and ps_supplycost > 998
  and ps_comment not like '%ff%'
  and ps_partkey <= 3751122
   or ps_suppkey = 723
group by ps_date,ps_availqty,tumble(over_time,interval '72' hour)
having min(ps_partkey) not in (3525711,3738707,3740245)



zhaorui_9...@163.com


Re: Flink1.9批任务yn和ys对任务的影响

2019-12-25 文章 JingsongLee
Hi faaron zheng,

如kurt所说,强烈建议使用1.10,现在已拉分支。

TM运行的一个经验值是:TM有10个Slot,TM内存10G:JVM堆内4G、1G网络buffer、manage内存5G(也就是说单个slot的manage内存500M)。

Best,
Jingsong Lee


--
From:Kurt Young 
Send Time:2019年12月26日(星期四) 14:07
To:user-zh 
Subject:Re: Flink1.9批任务yn和ys对任务的影响

也可以试下最新的1.10版本,这个版本里面 sql 的算子已经不再申请固定写死的内存数量,
而是根据当时 slot 能提供多少 managed 内存来自适应了。

Best,
Kurt


On Thu, Dec 26, 2019 at 1:36 PM Xintong Song  wrote:

> slot需要多少内存是和具体作业相关的,不同作业差别会比较大。
>
> slot的资源需求是根据所有算子的资源需求相加得到的,如果你对你的作业用到了哪些算子比较了解的话,可以根据算子的资源需求推算出来。
> 算子的默认资源需求可以参考 [1],里面有五个“table.exec.resource.*”的配置项,也可以调整这些配置项来更改算子使用的内存。
>
> 如果对作业使用到的算子不是很了解的话,那比较简单的办法还是直接提交作业试试看,去日志里面搜"Request slot with
> profile"就能够看到slot的资源需求。
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options
>
> On Thu, Dec 26, 2019 at 11:36 AM faaron zheng 
> wrote:
>
> > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed
> > memory为2g,也就是一个slot平均200m,所以任务没调度起来。
> > 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱:
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月26日 11:23,faaron zheng 写道:
> > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed
> > memory为2g,也就是一个slot平均200m,所以任务没调度起来。
> > 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱:
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月25日 11:30,Xintong Song 写道:
> > Hi faaron, Flink 1.9 中 -yn参数应该是不生效的,后续版本中已经删除了这个参数。 根据你的参数,在每个 TM
> > 的内存为30G不变的情况下,每个 TM 中的slot个数(-ys)从5变成10,也就意味着平均每个slot占用的内存变为了原来的一半。 Flink
> > 1.9 的sql batch 算子对 flink managed memory 是有确定的需求的,很可能是这个变化导致单个 slot
> 的managed
> > memory无法满足算子的资源需求了。 Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09
> > AM faaron zheng  wrote: > 跑tpcds的query1: flink
> run
> > -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g > -ytm 30g 任务可以正常执行 flink
> > run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm > 60g -ytm 30g
> > 任务在做hashjoin的时候就会失败 报错是No pooled slot available and request to >
> > ResourceManager for new slot failed 搞不懂这有啥关系,求指教 faaron zheng 邮箱: >
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制
>


Re: Flink 1.9 SQL Kafka Connector,Json format,how to deal with not json message?

2019-12-25 文章 Jark Wu
Hi LakeShen,

I'm sorry there is no such configuration for json format currently.
I think it makes sense to add such configuration like
'format.ignore-parse-errors' in csv format.
I created FLINK-15396[1] to track this.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-15396

On Thu, 26 Dec 2019 at 11:44, LakeShen  wrote:

> Hi community,when I write the flink ddl sql like this:
>
> CREATE TABLE kafka_src (
>   id varchar,
>   a varchar,
>   b TIMESTAMP,
>   c TIMESTAMP
> )
>   with (
>...
> 'format.type' = 'json',
> 'format.property-version' = '1',
> 'format.derive-schema' = 'true',
> 'update-mode' = 'append'
> );
>
> If the message is not the json format ,there is a error in the log。
> My question is that how to deal with the message which it not json format?
> My thought is that I can catch the exception
> in JsonRowDeserializationSchema deserialize() method,is there any
> parameters to do this?
> Thanks your replay.
>
>


Re: Flink1.9批任务yn和ys对任务的影响

2019-12-25 文章 Kurt Young
也可以试下最新的1.10版本,这个版本里面 sql 的算子已经不再申请固定写死的内存数量,
而是根据当时 slot 能提供多少 managed 内存来自适应了。

Best,
Kurt


On Thu, Dec 26, 2019 at 1:36 PM Xintong Song  wrote:

> slot需要多少内存是和具体作业相关的,不同作业差别会比较大。
>
> slot的资源需求是根据所有算子的资源需求相加得到的,如果你对你的作业用到了哪些算子比较了解的话,可以根据算子的资源需求推算出来。
> 算子的默认资源需求可以参考 [1],里面有五个“table.exec.resource.*”的配置项,也可以调整这些配置项来更改算子使用的内存。
>
> 如果对作业使用到的算子不是很了解的话,那比较简单的办法还是直接提交作业试试看,去日志里面搜"Request slot with
> profile"就能够看到slot的资源需求。
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options
>
> On Thu, Dec 26, 2019 at 11:36 AM faaron zheng 
> wrote:
>
> > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed
> > memory为2g,也就是一个slot平均200m,所以任务没调度起来。
> > 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱:
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月26日 11:23,faaron zheng 写道:
> > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed
> > memory为2g,也就是一个slot平均200m,所以任务没调度起来。
> > 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱:
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月25日 11:30,Xintong Song 写道:
> > Hi faaron, Flink 1.9 中 -yn参数应该是不生效的,后续版本中已经删除了这个参数。 根据你的参数,在每个 TM
> > 的内存为30G不变的情况下,每个 TM 中的slot个数(-ys)从5变成10,也就意味着平均每个slot占用的内存变为了原来的一半。 Flink
> > 1.9 的sql batch 算子对 flink managed memory 是有确定的需求的,很可能是这个变化导致单个 slot
> 的managed
> > memory无法满足算子的资源需求了。 Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09
> > AM faaron zheng  wrote: > 跑tpcds的query1: flink
> run
> > -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g > -ytm 30g 任务可以正常执行 flink
> > run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm > 60g -ytm 30g
> > 任务在做hashjoin的时候就会失败 报错是No pooled slot available and request to >
> > ResourceManager for new slot failed 搞不懂这有啥关系,求指教 faaron zheng 邮箱: >
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制
>


Re: Flink1.9批任务yn和ys对任务的影响

2019-12-25 文章 Xintong Song
slot需要多少内存是和具体作业相关的,不同作业差别会比较大。

slot的资源需求是根据所有算子的资源需求相加得到的,如果你对你的作业用到了哪些算子比较了解的话,可以根据算子的资源需求推算出来。
算子的默认资源需求可以参考 [1],里面有五个“table.exec.resource.*”的配置项,也可以调整这些配置项来更改算子使用的内存。

如果对作业使用到的算子不是很了解的话,那比较简单的办法还是直接提交作业试试看,去日志里面搜"Request slot with
profile"就能够看到slot的资源需求。

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options

On Thu, Dec 26, 2019 at 11:36 AM faaron zheng  wrote:

> 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed
> memory为2g,也就是一个slot平均200m,所以任务没调度起来。
> 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱:
> faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月26日 11:23,faaron zheng 写道:
> 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed
> memory为2g,也就是一个slot平均200m,所以任务没调度起来。
> 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱:
> faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月25日 11:30,Xintong Song 写道:
> Hi faaron, Flink 1.9 中 -yn参数应该是不生效的,后续版本中已经删除了这个参数。 根据你的参数,在每个 TM
> 的内存为30G不变的情况下,每个 TM 中的slot个数(-ys)从5变成10,也就意味着平均每个slot占用的内存变为了原来的一半。 Flink
> 1.9 的sql batch 算子对 flink managed memory 是有确定的需求的,很可能是这个变化导致单个 slot 的managed
> memory无法满足算子的资源需求了。 Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09
> AM faaron zheng  wrote: > 跑tpcds的query1: flink run
> -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g > -ytm 30g 任务可以正常执行 flink
> run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm > 60g -ytm 30g
> 任务在做hashjoin的时候就会失败 报错是No pooled slot available and request to >
> ResourceManager for new slot failed 搞不懂这有啥关系,求指教 faaron zheng 邮箱: >
> faaronzh...@gmail.com 签名由 网易邮箱大师 定制


source并行度不同导致任务没有数据落地

2019-12-25 文章 zhaorui_9...@163.com
hi all:
   
最近碰到一个很头疼的事情,两个任务相同的sql语句不同的source,任务的并行度为8,一个source是kafka一个source是rabbitmq,kafka和rabbitmq中加载相同的数据后,source为rabbitmq的任务有数据落地,source为kafka的任务运行好几次都不见有数据落地。因为sql中涉及到了窗口,所以考虑过kafka多partition对数据读取顺序的影响,将所有数据都加载到kafka的同一个partition中重启任务后发现还是没有数据落地。考虑到这两个任务唯一的不同点就是源为rabbitmq的任务source算子的并行度为1,所以将源为kafka的任务的source并行度也设为1,运行任务后发现有数据落地了。source并行度的改变应该只是改变了一下source与其它算子之间的数据传递方式,这种改变会对最终的结果造成影响吗?有没有大佬碰到过相同的问题?
flink版本1.9.1
sql:select count(ps_comment) col1,ceil(stddev_pop(ps_availqty)) col2,
   tumble_start(over_time,interval '72' hour) col3,
   tumble_end(over_time,interval '72' hour) col4,
   ps_date
from cirrostream_kafka_ck_source_03_8x3
where ps_availqty <= 489
  and ps_supplycost > 998
  and ps_comment not like '%ff%'
  and ps_partkey <= 3751122
   or ps_suppkey = 723
group by ps_date,ps_availqty,tumble(over_time,interval '72' hour)
having min(ps_partkey) not in (3525711,3738707,3740245)



zhaorui_9...@163.com


Flink 1.9 SQL Kafka Connector,Json format,how to deal with not json message?

2019-12-25 文章 LakeShen
Hi community,when I write the flink ddl sql like this:

CREATE TABLE kafka_src (
  id varchar,
  a varchar,
  b TIMESTAMP,
  c TIMESTAMP
)
  with (
   ...
'format.type' = 'json',
'format.property-version' = '1',
'format.derive-schema' = 'true',
'update-mode' = 'append'
);

If the message is not the json format ,there is a error in the log。
My question is that how to deal with the message which it not json format?
My thought is that I can catch the exception
in JsonRowDeserializationSchema deserialize() method,is there any
parameters to do this?
Thanks your replay.


Re: Flink实现Kafka到Mysql的 End-To-End Exactly-Once中遇到的问题

2019-12-25 文章 LakeShen
是否可以尝试使用幂等来解决 端到端的一致性

Best wishes,
沈磊

卢伟楠  于2019年12月25日周三 下午4:09写道:

> 各位大佬好:
>
> 最近是实现Kafka到Mysql的 End-To-End Exactly-Once中遇到以下2个问题:
> 1:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException:
> Communications link failure during commit(). Transaction resolution unknown.
> 2:org.apache.flink.streaming.runtime.tasks.TimerException:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>
> 已经做了一个最简单的复现问题的demo,求指教
> git clone https://github.com/lusecond/flink_help --depth=1
>
>
> 测试过程中,发现继承TwoPhaseCommitSinkFunction类的4个重写方法beginTransaction、preCommit、commit、abort
> 分别在不同的线程工作,怀疑过因为线程切换导致jdbc的事务提交出问题,已经做过相关测试排除不是由此引起的问题


回复:Flink1.9批任务yn和ys对任务的影响

2019-12-25 文章 faaron zheng
感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed 
memory为2g,也就是一个slot平均200m,所以任务没调度起来。 
但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 
邮箱:faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月26日 11:23,faaron zheng 写道: 
感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed 
memory为2g,也就是一个slot平均200m,所以任务没调度起来。 
但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 
邮箱:faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月25日 11:30,Xintong Song 写道: Hi 
faaron, Flink 1.9 中 -yn参数应该是不生效的,后续版本中已经删除了这个参数。 根据你的参数,在每个 TM 的内存为30G不变的情况下,每个 
TM 中的slot个数(-ys)从5变成10,也就意味着平均每个slot占用的内存变为了原来的一半。 Flink 1.9 的sql batch 算子对 
flink managed memory 是有确定的需求的,很可能是这个变化导致单个 slot 的managed memory无法满足算子的资源需求了。 
Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09 AM faaron zheng 
 wrote: > 跑tpcds的query1: flink run -m yarn-cluster -d -p 
100 -yn 20 -ys 5 -yjm 60g > -ytm 30g 任务可以正常执行 flink run -m yarn-cluster -d -p 
100 -yn 10 -ys 10 -yjm > 60g -ytm 30g 任务在做hashjoin的时候就会失败 报错是No pooled slot 
available and request to > ResourceManager for new slot failed 搞不懂这有啥关系,求指教 
faaron zheng 邮箱: > faaronzh...@gmail.com 签名由 网易邮箱大师 定制

Re: Rewind offset to a previous position and ensure certainty.

2019-12-25 文章 Zhijiang
If I understood correctly, different partitions of Kafka would be emitted by 
different source tasks with different watermark progress.  And the Flink 
framework would align the different watermarks to only output the smallest 
watermark among them, so the events from slow partitions would not be discarded 
because the downstream operator would only see the watermark based on the slow 
partition atm. You can refer to [1] for some details.

As for rewinding the offset of partition position, I guess it only happens in 
failure recovery case or you manually restart the job. Anyway all the topology 
tasks would be restarted and previous received watermarks are cleared.
So it would also not discard the events in this case.  Unless you can only 
rewind some source task to previous positions and keep other downstream tasks 
still running, it might have the issues you concern. But Flink can not support 
such operation/function atm. :) 

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamps_watermarks.html

Best,
Zhijiang
--
From:邢瑞斌 
Send Time:2019 Dec. 25 (Wed.) 20:27
To:user-zh ; user 
Subject:Rewind offset to a previous position and ensure certainty.

Hi,

I'm trying to use Kafka as an event store and I want to create several 
partitions to improve read/write throughput. Occasionally I need to rewind 
offset to a previous position for recomputing. Since order isn't guaranteed 
among partitions in Kafka, does this mean that Flink won't produce the same 
results as before when rewind even if it uses event time? For example, consumer 
for a partition progresses extremely fast and raises watermark, so events from 
other partitions are discarded. Is there any ways to prevent this from 
happening?

Thanks in advance!

Ruibin



回复:Flink1.9批任务yn和ys对任务的影响

2019-12-25 文章 faaron zheng
感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed 
memory为2g,也就是一个slot平均200m,所以任务没调度起来。 
但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 
邮箱:faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月25日 11:30,Xintong Song 写道: Hi 
faaron, Flink 1.9 中 -yn参数应该是不生效的,后续版本中已经删除了这个参数。 根据你的参数,在每个 TM 的内存为30G不变的情况下,每个 
TM 中的slot个数(-ys)从5变成10,也就意味着平均每个slot占用的内存变为了原来的一半。 Flink 1.9 的sql batch 算子对 
flink managed memory 是有确定的需求的,很可能是这个变化导致单个 slot 的managed memory无法满足算子的资源需求了。 
Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09 AM faaron zheng 
 wrote: > 跑tpcds的query1: flink run -m yarn-cluster -d -p 
100 -yn 20 -ys 5 -yjm 60g > -ytm 30g 任务可以正常执行 flink run -m yarn-cluster -d -p 
100 -yn 10 -ys 10 -yjm > 60g -ytm 30g 任务在做hashjoin的时候就会失败 报错是No pooled slot 
available and request to > ResourceManager for new slot failed 搞不懂这有啥关系,求指教 
faaron zheng 邮箱: > faaronzh...@gmail.com 签名由 网易邮箱大师 定制

Re: 关于 FLink historyserver没有completed-jobs的问题

2019-12-25 文章 pengchenglin

flink-conf.yaml里需要有这些配置
historyserver.web.port: 8082
historyserver.web.address: 0.0.0.0
historyserver.archive.fs.refresh-interval: 1
historyserver.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/
jobmanager.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/
#多少秒后,会将完成的任务提交到history
jobstore.expiration-time: 14400
jobmanager.archive.fs.dir和historyserver.archive.fs.dir一样即可
然后启动bin/historyserver.sh start
访问ip:8082,需要跑一个任务,并且等待jobstore.expiration-time这个时间,才会有数据
 
发件人: 起子
发送时间: 2019-12-25 15:57
收件人: user-zh
主题: 关于 FLink historyserver没有completed-jobs的问题
大神们:
我启动了flink的historyserver,但是里面并没有已完成的任务
配置如下:

结果界面如下:
hdfs如下:
麻烦大神们给与指导
 部门 / 数据平台
 花名 / 起子
 Mobile :159 8810 1848
 WeChat :159 8810 1848
 Email :q...@dian.so
 Address :浙江省杭州市余杭区文一西路998号5#705



Re: flink 维表关联

2019-12-25 文章 lucas.wu
Hi 李现
现实确实很难做到对流表进行全量的join,如需全量,state会占用很大的存储,而且后续迁移很困难。请问一下你说的这个方案可以举个例子吗?


原始邮件
发件人:李现stormallin2...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2019年12月26日(周四) 08:44
主题:Re: flink 维表关联


流的大小应该不是无限制的,应该是有个窗口期?窗口期之外的数据离线处理? xin Destiny 
nj18652727...@gmail.com于2019年12月25日 周三18:13写道:  Hi,lucas.wu:   
我个人觉得可以把join的条件和流对应的数据存放在mapstate中,每次维表的缓存更新数据之后,去mapstate中查询,如果存在对应的KV,将新关联后的数据下发;
  不过这样state会占用很大的内存,需要主意state的清理   lucas.wu lucas...@xiaoying.com 
于2019年12月25日周三 下午5:13写道:hi all:   
flink的kafka流表与hbase维表关联,维表后面有变动的话,如何将之前关联过的数据进行更新?

Re: Rewind offset to a previous position and ensure certainty.

2019-12-25 文章 vino yang
Hi Ruibin,

Are you finding how to generate watermark pre Kafka partition?
Flink provides Kafka-partition-aware watermark generation. [1]

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition

邢瑞斌  于2019年12月25日周三 下午8:27写道:

> Hi,
>
> I'm trying to use Kafka as an event store and I want to create several
> partitions to improve read/write throughput. Occasionally I need to rewind
> offset to a previous position for recomputing. Since order isn't guaranteed
> among partitions in Kafka, does this mean that Flink won't produce the same
> results as before when rewind even if it uses event time? For example,
> consumer for a partition progresses extremely fast and raises watermark, so
> events from other partitions are discarded. Is there any ways to prevent
> this from happening?
>
> Thanks in advance!
>
> Ruibin
>


回复: flink 维表关联

2019-12-25 文章 叶贤勋
可以使用guava实现维表数据缓存在jvm,可以设置缓存数据有效期


| |
叶贤勋
|
|
yxx_c...@163.com
|
签名由网易邮箱大师定制


在2019年12月26日 08:44,李现 写道:
流的大小应该不是无限制的,应该是有个窗口期?窗口期之外的数据离线处理?

xin Destiny 于2019年12月25日 周三18:13写道:

Hi,lucas.wu:

我个人觉得可以把join的条件和流对应的数据存放在mapstate中,每次维表的缓存更新数据之后,去mapstate中查询,如果存在对应的KV,将新关联后的数据下发;
不过这样state会占用很大的内存,需要主意state的清理

lucas.wu  于2019年12月25日周三 下午5:13写道:

hi all:
flink的kafka流表与hbase维表关联,维表后面有变动的话,如何将之前关联过的数据进行更新?



Re: flink 维表关联

2019-12-25 文章 李现
流的大小应该不是无限制的,应该是有个窗口期?窗口期之外的数据离线处理?

xin Destiny 于2019年12月25日 周三18:13写道:

> Hi,lucas.wu:
>
> 我个人觉得可以把join的条件和流对应的数据存放在mapstate中,每次维表的缓存更新数据之后,去mapstate中查询,如果存在对应的KV,将新关联后的数据下发;
> 不过这样state会占用很大的内存,需要主意state的清理
>
> lucas.wu  于2019年12月25日周三 下午5:13写道:
>
> > hi all:
> > flink的kafka流表与hbase维表关联,维表后面有变动的话,如何将之前关联过的数据进行更新?
>


Re: The assigned slot bae00218c818157649eb9e3c533b86af_11 was removed

2019-12-25 文章 jingjing bai
tm挂掉了,可以看下是否存在checkpoint连续失败导致OOM, 或者是大数据集大窗口运算,如果数据量大也会导致这个问题。

Xintong Song  于2019年12月25日周三 上午10:28写道:

> 这个应该不是root cause,slot was removed通常是tm挂掉了导致的,需要找下对应的tm日志看下挂掉的原因。
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Dec 24, 2019 at 10:06 PM hiliuxg <736742...@qq.com> wrote:
>
> > 偶尔发现,分配好的slot突然就被remove了,导致作业重启,看不出是什么原因导致?CPU和FULL GC都没有,异常信息如下:
> >
> > org.apache.flink.util.FlinkException: The assigned slot
> > bae00218c818157649eb9e3c533b86af_11 was removed.
> >         at
> >
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
> >         at
> >
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
> >         at
> >
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
> >         at
> >
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
> >         at
> >
> org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:847)
> >         at
> >
> org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1161)
> >         at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
> >         at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
> >         at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> >         at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> >         at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> >         at
> >
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> >         at
> > akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> >         at
> > akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> >         at
> > akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> >         at
> > akka.actor.ActorCell.invoke(ActorCell.scala:495)
> >         at
> > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> >         at
> akka.dispatch.Mailbox.run(Mailbox.scala:224)
> >         at
> > akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> >         at
> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >         at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >         at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >         at
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>


Rewind offset to a previous position and ensure certainty.

2019-12-25 文章 邢瑞斌
Hi,

I'm trying to use Kafka as an event store and I want to create several
partitions to improve read/write throughput. Occasionally I need to rewind
offset to a previous position for recomputing. Since order isn't guaranteed
among partitions in Kafka, does this mean that Flink won't produce the same
results as before when rewind even if it uses event time? For example,
consumer for a partition progresses extremely fast and raises watermark, so
events from other partitions are discarded. Is there any ways to prevent
this from happening?

Thanks in advance!

Ruibin


Re: flink 维表关联

2019-12-25 文章 xin Destiny
Hi,lucas.wu:
我个人觉得可以把join的条件和流对应的数据存放在mapstate中,每次维表的缓存更新数据之后,去mapstate中查询,如果存在对应的KV,将新关联后的数据下发;
不过这样state会占用很大的内存,需要主意state的清理

lucas.wu  于2019年12月25日周三 下午5:13写道:

> hi all:
> flink的kafka流表与hbase维表关联,维表后面有变动的话,如何将之前关联过的数据进行更新?


flink 维表关联

2019-12-25 文章 lucas.wu
hi all:
flink的kafka流表与hbase维表关联,维表后面有变动的话,如何将之前关联过的数据进行更新?

Flink实现Kafka到Mysql的 End-To-End Exactly-Once中遇到的问题

2019-12-25 文章 卢伟楠
各位大佬好:

最近是实现Kafka到Mysql的 End-To-End Exactly-Once中遇到以下2个问题:
1:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: 
Communications link failure during commit(). Transaction resolution unknown.
2:org.apache.flink.streaming.runtime.tasks.TimerException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator

已经做了一个最简单的复现问题的demo,求指教
git clone https://github.com/lusecond/flink_help --depth=1

测试过程中,发现继承TwoPhaseCommitSinkFunction类的4个重写方法beginTransaction、preCommit、commit、abort
分别在不同的线程工作,怀疑过因为线程切换导致jdbc的事务提交出问题,已经做过相关测试排除不是由此引起的问题

关于 FLink historyserver没有completed-jobs的问题

2019-12-25 文章 起子
大神们:
我启动了flink的historyserver,但是里面并没有已完成的任务
配置如下:

结果界面如下:

hdfs如下:

麻烦大神们给与指导

 部门 / 数据平台
 花名 / 起子
 Mobile :159 8810 1848
 WeChat :159 8810 1848
 Email :q...@dian.so
 Address :浙江省杭州市余杭区文一西路998号5#705