flink 1.12 中如何读取 mysql datetime 字段

2021-01-31 文章 macdoor
在 mysql 中创建表
CREATE TABLE `p_port_packet_loss_5m` (
  `id` binary(16) NOT NULL,
  `coltime` datetime NOT NULL,
...

在flink 中创建表
create table if not exists p_port_packet_loss_5m
(
  id bytes,
  coltime timestamp,
...)
WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://ip:port/mydatabase',

在flink sql 客户端执行
select * from p_port_packet_loss_5m;

总是报错 
java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to
java.sql.Timestamp

改了若干种数据类型都不行,这种情况该如何处理呢?





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink编译报错

2021-01-31 文章 wxpcc
mvn clean install -T 4C -Pfast -DskipTests -Dcheckstyle.skip=true
-DnpmRegistryURL=https://registry.npm.taobao.org 可以用这个试试



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 是否可以 hive 流 join hive 流?

2021-01-31 文章 Leonard Xu
还没有,你可以关注下这个issue[1]

祝好,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-21183

> 在 2021年2月1日,13:29,macdoor  写道:
> 
> 当前的 1.13-snapshot 支持了吗?我可以试试吗?
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: 关于配置关联初始化方案的实现问题

2021-01-31 文章 javenjiangfsof
官网上没有,在github上https://github.com/ververica/flink-cdc-connectors
```
SourceFunction sourceFunction = MySQLSource.builder() 
.hostname("localhost") .port(3306) .databaseList("inventory") // monitor all 
tables under inventory database .username("flinkuser") .password("flinkpw") 
.deserializer(new StringDebeziumDeserializationSchema()) // converts 
SourceRecord to String .build();
```

在 2021年2月1日 14:07,赵一旦 写道:


我没发现官网没看到DataStream有cdc的connector貌似,你是怎么搞的呢。 javenjiangfsof 
 于2021年2月1日周一 下午1:40写道: > DataStream API,像下面这样 > ``` > 
val list = ... //i use jdbc to get the init data > val dimensionInitStream = 
env.fromCollection(list) > val dimension = > 
dimensionStream.union(dimensionInitStream).broadcast(descriptor) > 
mainStream.connect(dimensionStream) > ... > ``` > 
注:我怕没描述清楚问题:问题在于主流和广播流关联的时候,广播流不一定会先于主流到,导致关联不到配置 > > 在 2021年2月1日 
13:30,赵一旦 写道: > > > FlinkSQL ? javenjiangfsof 
 于2021年2月1日周一 > 上午11:40写道: > Hi 社区的各位 > > > 
最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc
 > > + > > 
broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案:
 > > 1.初始化通过jdbc获取,通过fromCollection处理后,union > > 
cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么… 
> > > > 
2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…)
 > > 3.更好的方案??? > > > 目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+ > > 
liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举? > 希望能看到各位的回复,感谢

Re: 关于配置关联初始化方案的实现问题

2021-01-31 文章 赵一旦
我没发现官网没看到DataStream有cdc的connector貌似,你是怎么搞的呢。


javenjiangfsof  于2021年2月1日周一 下午1:40写道:

> DataStream API,像下面这样
> ```
> val list = ...   //i use jdbc to get the init data
> val dimensionInitStream = env.fromCollection(list)
> val dimension =
> dimensionStream.union(dimensionInitStream).broadcast(descriptor)
> mainStream.connect(dimensionStream)
> ...
> ```
> 注:我怕没描述清楚问题:问题在于主流和广播流关联的时候,广播流不一定会先于主流到,导致关联不到配置
>
> 在 2021年2月1日 13:30,赵一旦 写道:
>
>
> FlinkSQL ? javenjiangfsof  于2021年2月1日周一
> 上午11:40写道: > Hi 社区的各位 > >
> 最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc
> > + >
> broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案:
> > 1.初始化通过jdbc获取,通过fromCollection处理后,union >
> cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么…
> > >
> 2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…)
> > 3.更好的方案??? > > > 目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+ >
> liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举? > 希望能看到各位的回复,感谢


Re: 是否可以 hive 流 join hive 流?

2021-01-31 文章 macdoor
当前的 1.13-snapshot 支持了吗?我可以试试吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于配置关联初始化方案的实现问题

2021-01-31 文章 javenjiangfsof
DataStream API,像下面这样
```
val list = ...   //i use jdbc to get the init data
val dimensionInitStream = env.fromCollection(list)
val dimension =
dimensionStream.union(dimensionInitStream).broadcast(descriptor)
mainStream.connect(dimensionStream)
...
```
注:我怕没描述清楚问题:问题在于主流和广播流关联的时候,广播流不一定会先于主流到,导致关联不到配置

在 2021年2月1日 13:30,赵一旦 写道:


FlinkSQL ? javenjiangfsof  于2021年2月1日周一 上午11:40写道: > Hi 
社区的各位 > > 
最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc
 > + > 
broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案:
 > 1.初始化通过jdbc获取,通过fromCollection处理后,union > 
cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么… 
> > 
2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…)
 > 3.更好的方案??? > > > 目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+ > 
liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举? > 希望能看到各位的回复,感谢

Re: 关于配置关联初始化方案的实现问题

2021-01-31 文章 赵一旦
FlinkSQL ?

javenjiangfsof  于2021年2月1日周一 上午11:40写道:

> Hi 社区的各位
>
> 最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc
> +
> broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案:
>   1.初始化通过jdbc获取,通过fromCollection处理后,union
> cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么…
>
> 2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…)
>   3.更好的方案???
>
>
>   目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+
> liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举?
>   希望能看到各位的回复,感谢


Re: 是否可以 hive 流 join hive 流?

2021-01-31 文章 Leonard Xu
Okay, 和我理解的一样,这个时间上是 event time, 基于event time的 interval join 
需要定义watermark,目前hive表还不支持定义watermark,1.13应该会支持。



> 在 2021年2月1日,10:58,macdoor  写道:
> 
> p1.time 是数据记录里的时间,也用这个时间做分区
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Re: 水印的作用请教

2021-01-31 文章 amenhub
很详尽了,非常感谢 @tison !



 
发件人: tison
发送时间: 2021-02-01 11:43
收件人: user-zh
主题: Re: Re: 水印的作用请教
对于 StreamingFileSink 可以查看这两份资料
 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
https://stackoverflow.com/questions/54763160/is-it-possible-that-bucketing-sink-create-bucket-on-event-time
 
默认的,watermark 和 (EventTime) Timestamp 信息会带到 BucketAssigner 实现所需的 Context
里,Flink 没有开箱即用的基于 EventTime 的分桶策略,你需要自己尝试实现。比水印晚的数据,可以自行实现为丢弃或追加到原有分区文件上。
 
对于 SQL 可以查看这份资料
 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html
 
目前看起来开箱的逻辑迟到数据会追加而不是丢弃。有一些相关的配置可以调整 commit 也就是落盘的时机,但不影响落盘的数据。
 
Best,
tison.
 
 
amenhub  于2021年2月1日周一 上午11:07写道:
 
> StreamAPI使用的是StreamingFileSink,SQL就是FileSystem了
>
>
>
>
> 发件人: tison
> 发送时间: 2021-02-01 11:01
> 收件人: user-zh
> 主题: Re: Re: 水印的作用请教
> 请问你使用哪种 SinkConnector 写入 HDFS 呢?
>
> Best,
> tison.
>
>
> amenhub  于2021年2月1日周一 上午10:58写道:
>
> > >>>
> >
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
> > 另外还是比较想了解,当指定水印及延迟的时候,比水印晚的数据,在流写HDFS的时候会怎么处理?
> >
> >
> >
> >
> > 发件人: amenhub
> > 发送时间: 2021-02-01 10:44
> > 收件人: user-zh
> > 主题: Re: Re: 水印的作用请教
> > 谢谢回复!
> >
> > 也就是说如果我利用Flink从Kafka (Select
> > *)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗?
> >
> > best,
> > amenhub
> >
> >
> >
> > 发件人: tison
> > 发送时间: 2021-02-01 10:36
> > 收件人: user-zh
> > 主题: Re: 水印的作用请教
> > 取决于你的计算流图,watermark 通常只在以下情况有实际作用
> > True
> > & cond 1. 使用 EventTime
> > & cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer
> > Best,
> > tison.
> > amenhub  于2021年2月1日周一 上午10:26写道:
> > > hi everyone,
> > >
> > > 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。
> > >
> > > 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。
> > > 那么,
> > > 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算?
> > >
> > >
> >
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
> > >
> > > best,
> > > amenhub
> > >
> > >
> > >
> > >
> >
>


Re: [ANNOUNCE] Apache Flink 1.10.3 released

2021-01-31 文章 Zhu Zhu
Thanks Xintong for being the release manager and everyone who helped with
the release!

Cheers,
Zhu

Dian Fu  于2021年1月29日周五 下午5:56写道:

> Thanks Xintong for driving this release!
>
> Regards,
> Dian
>
> 在 2021年1月29日,下午5:24,Till Rohrmann  写道:
>
> Thanks Xintong for being our release manager. Well done!
>
> Cheers,
> Till
>
> On Fri, Jan 29, 2021 at 9:50 AM Yang Wang  wrote:
>
>> Thanks Xintong for driving this release.
>>
>> Best,
>> Yang
>>
>> Yu Li  于2021年1月29日周五 下午3:52写道:
>>
>>> Thanks Xintong for being our release manager and everyone else who made
>>> the release possible!
>>>
>>> Best Regards,
>>> Yu
>>>
>>>
>>> On Fri, 29 Jan 2021 at 15:05, Xintong Song  wrote:
>>>
 The Apache Flink community is very happy to announce the release of
 Apache
 Flink 1.10.3, which is the third bugfix release for the Apache Flink
 1.10
 series.

 Apache Flink® is an open-source stream processing framework for
 distributed, high-performing, always-available, and accurate data
 streaming
 applications.

 The release is available for download at:
 https://flink.apache.org/downloads.html

 Please check out the release blog post for an overview of the
 improvements
 for this bugfix release:
 https://flink.apache.org/news/2021/01/29/release-1.10.3.html

 The full release notes are available in Jira:

 https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348668

 We would like to thank all contributors of the Apache Flink community
 who
 made this release possible!

 Regards,
 Xintong Song

>>>
>


Re: Re: 水印的作用请教

2021-01-31 文章 tison
对于 StreamingFileSink 可以查看这两份资料

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
https://stackoverflow.com/questions/54763160/is-it-possible-that-bucketing-sink-create-bucket-on-event-time

默认的,watermark 和 (EventTime) Timestamp 信息会带到 BucketAssigner 实现所需的 Context
里,Flink 没有开箱即用的基于 EventTime 的分桶策略,你需要自己尝试实现。比水印晚的数据,可以自行实现为丢弃或追加到原有分区文件上。

对于 SQL 可以查看这份资料

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html

目前看起来开箱的逻辑迟到数据会追加而不是丢弃。有一些相关的配置可以调整 commit 也就是落盘的时机,但不影响落盘的数据。

Best,
tison.


amenhub  于2021年2月1日周一 上午11:07写道:

> StreamAPI使用的是StreamingFileSink,SQL就是FileSystem了
>
>
>
>
> 发件人: tison
> 发送时间: 2021-02-01 11:01
> 收件人: user-zh
> 主题: Re: Re: 水印的作用请教
> 请问你使用哪种 SinkConnector 写入 HDFS 呢?
>
> Best,
> tison.
>
>
> amenhub  于2021年2月1日周一 上午10:58写道:
>
> > >>>
> >
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
> > 另外还是比较想了解,当指定水印及延迟的时候,比水印晚的数据,在流写HDFS的时候会怎么处理?
> >
> >
> >
> >
> > 发件人: amenhub
> > 发送时间: 2021-02-01 10:44
> > 收件人: user-zh
> > 主题: Re: Re: 水印的作用请教
> > 谢谢回复!
> >
> > 也就是说如果我利用Flink从Kafka (Select
> > *)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗?
> >
> > best,
> > amenhub
> >
> >
> >
> > 发件人: tison
> > 发送时间: 2021-02-01 10:36
> > 收件人: user-zh
> > 主题: Re: 水印的作用请教
> > 取决于你的计算流图,watermark 通常只在以下情况有实际作用
> > True
> > & cond 1. 使用 EventTime
> > & cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer
> > Best,
> > tison.
> > amenhub  于2021年2月1日周一 上午10:26写道:
> > > hi everyone,
> > >
> > > 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。
> > >
> > > 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。
> > > 那么,
> > > 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算?
> > >
> > >
> >
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
> > >
> > > best,
> > > amenhub
> > >
> > >
> > >
> > >
> >
>


关于配置关联初始化方案的实现问题

2021-01-31 文章 javenjiangfsof
Hi 社区的各位
  
最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc
 + 
broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案:
  1.初始化通过jdbc获取,通过fromCollection处理后,union 
cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么…
  
2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…)
  3.更好的方案???


  目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+ 
liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举?
  希望能看到各位的回复,感谢

Re: 请问pyflink如何跟kerberos认证的kafka对接呢

2021-01-31 文章 Wei Zhong
Hi,

看你之前发的邮件,你现在是把kerberos相关的配置放在某一个flink-conf.yaml里,然后启动了一个local模式吧?

但是local模式的pyflink 
shell是不会主动读取任何flink-conf.yaml的。需要配置环境变量FLINK_HOME,将相关配置写入$FLINK_HOME/conf/flink-conf.yaml里,并且只有在提交job时候(flink
 run、remote模式或者yarn模式)才会去读取flink-conf.yaml里的内容。

如果执意要在local模式下尝试,可以通过以下代码:
from pyflink.java_gateway import get_gateway
System = get_gateway().jvm.System
拿到java中的System对象,然后按照java中的方式进行配置。

> 在 2021年1月30日,13:58,瞿叶奇 <389243...@qq.com> 写道:
> 
> 附测试程序,希望老师给出解决方法。测试时发现只更新csv文件的修改日期,但没有实际数据录入。怀疑kafka连接还存在问题。
> #!/usr/bin/python3.7
> # -*- coding: UTF-8 -*-
> from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
> from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, 
> CsvTableSink, WriteMode, SqlDialect
> from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_parallelism(1)
> s_env.enable_checkpointing(3000)
> st_env = StreamTableEnvironment.create(s_env, TableConfig())
> st_env.use_catalog("default_catalog")
> st_env.use_database("default_database")
> st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect",
>  
> "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002").property("security.protocol",
>  'SASL_PLAINTEXT').property("sasl.kerberos.service.name", 
> 'kafka').property("kerberos.domain.name", 
> 'hadoop.hadoop.com').property("bootstrap.servers", 
> "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id",
>  DataTypes.BIGINT()),DataTypes.FIELD("name", 
> DataTypes.STRING())]))).with_schema(Schema().field("id", 
> DataTypes.BIGINT()).field("name", 
> DataTypes.STRING())).register_table_source("sourceKafka")
> fieldNames = ["id", "name"]
> fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
> csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1, 
> WriteMode.OVERWRITE)
> st_env.register_table_sink("csvTableSink", csvSink)
> resultQuery = st_env.sql_query("select id,name from sourceKafka")
> resultQuery = resultQuery.insert_into("csvTableSink")
> st_env.execute("pyflink-kafka-v2")



Re: Re: 水印的作用请教

2021-01-31 文章 amenhub
StreamAPI使用的是StreamingFileSink,SQL就是FileSystem了



 
发件人: tison
发送时间: 2021-02-01 11:01
收件人: user-zh
主题: Re: Re: 水印的作用请教
请问你使用哪种 SinkConnector 写入 HDFS 呢?
 
Best,
tison.
 
 
amenhub  于2021年2月1日周一 上午10:58写道:
 
> >>>
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
> 另外还是比较想了解,当指定水印及延迟的时候,比水印晚的数据,在流写HDFS的时候会怎么处理?
>
>
>
>
> 发件人: amenhub
> 发送时间: 2021-02-01 10:44
> 收件人: user-zh
> 主题: Re: Re: 水印的作用请教
> 谢谢回复!
>
> 也就是说如果我利用Flink从Kafka (Select
> *)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗?
>
> best,
> amenhub
>
>
>
> 发件人: tison
> 发送时间: 2021-02-01 10:36
> 收件人: user-zh
> 主题: Re: 水印的作用请教
> 取决于你的计算流图,watermark 通常只在以下情况有实际作用
> True
> & cond 1. 使用 EventTime
> & cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer
> Best,
> tison.
> amenhub  于2021年2月1日周一 上午10:26写道:
> > hi everyone,
> >
> > 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。
> >
> > 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。
> > 那么,
> > 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算?
> >
> >
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
> >
> > best,
> > amenhub
> >
> >
> >
> >
>


Re: 是否可以 hive 流 join hive 流?

2021-01-31 文章 macdoor
p1.time 是数据记录里的时间,也用这个时间做分区



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: 水印的作用请教

2021-01-31 文章 tison
请问你使用哪种 SinkConnector 写入 HDFS 呢?

Best,
tison.


amenhub  于2021年2月1日周一 上午10:58写道:

> >>>
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
> 另外还是比较想了解,当指定水印及延迟的时候,比水印晚的数据,在流写HDFS的时候会怎么处理?
>
>
>
>
> 发件人: amenhub
> 发送时间: 2021-02-01 10:44
> 收件人: user-zh
> 主题: Re: Re: 水印的作用请教
> 谢谢回复!
>
> 也就是说如果我利用Flink从Kafka (Select
> *)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗?
>
> best,
> amenhub
>
>
>
> 发件人: tison
> 发送时间: 2021-02-01 10:36
> 收件人: user-zh
> 主题: Re: 水印的作用请教
> 取决于你的计算流图,watermark 通常只在以下情况有实际作用
> True
> & cond 1. 使用 EventTime
> & cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer
> Best,
> tison.
> amenhub  于2021年2月1日周一 上午10:26写道:
> > hi everyone,
> >
> > 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。
> >
> > 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。
> > 那么,
> > 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算?
> >
> >
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
> >
> > best,
> > amenhub
> >
> >
> >
> >
>


Re: Re: 水印的作用请教

2021-01-31 文章 amenhub
>>> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
另外还是比较想了解,当指定水印及延迟的时候,比水印晚的数据,在流写HDFS的时候会怎么处理?



 
发件人: amenhub
发送时间: 2021-02-01 10:44
收件人: user-zh
主题: Re: Re: 水印的作用请教
谢谢回复!
 
也就是说如果我利用Flink从Kafka (Select 
*)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗?
 
best,
amenhub
 
 
 
发件人: tison
发送时间: 2021-02-01 10:36
收件人: user-zh
主题: Re: 水印的作用请教
取决于你的计算流图,watermark 通常只在以下情况有实际作用
True
& cond 1. 使用 EventTime
& cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer
Best,
tison.
amenhub  于2021年2月1日周一 上午10:26写道:
> hi everyone,
>
> 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。
>
> 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。
> 那么,
> 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算?
>
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
>
> best,
> amenhub
>
>
>
>


Re: flink sql时区问题

2021-01-31 文章 Leonard Xu
嗯,flink 中 很多时间函数比如PROCTIME()/CURRENT_TIMESTAMP 返回的值都是 
UTC+0的时间值,这里的timezone设置对这些函数不生效的,这些函数是有点时区问题的,
目前只能在代码里通过加减时区偏移绕过。

> 在 2021年2月1日,10:50,沉醉寒風 <1039601...@qq.com> 写道:
> 
> 在代码中这样设置 streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8")) 
> 也不管用. 还是要自己手动去加减时间才能做到,方法比较笨,
> 
> 
> 
> 
> --原始邮件--
> 发件人:  
>   "user-zh"   
>  
>  发送时间:2021年2月1日(星期一) 上午10:46
> 收件人:"user-zh" 
> 主题:Re: flink sql时区问题
> 
> 
> 
> Hi,
> 时区不生效在你的代码中是体现在那些地方呀?目前flink sql是有些时区问题,社区也希望在1.13能解决掉。
> 
> 
>  在 2021年2月1日,10:42,沉醉寒風 <1039601...@qq.com 写道:
>  
>  streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8"))



?????? flink sql????????

2021-01-31 文章 ???????L
 streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8")) 
. ,??,




----
??: 
   "user-zh"



Re: flink sql时区问题

2021-01-31 文章 Leonard Xu
Hi,
时区不生效在你的代码中是体现在那些地方呀?目前flink sql是有些时区问题,社区也希望在1.13能解决掉。


> 在 2021年2月1日,10:42,沉醉寒風 <1039601...@qq.com> 写道:
> 
> streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8"))



Re: 是否可以 hive 流 join hive 流?

2021-01-31 文章 Leonard Xu
Hi,macdoor

很有意思的case,p1.time字段是你记录里的时间吗? 你hive表的分区字段和这个时间字段的关系是怎么样的呀?


> 在 2021年1月30日,17:54,macdoor  写道:
> 
> 具体需求是这样,采集取得的通道总流量5分钟一次存入 hive 表,为了取得 5 分钟内该通道的流量,需要前后2次采集到的总流量相减,我想用同一个 hive
> 表自己相互 join,形成 2 个 hive 流 join,不知道是否可以实现?或者有其他实现方法吗?
> 我现在使用 crontab 定时 batch 模式做,希望能改成 stream 模式
> 
> select p1.traffic -p2.traffic
> from p as p1
> inner join p as p2 on p1.id=p2.id and p1.time=p2.time + interval 5 minutes
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Re: 水印的作用请教

2021-01-31 文章 amenhub
谢谢回复!

也就是说如果我利用Flink从Kafka (Select 
*)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗?

best,
amenhub



 
发件人: tison
发送时间: 2021-02-01 10:36
收件人: user-zh
主题: Re: 水印的作用请教
取决于你的计算流图,watermark 通常只在以下情况有实际作用
 
True
& cond 1. 使用 EventTime
& cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer
 
Best,
tison.
 
 
amenhub  于2021年2月1日周一 上午10:26写道:
 
> hi everyone,
>
> 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。
>
> 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。
> 那么,
> 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算?
>
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
>
> best,
> amenhub
>
>
>
>


flink sql????????

2021-01-31 文章 ???????L
streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8"))

flink sql ????????????

2021-01-31 文章 ???????L
flink sql+8,??

Re: 水印的作用请教

2021-01-31 文章 tison
取决于你的计算流图,watermark 通常只在以下情况有实际作用

True
& cond 1. 使用 EventTime
& cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer

Best,
tison.


amenhub  于2021年2月1日周一 上午10:26写道:

> hi everyone,
>
> 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。
>
> 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。
> 那么,
> 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算?
>
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
>
> best,
> amenhub
>
>
>
>


Re: 咨询求助

2021-01-31 文章 Leonard Xu


> 在 2021年1月31日,20:15,Appleyuchi  写道:
> 
> 一点小小的建议哈,
> 目前flink社区讨论主要还是java/scala为主,
> 如果执意使用pyflink的话,后续极有可能会遇到较大的阻力.

我理解这种较大阻力应该不存在的,社区里pyflink的投入还挺大的,蛮多开发者的,我也cc两位在这块做贡献的社区开发者,从JIRA上看pyflink相关的开发进度都挺快的。
如果有机器学习,python相关的经验,用pyflink我觉得挺合适的。

祝好,
Leonard

水印的作用请教

2021-01-31 文章 amenhub
hi everyone,

最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。

在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。
那么,
1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算?
2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?

best,
amenhub





?????? ??????????????????????????????????watermark,????????????????????????watermark???????????????1??

2021-01-31 文章 ??????
,




----
??: 
   "user-zh"



Re: Re: 为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了

2021-01-31 文章 hezongji...@qq.com

你好,图片在上传在附件里面了


hezongji...@qq.com
 
发件人: tison
发送时间: 2021-02-01 09:31
收件人: user-zh
主题: Re: 为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了
邮件列表不支持直接粘贴图片,请尝试使用附件或 gist 等方式共享。
 
Best,
tison.
 
 
hezongji...@qq.com  于2021年2月1日周一 上午9:28写道:
 
> 为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了
> 代码如下:
>
> 运行结果如下:
> --
> hezongji...@qq.com
>


Re: 请教 flink 1.12.1 插入 mysql 有时耗时很长

2021-01-31 文章 macdoor
有时候这种job持续2个多小时,我只能cancel job,但无法正常 cancel,都会导致 taskmanager 挂掉,错误如下

2021-01-31 23:04:23,677 ERROR
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Task did
not exit gracefully within 180 + seconds.
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully
within 180 + seconds.
at
org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
2021-01-31 23:04:23,685 ERROR
org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Fatal
error occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully
within 180 + seconds.
at
org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
2021-01-31 23:04:23,686 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Stopping
TaskExecutor akka.tcp://flink@10.13.69.52:45901/user/rpc/taskmanager_0.
2021-01-31 23:04:23,686 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Close
ResourceManager connection 1bd159f361d86e77d17e261ab44b5128.
2021-01-31 23:04:23,689 WARN  org.apache.flink.runtime.taskmanager.Task 
  
[] - Task 'Source: HiveSource-snmpprobe.p_port_traffic_5m ->
Calc(select=[binaryid AS id, ver, CAST(2021-01-31 21:45:00:TIMESTAMP(6)) AS
coltime, CAST(in_octets) AS in_octets, CAST(out_octets) AS out_octets,
CAST(bi_octets) AS bi_octets, CAST(unimax_octets) AS unimax_octets,
in_speed, out_speed, bi_speed, unimax_speed, in_util, out_util, bi_util,
unimax_util, inout_ratio, bandwidth, origin, CAST((() DATE_FORMAT
_UTF-16LE'-MM-dd HH:mm:ss')) AS crtime], where=[(coltime = 2021-01-31
21:45:00:TIMESTAMP(9))]) -> Sink:
Sink(table=[myhive.prod_mysql_zqzynetdb.p_port_traffic_5m], fields=[id, ver,
coltime, in_octets, out_octets, bi_octets, unimax_octets, in_speed,
out_speed, bi_speed, unimax_speed, in_util, out_util, bi_util, unimax_util,
inout_ratio, bandwidth, origin, crtime]) (1/1)#0' did not react to
cancelling signal for 30 seconds, but is stuck in method:
 java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
com.mysql.cj.protocol.ReadAheadInputStream.fill(ReadAheadInputStream.java:107)
com.mysql.cj.protocol.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:150)
com.mysql.cj.protocol.ReadAheadInputStream.read(ReadAheadInputStream.java:180)
java.io.FilterInputStream.read(FilterInputStream.java:133)
com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:64)
com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:63)
com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:45)
com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:52)
com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:41)
com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:54)
com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:44)
com.mysql.cj.protocol.a.NativeProtocol.readMessage(NativeProtocol.java:538)
com.mysql.cj.protocol.a.NativeProtocol.checkErrorMessage(NativeProtocol.java:708)
com.mysql.cj.protocol.a.NativeProtocol.sendCommand(NativeProtocol.java:647)
com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:946)
com.mysql.cj.NativeSession.execSQL(NativeSession.java:1075)
com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:930)
com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1092)
com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:832)
com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:435)
com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796)
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)

Re: 为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了

2021-01-31 文章 tison
邮件列表不支持直接粘贴图片,请尝试使用附件或 gist 等方式共享。

Best,
tison.


hezongji...@qq.com  于2021年2月1日周一 上午9:28写道:

> 为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了
> 代码如下:
>
> 运行结果如下:
> --
> hezongji...@qq.com
>


Re: 请教 flink 1.12.1 插入 mysql 有时耗时很长

2021-01-31 文章 macdoor
打开了 debug 级别的日志,有这样的错误

2021-01-31 20:45:30,364 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartitionManager [] -
Released partition dc8a2804b6df6b0ceaee2610ccf6c6e5#312 produced by
448c5ac36dcda818f56ec5bbd728da10.
2021-01-31 20:45:30,392 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Free slot
with allocation id 80a1592c9e59efd80e412e7dee99f70c because: Stopping
JobMaster for job ifXTable->p_port_traffic_5m @2021-01-31
20:30:00(d055754b88483b13648cc3fb32d9cd58).
2021-01-31 20:45:30,392 DEBUG
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot
TaskSlot(index:2, state:ACTIVE, resource profile:
ResourceProfile{cpuCores=1., taskHeapMemory=344.000mb
(360710140 bytes), taskOffHeapMemory=0 bytes, managedMemory=256.000mb
(268435460 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationId:
80a1592c9e59efd80e412e7dee99f70c, jobId: d055754b88483b13648cc3fb32d9cd58).
org.apache.flink.util.FlinkException: Stopping JobMaster for job
ifXTable->p_port_traffic_5m @2021-01-31
20:30:00(d055754b88483b13648cc3fb32d9cd58).
at
org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:416)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:565)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:187)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.11-1.12.1.jar:1.12.1]
2021-01-31 20:45:30,393 DEBUG
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] -
Releasing local state under allocation id 80a1592c9e59efd80e412e7dee99f70c.
2021-01-31 20:45:30,393 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Free slot
with allocation id 4606a6194b4380efb5c2f95fc65bf01e because: Stopping
JobMaster for job ifXTable->p_port_traffic_5m @2021-01-31
20:30:00(d055754b88483b13648cc3fb32d9cd58).
2021-01-31 20:45:30,393 DEBUG
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot
TaskSlot(index:12, state:ACTIVE, resource profile:
ResourceProfile{cpuCores=1., taskHeapMemory=344.000mb
(360710140 bytes), taskOffHeapMemory=0 bytes, managedMemory=256.000mb
(268435460 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationId:
4606a6194b4380efb5c2f95fc65bf01e, jobId: d055754b88483b13648cc3fb32d9cd58).
org.apache.flink.util.FlinkException: Stopping JobMaster for job
ifXTable->p_port_traffic_5m @2021-01-31
20:30:00(d055754b88483b13648cc3fb32d9cd58).
at
org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:416)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:565)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at

为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了

2021-01-31 文章 hezongji...@qq.com
为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了
代码如下:

运行结果如下:


hezongji...@qq.com


请教 flink 1.12.1 插入 mysql 有时耗时很长

2021-01-31 文章 macdoor
周期性batch mode 从 hive 提取数据插入 mysql,每批次 10K 到 20K 行数据,多数情况下
10-20秒可以完成,但不定期就会很长时间,能达到 20多分钟,但也能成功,查看了日志也看不到错误,检查 mysql 也没有发现锁表,怀疑 hive
metastore 的性能,但也没看出问题。

请教分析思路,从 flink 上能看出job 在等待什么吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:咨询求助

2021-01-31 文章 Appleyuchi
一点小小的建议哈,
目前flink社区讨论主要还是java/scala为主,
如果执意使用pyflink的话,后续极有可能会遇到较大的阻力.








在 2021-01-31 14:26:55,"瞿叶奇" <389243...@qq.com> 写道:
>您好,我是国网陕西采集系统开发人员,我们在架构改造中,准备使用pyflink 
>解决实时Kafka数据写HDFS的问题,我的Kafka集群存在kerberos安全认证,导致我现在还没连接上,能不能给一个样例呢?