Is there a way to write checkpoints to an HDFS outside the cluster?

2020-04-14 Thread tao wang
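Here is our scenario: some fairly important jobs run on our cluster, and to improve their performance we have provisioned a dedicated HDFS cluster for storing their checkpoints.

However, we don't know how to configure the jobs so that they write their checkpoints to that external HDFS cluster.

Thanks!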


Re: Help with a Flink backpressure problem

2020-04-14 Thread Junzhong Qin
Hi, LakeShen
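The images were too large, so my reply could not be delivered; I have added them as attachments instead.
Some additional information about the job: it currently runs on Flink 1.5.

On Mon, Apr 13, 2020 at 10:15 AM, LakeShen wrote: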

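> Hi Junzhong,
>
> The images are not displayed. Could you upload them again?
>
> Best,
> LakeShen
>
> On Sat, Apr 11, 2020 at 10:38 AM, Junzhong Qin wrote:
>
> > While running a Flink job we ran into operator backpressure. The job graph
> > is shown below: Source (reads from Kafka), KeyBy (extracts the fields used
> > by the keyBy operation), Parser (business logic), and Sink (writes to
> > Kafka). Except for KeyBy->Parser, which is connected by hash (the keyBy
> > operation), all edges use RESCALE. (The parallelism shown is for reference
> > only; it is the parallelism after the problem was solved. The initial
> > parallelism was 500->1000->3000->500.)
> > [image: image.png]
> > Related metrics:
> > [image: image.png]
> > [image: image.png]
> > What we tried in order to resolve the backpressure:
> > 1. Increased the Parser parallelism. KeyByOpe.buffers.outPoolUsage rose more slowly, but repeated increases still did not solve the problem.
> > 2. Optimized the Parser logic to reduce CPU usage. No noticeable effect.
> > 3. Moved some data-filtering logic from the Parser into the KeyBy operator. No noticeable effect.
> > Finally, we guessed that the high parallelism of the KeyBy operator, combined
> > with the hash connection to the Parser, was consuming too many network
> > resources and causing the backpressure. We therefore reduced the parallelism
> > of the KeyBy operator, and that solved the problem. We would like to
> > understand the specific reason why this change fixed it.
> >
> > Thanks!
> >
>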
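One factor that may be relevant to the question above (a hedged illustration, not something confirmed in this thread): a hash (keyBy) edge is all-to-all, while a rescale edge is pointwise, so the number of logical network channels differs by orders of magnitude. The sketch below only does that arithmetic for the initial parallelism quoted above; the class and method names are made up for illustration.

```java
// Toy arithmetic only: counts logical channels between two operators.
// Names are hypothetical; this is not Flink code.
final class ChannelCountSketch {

    /** Hash (keyBy) edge: every upstream subtask may send to every downstream subtask. */
    public static long hashChannels(int upstream, int downstream) {
        return (long) upstream * downstream;
    }

    /**
     * Rescale edge (pointwise): each subtask on the larger side is connected
     * to exactly one subtask on the smaller side.
     */
    public static long rescaleChannels(int upstream, int downstream) {
        return Math.max(upstream, downstream);
    }

    public static void main(String[] args) {
        // KeyBy (1000) -> Parser (3000), as in the initial setup quoted above.
        System.out.println(hashChannels(1000, 3000));    // 3000000
        System.out.println(rescaleChannels(1000, 3000)); // 3000
    }
}
```

Under this model, lowering the KeyBy parallelism shrinks the all-to-all fan-out linearly, which would be consistent with the observed improvement, although the thread does not establish that this was the actual cause.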


Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

2020-04-14 Thread Jiahui Jiang
Good to know! Thank you so much for all the responses again :)

From: Jark Wu 
Sent: Tuesday, April 14, 2020 10:51 PM
To: godfrey he 
Cc: Jiahui Jiang ; user 
Subject: Re: Setting different idleStateRetentionTime for different queries 
executed in the same TableEnvironment in Flink 1.10

Hi Jiahui,

Thanks for the inputs.
It's a very common scenario to set specific configuration on some dedicated 
operators (e.g. parallelism, join strategy).
Supporting query hints is definitely on our roadmap, but it may happen in 1.12.
Supporting state TTL in query hints sounds reasonable to me.

Best,
Jark

On Wed, 15 Apr 2020 at 09:45, godfrey he <godfre...@gmail.com> wrote:
Hi Jiahui,

Thanks for your suggestions.
I think we may need a more detailed explanation of the behavior change.
Regarding "supporting query configuration using Hints", I think it's one 
possible approach, but we need more discussion.

Best,
Godfrey

On Tue, Apr 14, 2020 at 7:46 PM, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
Yep yep :) I’m aware that the difference here between the Blink and legacy Flink 
planners applies only to sinks.

But since at the API level toDataStream doesn’t take a query-level config, 
it’s easy for people to think they can’t control it on a per-query basis 
without digging into the source code.

I have two questions / suggestions here:

1. Since StreamQueryConfig is deprecated and we want to consolidate config 
classes, can we maybe add an additional endpoint like .toRetractStream(Table, 
Class, minRetentionTime, maxRetentionTime)? Or at least add some Javadocs so 
that I won’t worry about the behavior under the hood suddenly changing?
2. What do we think about making query configuration via Hints a 
first-class supported Flink feature?

Thank you so much 

From: godfrey he <godfre...@gmail.com>
Sent: Tuesday, April 14, 2020 3:20 AM
To: Jiahui Jiang <qzhzm173...@hotmail.com>
Cc: Jark Wu <imj...@gmail.com>; user@flink.apache.org <user@flink.apache.org>
Subject: Re: Setting different idleStateRetentionTime for different queries 
executed in the same TableEnvironment in Flink 1.10

Hi Jiahui,

I think this is a problem with multiple-sinks optimization. If we optimize each 
sink eagerly (that is, we optimize the query when `writeToSink` or 
`insertInto` is called), `TableConfig#setIdleStateRetentionTime` is functionally 
equivalent to QueryConfig, which requires calling 
`TableConfig#setIdleStateRetentionTime` before calling `writeToSink` or 
`insertInto`. If we use multiple-sinks optimization, however, it's hard to map 
the value of `TableConfig#setIdleStateRetentionTime` to each query. I think 
this is a common issue for per-query configuration under multiple-sinks 
optimization.

For the `toRetractStream` method, however, we keep the eager optimization strategy, so you 
can call `TableConfig#setIdleStateRetentionTime` before `toRetractStream`.
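To make the "set the config, then translate" ordering concrete, here is a toy model in plain Java. It is only a sketch of the idea that eager translation snapshots the shared config at call time; `SharedConfig`, `TranslatedQuery`, and `translateEagerly` are hypothetical stand-ins, not Flink classes, and they only mimic the roles of `TableConfig#setIdleStateRetentionTime` and `toRetractStream` mentioned above.

```java
import java.time.Duration;

// Toy model (hypothetical names, not Flink code): an eagerly translated
// query captures whatever the shared config holds at translation time.
final class EagerTranslationSketch {

    /** Stand-in for a mutable, environment-wide config object. */
    static final class SharedConfig {
        private Duration minRetention = Duration.ZERO;
        private Duration maxRetention = Duration.ZERO;

        void setIdleStateRetentionTime(Duration min, Duration max) {
            this.minRetention = min;
            this.maxRetention = max;
        }
    }

    /** Result of translating one query; retention is fixed from here on. */
    static final class TranslatedQuery {
        final String sql;
        final Duration minRetention;
        final Duration maxRetention;

        TranslatedQuery(String sql, Duration minRetention, Duration maxRetention) {
            this.sql = sql;
            this.minRetention = minRetention;
            this.maxRetention = maxRetention;
        }
    }

    /** Eager translation: reads the config now, not at job submission. */
    static TranslatedQuery translateEagerly(SharedConfig config, String sql) {
        return new TranslatedQuery(sql, config.minRetention, config.maxRetention);
    }

    public static void main(String[] args) {
        SharedConfig config = new SharedConfig();

        // Query whose key domain keeps evolving: expire idle state.
        config.setIdleStateRetentionTime(Duration.ofHours(12), Duration.ofHours(24));
        TranslatedQuery evolving = translateEagerly(config, "join with evolving keys");

        // Query whose state must not expire: reset before translating it.
        config.setIdleStateRetentionTime(Duration.ZERO, Duration.ZERO);
        TranslatedQuery stable = translateEagerly(config, "join with stable keys");

        System.out.println(evolving.minRetention); // PT12H
        System.out.println(stable.minRetention);   // PT0S
    }
}
```

With a lazy, multiple-sinks optimization there is no such per-call snapshot, which is the mapping difficulty described above.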

Best,
Godfrey

On Tue, Apr 14, 2020 at 12:15 PM, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
Hey Godfrey, in some of the use cases our users have, they have a couple of 
complex join queries where the key domain keeps evolving - we definitely want 
some sort of state retention for those queries; but there are others where the 
key domain doesn't evolve over time, yet there isn't really a guarantee on 
the maximum gap between two records of the same key appearing in the 
stream, and we don't want to accidentally invalidate the state for those keys in 
these streams.

Because queries with different requirements can both exist in the pipeline, 
I think we have to configure `IDLE_STATE_RETENTION_TIME` per operator.

Just wondering, has a similar requirement not come up much for SQL users before? 
(being able to set table / query configuration inside SQL queries)

We are also a little bit concerned because, now that 
'toRetractStream(Table, Class, QueryConfig)' is deprecated, relying on the fact 
that TableConfig is read during toDataStream feels like relying on an 
implementation detail that just happens to work, and there is no guarantee 
that it will keep working in future versions...

Thanks!

From: godfrey he <godfre...@gmail.com>
Sent: Monday, April 13, 2020 9:51 PM
To: Jiahui Jiang <qzhzm173...@hotmail.com>
Cc: Jark Wu <imj...@gmail.com>; user@flink.apache.org <user@flink.apache.org>
Subject: Re: Setting different idleStateRetentionTime for different queries 
executed in the same TableEnvironment in Flink 1.10

Hi Jiahui,

Query hints are a way to do fine-grained configuration.
Just out of curiosity, is it a strong requirement
that users need to configure a different IDLE_STATE_RETENTION_TIME for each 
operator?

Best,
Godfrey

On Tue, Apr 14, 2020 at 2:07 AM, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
Also for some more context, we are building a framework to help users build 
their 

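Re: About Flink checkpoints

2020-04-14 Thread Congxian Qiu
Hi,

What do you mean by an adaptive interval? Do you mean automatically adjusting the interval between checkpoints?

Best,
Congxian


On Wed, Apr 15, 2020 at 12:24 PM, half coke wrote:

> Why doesn't Flink support adaptive checkpoint intervals? What is the reasoning behind this?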
>


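Re: Custom sink with Exactly-Once semantics

2020-04-14 Thread 阿华田
Thanks.

王志华
Email: a15733178...@163.com
Signature customized by NetEase Mail Master

On Apr 15, 2020 at 11:44, zhang...@lakala.com wrote:
I came across a WeChat official-account article last night; sharing it here in the hope that it helps.
"Flink 端到端 Exactly-once 机制剖析" (an analysis of Flink's end-to-end exactly-once mechanism)
https://mp.weixin.qq.com/s/fhUNuCOVFQUjRB-fo4Rl2g


From: 阿华田
Sent: 2020-04-15 11:00
To: user-zh@flink.apache.org
Subject: Custom sink with Exactly-Once semantics
If I write a custom sink with exactly-once semantics by extending TwoPhaseCommitSinkFunction, how do I make sure it is truly exactly-once? Does anyone have an example or demo that is already running in production?
王志华
a15733178...@163.com
Signature customized by NetEase Mail Master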



About Flink checkpoints

2020-04-14 Thread half coke
Why doesn't Flink support adaptive checkpoint intervals? What is the reasoning behind this?


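Re: Re: JDBCLookupFunction caching causes stale data

2020-04-14 Thread Jark Wu
Hi,

I think what you need is "joining against the dimension table's changelog using system time".
That approach gives the lowest latency while still sustaining high throughput.
However, it is not natively supported yet. Flink 1.11 will support reading changelogs, but joining against a dimension table's changelog may have to wait until 1.12.

For now, you can meet the requirement with a temporal table function join [1], although it takes some development work: you need to read the MySQL
binlog data yourself (it may contain only upserts, no deletes) and turn it into a Table.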

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html#temporal-table-function
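The idea behind joining against an upsert changelog can be sketched without any Flink API: keep every version of each key ordered by time, and have each incoming record look up the latest version that is not newer than its own timestamp. The class below is a plain-Java illustration under that assumption; `VersionedDimTable` and its methods are hypothetical names, not part of Flink.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration (hypothetical class, not a Flink API) of a
// versioned dimension table built from an upsert-only changelog.
final class VersionedDimTable {

    // key -> (change timestamp -> value as of that timestamp)
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    /** Applies one upsert from the changelog (e.g. a binlog row). */
    void upsert(String key, long timestamp, String value) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    /** Returns the value that was valid for the key at the given time, or null. */
    String lookup(String key, long timestamp) {
        TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> entry = history.floorEntry(timestamp);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        VersionedDimTable dim = new VersionedDimTable();
        dim.upsert("user-1", 10L, "silver");
        dim.upsert("user-1", 20L, "gold");
        System.out.println(dim.lookup("user-1", 15L)); // silver
        System.out.println(dim.lookup("user-1", 25L)); // gold
    }
}
```

Because updates arrive through the changelog rather than through periodic lookups, there is no cache to go stale; a real job would additionally need to bound how much history it keeps.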



On Wed, 15 Apr 2020 at 12:07, wangweigu...@stevegame.cn <
wangweigu...@stevegame.cn> wrote:

>
> Take a look at this article; it walks through many dimension-table join scenarios with examples.
>
> https://ververica.cn/developers/flink-datastream-associated-dimension-table-practice/
>
>
> From: tingli ke
> Sent: 2020-04-15 11:22
> To: user-zh
> Subject: Re: JDBCLookupFunction caching causes stale data
> Is there another way to join MySQL dimension-table data in real time?
>
>
> On Wed, Apr 15, 2020 at 11:08 AM, 13122260...@163.com <13122260...@163.com> wrote:
>
> > There is a setCacheMaxSize(1000) call; you can change it to -1 to disable the cache.
> > This is explained in org.apache.flink.api.java.io.jdbc.JDBCLookupFunction:
> > The cacheMaxSize is -1 means not use cache
> >
> >
> >
> > 13122260...@163.com
> >
> > From: tingli ke
> > Sent: 2020-04-15 10:55
> > To: user-zh
> > Subject: JDBCLookupFunction caching causes stale data
> > Hi,
> >
> > The stream table uses JDBCLookupFunction to join MySQL dimension-table data in real time, but JDBCLookupFunction caches the data, so when the MySQL dimension table is updated, Flink still sees the old data (the cache is needed for performance).
> > Is there another way to join MySQL dimension-table data in real time?
> >
>


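Re: Re: JDBCLookupFunction caching causes stale data

2020-04-14 Thread wangweigu...@stevegame.cn

Take a look at this article; it walks through many dimension-table join scenarios with examples.
https://ververica.cn/developers/flink-datastream-associated-dimension-table-practice/


From: tingli ke
Sent: 2020-04-15 11:22
To: user-zh
Subject: Re: JDBCLookupFunction caching causes stale data
Is there another way to join MySQL dimension-table data in real time?


On Wed, Apr 15, 2020 at 11:08 AM, 13122260...@163.com <13122260...@163.com> wrote:

> There is a setCacheMaxSize(1000) call; you can change it to -1 to disable the cache.
> This is explained in org.apache.flink.api.java.io.jdbc.JDBCLookupFunction:
> The cacheMaxSize is -1 means not use cache
>
>
>
> 13122260...@163.com
>
> From: tingli ke
> Sent: 2020-04-15 10:55
> To: user-zh
> Subject: JDBCLookupFunction caching causes stale data
> Hi,
>
> The stream table uses JDBCLookupFunction to join MySQL dimension-table data in real time, but JDBCLookupFunction caches the data, so when the MySQL dimension table is updated, Flink still sees the old data (the cache is needed for performance).
> Is there another way to join MySQL dimension-table data in real time?
>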


Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

2020-04-14 Thread Jark Wu
Hi Jiahui,

Thanks for the inputs.
It's a very common scenario to set specific configuration on some dedicated
operators (e.g. parallelism, join strategy).
Supporting query hints is definitely on our roadmap, but it may happen in
1.12.
Supporting state TTL in query hints sounds reasonable to me.

Best,
Jark

On Wed, 15 Apr 2020 at 09:45, godfrey he  wrote:

> Hi Jiahui,
>
> Thanks for your suggestions.
> I think we may need a more detailed explanation of the behavior change.
> Regarding "supporting query configuration using Hints", I think it's
> one possible approach, but we need more discussion.
>
> Best,
> Godfrey
>
> On Tue, Apr 14, 2020 at 7:46 PM, Jiahui Jiang wrote:
>
>> Yep yep :) I’m aware that the difference here between the Blink and legacy
>> Flink planners applies only to sinks.
>>
>> But since at the API level toDataStream doesn’t take a query-level
>> config, it’s easy for people to think they can’t control it on a per-query
>> basis without digging into the source code.
>>
>> I have two questions / suggestions here:
>>
>> 1. Since StreamQueryConfig is deprecated and we want to consolidate
>> config classes, can we maybe add an additional endpoint like
>> .toRetractStream(Table, Class, minRetentionTime, maxRetentionTime)? Or at
>> least add some Javadocs so that I won’t worry about the behavior under the
>> hood suddenly changing?
>> 2. What do we think about making query configuration via Hints
>> a first-class supported Flink feature?
>>
>> Thank you so much 
>> --
>> *From:* godfrey he 
>> *Sent:* Tuesday, April 14, 2020 3:20 AM
>> *To:* Jiahui Jiang 
>> *Cc:* Jark Wu ; user@flink.apache.org <
>> user@flink.apache.org>
>> *Subject:* Re: Setting different idleStateRetentionTime for different
>> queries executed in the same TableEnvironment in Flink 1.10
>>
>> Hi Jiahui,
>>
>> I think this is a problem with multiple-sinks optimization. If we
>> optimize each sink eagerly (that is, we optimize the query when
>> `writeToSink` or `insertInto` is called), `TableConfig#setIdleStateRetentionTime` is
>> functionally equivalent to QueryConfig, which requires calling
>> `TableConfig#setIdleStateRetentionTime` before calling `writeToSink` or
>> `insertInto`. If we use multiple-sinks optimization, however, it's hard to
>> map the value of `TableConfig#setIdleStateRetentionTime` to each query. I
>> think this is a common issue for per-query configuration under
>> multiple-sinks optimization.
>>
>> For the `toRetractStream` method, however, we keep the eager optimization
>> strategy, so you can call `TableConfig#setIdleStateRetentionTime` before
>> `toRetractStream`.
>>
>> Best,
>> Godfrey
>>
>> On Tue, Apr 14, 2020 at 12:15 PM, Jiahui Jiang wrote:
>>
>> Hey Godfrey, in some of the use cases our users have, they have a couple
>> of complex join queries where the key domain keeps evolving - we definitely
>> want some sort of state retention for those queries; but there are others
>> where the key domain doesn't evolve over time, yet there isn't really a
>> guarantee on the maximum gap between two records of the same key
>> appearing in the stream, and we don't want to accidentally invalidate the
>> state for those keys in these streams.
>>
>> Because queries with different requirements can both exist in the
>> pipeline, I think we have to configure `IDLE_STATE_RETENTION_TIME` per
>> operator.
>>
>> Just wondering, has a similar requirement not come up much for SQL users
>> before? (being able to set table / query configuration inside SQL queries)
>>
>> We are also a little bit concerned because, now that
>> 'toRetractStream(Table, Class, QueryConfig)' is deprecated, relying on the
>> fact that TableConfig is read during toDataStream feels like relying on an
>> implementation detail that just happens to work, and there is no guarantee
>> that it will keep working in future versions...
>>
>> Thanks!
>> --
>> *From:* godfrey he 
>> *Sent:* Monday, April 13, 2020 9:51 PM
>> *To:* Jiahui Jiang 
>> *Cc:* Jark Wu ; user@flink.apache.org <
>> user@flink.apache.org>
>> *Subject:* Re: Setting different idleStateRetentionTime for different
>> queries executed in the same TableEnvironment in Flink 1.10
>>
>> Hi Jiahui,
>>
>> Query hints are a way to do fine-grained configuration.
>> Just out of curiosity, is it a strong requirement
>> that users need to configure a different IDLE_STATE_RETENTION_TIME for each
>> operator?
>>
>> Best,
>> Godfrey
>>
>> On Tue, Apr 14, 2020 at 2:07 AM, Jiahui Jiang wrote:
>>
>> Also for some more context, we are building a framework to help users
>> build their Flink pipeline with SQL. Our framework handles all the setup
>> and configuration, so that users only need to write the SQL queries without
>> having to have any Flink knowledge.
>>
>> One issue we encountered was that, for some of the streams, the key domain
>> keeps evolving and we want to expire the state for older keys. But there
>> is no easy way to let users configure their state timeout directly
>> through SQL APIs.
>> 

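Re: Custom sink with Exactly-Once semantics

2020-04-14 Thread zhang...@lakala.com
I came across a WeChat official-account article last night; sharing it here in the hope that it helps.
"Flink 端到端 Exactly-once 机制剖析" (an analysis of Flink's end-to-end exactly-once mechanism)
https://mp.weixin.qq.com/s/fhUNuCOVFQUjRB-fo4Rl2g


From: 阿华田
Sent: 2020-04-15 11:00
To: user-zh@flink.apache.org
Subject: Custom sink with Exactly-Once semantics
If I write a custom sink with exactly-once semantics by extending TwoPhaseCommitSinkFunction, how do I make sure it is truly exactly-once? Does anyone have an example or demo that is already running in production?
王志华
a15733178...@163.com
Signature customized by NetEase Mail Master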
 


Re: JDBCLookupFunction caching causes stale data

2020-04-14 Thread Zhenghua Gao
Two options control the cache size and the cache expiration time [1]; you can use them to trade off performance against accuracy:

  -- lookup options, optional, used in temporary join

  -- optional, max number of rows of lookup cache, over this value, the oldest rows will
  -- be eliminated. "cache.max-rows" and "cache.ttl" options must all be specified if any
  -- of them is specified. Cache is not enabled as default.
  'connector.lookup.cache.max-rows' = '5000',

  -- optional, the max time to live for each rows in lookup cache, over this time, the oldest rows
  -- will be expired. "cache.max-rows" and "cache.ttl" options must all be specified if any of
  -- them is specified. Cache is not enabled as default.
  'connector.lookup.cache.ttl' = '10s',

  -- optional, max retry times if lookup database failed
  'connector.lookup.max-retries' = '3',

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector
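To show how the two options interact, here is a small stand-alone cache with a row limit and a time-to-live. It is a sketch of the general technique only, assuming eviction of the oldest-written row and expiry measured from write time; `TtlLookupCache` is a hypothetical class and is not how the JDBC connector is implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical sketch of a lookup cache bounded by row count and TTL.
// A smaller TTL means fresher dimension data but more database lookups.
final class TtlLookupCache<K, V> {

    private static final class Row<V> {
        final V value;
        final long writtenAtMillis;

        Row(V value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final long ttlMillis;
    private final LongSupplier clock; // injectable clock, e.g. System::currentTimeMillis
    private final LinkedHashMap<K, Row<V>> rows;

    TtlLookupCache(int maxRows, long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        // Insertion-ordered map that drops the oldest row once maxRows is exceeded.
        this.rows = new LinkedHashMap<K, Row<V>>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Row<V>> eldest) {
                return size() > maxRows;
            }
        };
    }

    /** Stores a freshly looked-up row; re-inserting moves the key to the newest position. */
    void put(K key, V value) {
        rows.remove(key);
        rows.put(key, new Row<>(value, clock.getAsLong()));
    }

    /** Returns the cached value, or null if absent or older than the TTL (caller then queries the database). */
    V getIfFresh(K key) {
        Row<V> row = rows.get(key);
        if (row == null) {
            return null;
        }
        if (clock.getAsLong() - row.writtenAtMillis >= ttlMillis) {
            rows.remove(key);
            return null;
        }
        return row.value;
    }
}
```

A caller would construct it as, for example, `new TtlLookupCache<String, String>(5000, 10_000L, System::currentTimeMillis)`, mirroring the 5000-row and 10-second values shown above.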

*Best Regards,*
*Zhenghua Gao*


On Wed, Apr 15, 2020 at 11:28 AM Dino Zhang 
wrote:

> You could consider lowering cache.ttl.
>
> On Wed, Apr 15, 2020 at 11:22 AM tingli ke  wrote:
>
> > Is there another way to join MySQL dimension-table data in real time?
> >
> >
> > On Wed, Apr 15, 2020 at 11:08 AM, 13122260...@163.com <13122260...@163.com> wrote:
> >
> > > There is a setCacheMaxSize(1000) call; you can change it to -1 to disable the cache.
> > > This is explained in org.apache.flink.api.java.io.jdbc.JDBCLookupFunction:
> > > The cacheMaxSize is -1 means not use cache
> > >
> > >
> > >
> > > 13122260...@163.com
> > >
> > > From: tingli ke
> > > Sent: 2020-04-15 10:55
> > > To: user-zh
> > > Subject: JDBCLookupFunction caching causes stale data
> > > Hi,
> > >
> > > The stream table uses JDBCLookupFunction to join MySQL dimension-table data in real time, but JDBCLookupFunction caches the data, so when the MySQL dimension table is updated, Flink still sees the old data (the cache is needed for performance).
> > > Is there another way to join MySQL dimension-table data in real time?
> > >
> >
>
>
> --
> Regards,
> DinoZhang
>


Re: JDBCLookupFunction caching causes stale data

2020-04-14 Thread Dino Zhang
You could consider lowering cache.ttl.

On Wed, Apr 15, 2020 at 11:22 AM tingli ke  wrote:

> Is there another way to join MySQL dimension-table data in real time?
>
>
> On Wed, Apr 15, 2020 at 11:08 AM, 13122260...@163.com <13122260...@163.com> wrote:
>
> > There is a setCacheMaxSize(1000) call; you can change it to -1 to disable the cache.
> > This is explained in org.apache.flink.api.java.io.jdbc.JDBCLookupFunction:
> > The cacheMaxSize is -1 means not use cache
> >
> >
> >
> > 13122260...@163.com
> >
> > From: tingli ke
> > Sent: 2020-04-15 10:55
> > To: user-zh
> > Subject: JDBCLookupFunction caching causes stale data
> > Hi,
> >
> > The stream table uses JDBCLookupFunction to join MySQL dimension-table data in real time, but JDBCLookupFunction caches the data, so when the MySQL dimension table is updated, Flink still sees the old data (the cache is needed for performance).
> > Is there another way to join MySQL dimension-table data in real time?
> >
>

-- 
Regards,
DinoZhang


Re: Custom sink with Exactly-Once semantics

2020-04-14 Thread jinhai wang
The FlinkKafkaProducer and StreamingFileSink implementations both support exactly-once; they are worth studying.
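As background for reading those implementations, the two-phase commit idea can be sketched in plain Java. The hook names below (`beginTransaction`, `invoke`, `preCommit`, `commit`, `abort`) follow the ones `TwoPhaseCommitSinkFunction` asks subclasses to provide, but the class itself is a toy under simplifying assumptions (one pending transaction, an in-memory "external system"); it is not Flink code and leaves out recovery.

```java
import java.util.ArrayList;
import java.util.List;

// Toy two-phase commit flow (hypothetical class, not Flink code):
// records become visible only after the checkpoint that covers them completes.
final class TwoPhaseCommitSketch {

    /** Records buffered for one checkpoint interval. */
    static final class Txn {
        final List<String> buffered = new ArrayList<>();
    }

    private final List<String> committed = new ArrayList<>(); // what readers can see
    private Txn current = beginTransaction();                 // receiving records
    private Txn pending;                                      // pre-committed, not yet visible

    Txn beginTransaction() {
        return new Txn();
    }

    /** Called per record: write into the open transaction only. */
    void invoke(String record) {
        current.buffered.add(record);
    }

    /** Phase 1, on checkpoint: seal the open transaction and start a new one. */
    void preCommit() {
        pending = current;
        current = beginTransaction();
    }

    /** Phase 2, on checkpoint-complete notification: publish; safe to call twice. */
    void commit() {
        if (pending != null) {
            committed.addAll(pending.buffered);
            pending = null;
        }
    }

    /** On failure: drop records that no completed checkpoint covers. */
    void abort() {
        current = beginTransaction();
    }

    List<String> visible() {
        return committed;
    }
}
```

The property to verify in a real sink is the same one this toy has: `commit` must be idempotent and the external system must be able to hold pre-committed data invisibly until `commit` is called.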


Best Regards

jinhai...@gmail.com

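> On Apr 15, 2020, at 11:00 AM, 阿华田  wrote:
> 
> If I write a custom sink with exactly-once semantics by extending TwoPhaseCommitSinkFunction, how do I make sure it is truly exactly-once? Does anyone have an example or demo that is already running in production?
> 王志华
> a15733178...@163.com
> Signature customized by NetEase Mail Master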
> 



Re: Custom sink with Exactly-Once semantics

2020-04-14 Thread 1193216154
Flink  FlinkKafkaProducer??





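Re: JDBCLookupFunction caching causes stale data

2020-04-14 Thread tingli ke
Is there another way to join MySQL dimension-table data in real time?


On Wed, Apr 15, 2020 at 11:08 AM, 13122260...@163.com <13122260...@163.com> wrote:

> There is a setCacheMaxSize(1000) call; you can change it to -1 to disable the cache.
> This is explained in org.apache.flink.api.java.io.jdbc.JDBCLookupFunction:
> The cacheMaxSize is -1 means not use cache
>
>
>
> 13122260...@163.com
>
> From: tingli ke
> Sent: 2020-04-15 10:55
> To: user-zh
> Subject: JDBCLookupFunction caching causes stale data
> Hi,
>
> The stream table uses JDBCLookupFunction to join MySQL dimension-table data in real time, but JDBCLookupFunction caches the data, so when the MySQL dimension table is updated, Flink still sees the old data (the cache is needed for performance).
> Is there another way to join MySQL dimension-table data in real time?
>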


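Re: JDBCLookupFunction caching causes stale data

2020-04-14 Thread tingli ke
Hello, without the cache every record triggers a MySQL query, which is very inefficient.

On Wed, Apr 15, 2020 at 11:08 AM, 13122260...@163.com <13122260...@163.com> wrote:

> There is a setCacheMaxSize(1000) call; you can change it to -1 to disable the cache.
> This is explained in org.apache.flink.api.java.io.jdbc.JDBCLookupFunction:
> The cacheMaxSize is -1 means not use cache
>
>
>
> 13122260...@163.com
>
> From: tingli ke
> Sent: 2020-04-15 10:55
> To: user-zh
> Subject: JDBCLookupFunction caching causes stale data
> Hi,
>
> The stream table uses JDBCLookupFunction to join MySQL dimension-table data in real time, but JDBCLookupFunction caches the data, so when the MySQL dimension table is updated, Flink still sees the old data (the cache is needed for performance).
> Is there another way to join MySQL dimension-table data in real time?
>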


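Re: JDBCLookupFunction caching causes stale data

2020-04-14 Thread 13122260...@163.com
There is a setCacheMaxSize(1000) call; you can change it to -1 to disable the cache.
This is explained in org.apache.flink.api.java.io.jdbc.JDBCLookupFunction:
The cacheMaxSize is -1 means not use cache



13122260...@163.com
 
From: tingli ke
Sent: 2020-04-15 10:55
To: user-zh
Subject: JDBCLookupFunction caching causes stale data
Hi,
The stream table uses JDBCLookupFunction to join MySQL dimension-table data in real time, but JDBCLookupFunction caches the data, so when the MySQL dimension table is updated, Flink still sees the old data (the cache is needed for performance).
Is there another way to join MySQL dimension-table data in real time?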


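Custom sink with Exactly-Once semantics

2020-04-14 Thread 阿华田
If I write a custom sink with exactly-once semantics by extending TwoPhaseCommitSinkFunction, how do I make sure it is truly exactly-once? Does anyone have an example or demo that is already running in production?
王志华
a15733178...@163.com
Signature customized by NetEase Mail Master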



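JDBCLookupFunction caching causes stale data

2020-04-14 Thread tingli ke
Hi,
The stream table uses JDBCLookupFunction to join MySQL dimension-table data in real time, but JDBCLookupFunction caches the data, so when the MySQL dimension table is updated, Flink still sees the old data (the cache is needed for performance).
Is there another way to join MySQL dimension-table data in real time?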


Re: sub

2020-04-14 Thread Sivaprasanna
Hi,

To subscribe, you have to send a mail to user-subscr...@flink.apache.org

On Wed, 15 Apr 2020 at 7:33 AM, lamber-ken  wrote:

> user@flink.apache.org
>


sub

2020-04-14 Thread lamber-ken
user@flink.apache.org

Flink

2020-04-14 Thread Navneeth Krishnan
Hi All,

I'm very new to EKS and am trying to deploy a Flink job in cluster mode. Is
there any good documentation on the steps to deploy on EKS?

From my understanding, with Flink 1.10, running on EKS will automatically
scale up and down based on the load through the Kubernetes integration. Is this
correct? Do I have to enable some configs to support this feature?

How do I use the Lyft k8s operator when deploying on EKS?

Thanks a lot, appreciate all the help.


Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

2020-04-14 Thread godfrey he
Hi Jiahui,

Thanks for your suggestions.
I think we may need a more detailed explanation of the behavior change.
Regarding "supporting query configuration using Hints", I think it's
one possible approach, but we need more discussion.

Best,
Godfrey

On Tue, Apr 14, 2020 at 7:46 PM, Jiahui Jiang wrote:

> Yep yep :) I’m aware that the difference here between the Blink and legacy
> Flink planners applies only to sinks.
>
> But since at the API level toDataStream doesn’t take a query-level
> config, it’s easy for people to think they can’t control it on a per-query
> basis without digging into the source code.
>
> I have two questions / suggestions here:
>
> 1. Since StreamQueryConfig is deprecated and we want to consolidate config
> classes, can we maybe add an additional endpoint like
> .toRetractStream(Table, Class, minRetentionTime, maxRetentionTime)? Or at
> least add some Javadocs so that I won’t worry about the behavior under the
> hood suddenly changing?
> 2. What do we think about making query configuration via Hints
> a first-class supported Flink feature?
>
> Thank you so much 
> --
> *From:* godfrey he 
> *Sent:* Tuesday, April 14, 2020 3:20 AM
> *To:* Jiahui Jiang 
> *Cc:* Jark Wu ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> I think this is a problem with multiple-sinks optimization. If we optimize
> each sink eagerly (that is, we optimize the query when
> `writeToSink` or `insertInto` is called), `TableConfig#setIdleStateRetentionTime` is
> functionally equivalent to QueryConfig, which requires calling
> `TableConfig#setIdleStateRetentionTime` before calling `writeToSink` or
> `insertInto`. If we use multiple-sinks optimization, however, it's hard to
> map the value of `TableConfig#setIdleStateRetentionTime` to each query. I
> think this is a common issue for per-query configuration under
> multiple-sinks optimization.
>
> For the `toRetractStream` method, however, we keep the eager optimization
> strategy, so you can call `TableConfig#setIdleStateRetentionTime` before
> `toRetractStream`.
>
> Best,
> Godfrey
>
> On Tue, Apr 14, 2020 at 12:15 PM, Jiahui Jiang wrote:
>
> Hey Godfrey, in some of the use cases our users have, they have a couple
> of complex join queries where the key domain keeps evolving - we definitely
> want some sort of state retention for those queries; but there are others
> where the key domain doesn't evolve over time, yet there isn't really a
> guarantee on the maximum gap between two records of the same key
> appearing in the stream, and we don't want to accidentally invalidate the
> state for those keys in these streams.
>
> Because queries with different requirements can both exist in the
> pipeline, I think we have to configure `IDLE_STATE_RETENTION_TIME` per
> operator.
>
> Just wondering, has a similar requirement not come up much for SQL users
> before? (being able to set table / query configuration inside SQL queries)
>
> We are also a little bit concerned because, now that
> 'toRetractStream(Table, Class, QueryConfig)' is deprecated, relying on the
> fact that TableConfig is read during toDataStream feels like relying on an
> implementation detail that just happens to work, and there is no guarantee
> that it will keep working in future versions...
>
> Thanks!
> --
> *From:* godfrey he 
> *Sent:* Monday, April 13, 2020 9:51 PM
> *To:* Jiahui Jiang 
> *Cc:* Jark Wu ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> Query hints are a way to do fine-grained configuration.
> Just out of curiosity, is it a strong requirement
> that users need to configure a different IDLE_STATE_RETENTION_TIME for each
> operator?
>
> Best,
> Godfrey
>
> On Tue, Apr 14, 2020 at 2:07 AM, Jiahui Jiang wrote:
>
> Also for some more context, we are building a framework to help users
> build their Flink pipeline with SQL. Our framework handles all the setup
> and configuration, so that users only need to write the SQL queries without
> having to have any Flink knowledge.
>
> One issue we encountered was that, for some of the streams, the key domain
> keeps evolving and we want to expire the state for older keys. But there
> is no easy way to let users configure their state timeout directly
> through SQL APIs.
> Currently we are asking users to configure idleStateRetentionTime in a
> custom SQL hint; our framework then parses it and sets it up at
> table registration time.
>
> An example query that users can write right now looks like:
>
> CREATE TABLE `/output` AS
> SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ *
> FROM `/input1` a
> INNER JOIN `/input2` b
> ON a.column_name = b.column_name;
>
> Is this something Flink SQL 

Checkpoints for kafka source sometimes get 55 GB size (instead of 2 MB) and flink job fails during restoring from such checkpoint

2020-04-14 Thread Oleg Vysotsky
Hello,

Sometimes our Flink job starts creating large checkpoints that include 55 GB 
(instead of 2 MB) related to the Kafka source. After the Flink job creates the first 
“abnormal” checkpoint, all subsequent checkpoints are “abnormal” as well. The Flink job 
can’t be restored from such a checkpoint: restoring from it 
hangs or fails. The Flink dashboard also hangs and the Flink cluster crashes during the 
restore. We didn’t catch any related error message, and 
we haven’t found a clear way to reproduce the problem (that is, to make the Flink job create 
“abnormal” checkpoints).

Configuration:
We are using Flink 1.8.1 on EMR (EMR 5.27)
Kafka: Confluent Kafka 5.4.1
Flink Kafka connector: org.apache.flink:flink-connector-kafka_2.11:1.8.1 (it 
includes the org.apache.kafka:kafka-clients:2.0.1 dependency)

Our input Kafka topic has 32 partitions and the related Flink source has a 
parallelism of 32.
We use pretty much all the default Flink Kafka consumer settings. We only specified:
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
ConsumerConfig.GROUP_ID_CONFIG,
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG

Thanks a lot  in advance!
Oleg




Re: Flink job didn't restart when a task failed

2020-04-14 Thread Hanson, Bruce
Hi Zhu Zhu (and Till),

Thanks for your thoughts on this problem. I do not see a message like the one 
you mention "Task {} is already in state FAILED." I have attached a file with 
all the task manager logs that we received at the time this happened. As you 
see, there aren’t many. We turned on debug logging for “org.apache.flink” on 
this job this afternoon so maybe we’ll find something interesting if/when the 
issue happens again. I do hope we can catch it in the act.

-Bruce

--


From: Zhu Zhu 
Date: Monday, April 13, 2020 at 9:29 PM
To: Till Rohrmann 
Cc: Aljoscha Krettek , user , Gary 
Yao 
Subject: Re: Flink job didn't restart when a task failed

Sorry for not following this ML earlier.

I think the cause might be that the final state ('FAILED') update message to JM 
is lost. TaskExecutor will simply fail the task (which does not take effect in 
this case since the task is already FAILED) and will not update the task state 
again in this case.
@Bruce would you take a look at the TM log? If the guess is right, in task 
manager logs there will be one line "Task {} is already in state FAILED."

Thanks,
Zhu Zhu

On Fri, Apr 10, 2020 at 12:51 AM, Till Rohrmann <trohrm...@apache.org> wrote:
For future reference, here is the issue to track the reconciliation logic [1].

[1] 
https://issues.apache.org/jira/browse/FLINK-17075

Cheers,
Till

On Thu, Apr 9, 2020 at 6:47 PM Till Rohrmann <trohrm...@apache.org> wrote:
Hi Bruce,

what you are describing sounds indeed quite bad. Quite hard to say whether we 
fixed such an issue in 1.10. It is definitely worth a try to upgrade, though.

In order to further debug the problem, it would be really great if you could 
provide us with the log files of the JobMaster and the TaskExecutor. Ideally on 
debug log level if you have them.

One thing which we wanted to add is sending the current task statuses as part 
of the heartbeat from the TM to the JM. Having this information would allow us 
to reconcile a situation like you are describing.

Cheers,
Till

On Thu, Apr 9, 2020 at 1:57 PM Aljoscha Krettek <aljos...@apache.org> wrote:
Hi,

this indeed seems very strange!

@Gary Could you maybe have a look at this since you work/worked quite a
bit on the scheduler?

Best,
Aljoscha

On 09.04.20 05:46, Hanson, Bruce wrote:
> Hello Flink folks:
>
> We had a problem with a Flink job the other day that I haven’t seen before. 
> One task encountered a failure and switched to FAILED (see the full exception 
> below). After the failure, the task said it was notifying the Job Manager:
>
> 2020-04-06 08:21:04.329 [flink-akka.actor.default-dispatcher-55283] 
> level=INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - 
> Un-registering task and sending final execution state FAILED to JobManager 
> for task FOG_PREDICTION_FUNCTION 3086efd0e57612710d0ea74138c01090.
>
> But I see no evidence that the Job Manager got the message. I would expect 
> with this type of failure that the Job Manager would restart the job. In this 
> case, the job carried on, hobbled, until it stopped processing data and 
> our user had to manually restart the job. The job also started experiencing 
> checkpoint timeouts on every checkpoint due to this operator stopping.
>
> Had the job restarted when this happened, I believe everything would have 
> been ok as the job had an appropriate restart strategy in place. The Task 
> Manager that this task was running on remained healthy and was actively 
> processing other tasks.
>
> It seems like this is some kind of a bug. Is this something anyone has seen 
> before? Could it be something that has been fixed if we went to Flink 1.10?
>
> We are running Flink 1.7.2. I know it’s rather old now. We run a managed 
> environment where users can run their jobs, and are in the process of 
> upgrading to 1.10.
>
> This is the full exception that started the problem:
>
> 2020-04-06 08:21:04.297 [FOG_PREDICTION_FUNCTION (15/20)] level=INFO  
> org.apache.flink.runtime.taskmanager.Task  - FOG_PREDICTION_FUNCTION (15/20) 
> (3086efd0e57612710d0ea74138c01090) switched from RUNNING to FAILED.
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>  Connection timed out (connection to 
> '/100.112.98.121:36256')
> at 
> 

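Re:Re: flink run -m yarn submission fails (Flink 1.9)

2020-04-14 Thread guanyq
I am submitting the job in per-job mode and am not using yarn-session. I still don't quite understand why submitting in per-job mode with the -yd option causes a problem.
At 2020-04-15 08:52:11, "tison"  wrote:
>The -yd option affects whether you submit the job in per-job mode. Put simply:
>
>with -yd: the job is submitted in per-job mode, i.e. a new cluster is started
>without -yd: the job is submitted to an existing Flink on YARN cluster
>
>Which one do you need? Have you started a Flink on YARN cluster with yarn-session beforehand?
>
>Best,
>tison.
>
>
>On Wed, Apr 15, 2020 at 8:46 AM, guanyq wrote:
>
>> The submission failed. In my testing it is related to the -yd option: once I remove it, the job can be submitted. But I don't know what -yd actually affects.
>> At 2020-04-14 15:31:00, "guanyq"  wrote:
>> >The submission failed even though YARN still has plenty of resources. Why does it fail?
>> >
>> >Submission script: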
>> >./bin/flink run -m yarn-cluster \
>> >-ynm TestDataProcess \
>> >-yd \
>> >-yn 2 \
>> >-ytm 1024 \
>> >-yjm 1024 \
>> >-c com.data.processing.unconditionalacceptance.TestDataProcess \
>> >./tasks/UnconditionalAcceptanceDataProcess.jar \
>> >
>> >
>> >YARN resources:
>> >Apps Submitted: 239
>> >Apps Pending: 0
>> >Apps Running: 12
>> >Apps Completed: 227
>> >Containers Running: 173
>> >Memory Used: 334 GB
>> >Memory Total: 1.42 TB
>> >Memory Reserved: 0 B
>> >VCores Used: 173
>> >VCores Total: 288
>> >VCores Reserved: 0
>> >Active Nodes: 9
>> >Decommissioned Nodes: 0
>> >Lost Nodes: 0
>> >Unhealthy Nodes: 0
>> >Rebooted Nodes: 0
>> >
>> >
>> >
>> >2020-04-14 15:14:19,002 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:19,253 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:19,504 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:19,755 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:20,006 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:20,257 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:20,508 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:20,759 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:21,011 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:21,262 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:21,513 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:21,764 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:22,015 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:22,265 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:22,517 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:22,768 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:23,019 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. Please check if the requested resources are
>> available in the YARN cluster
>> >2020-04-14 15:14:23,270 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>> took more than 60 seconds. 

Re: Submission with flink run -m yarn fails (Flink 1.9)

2020-04-14 Thread tison
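The -yd parameter affects whether you submit the job in per-job mode. Put simply:

with -yd, the job is submitted in per-job mode, i.e. a new cluster is started
without -yd, the job is submitted to an existing Flink on YARN cluster

Which one do you need? Did you start a Flink on YARN cluster with yarn-session beforehand?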

Best,
tison.


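guanyq wrote on Wed, Apr 15, 2020 at 8:46 AM:

> The submission failed. In my tests this is related to the -yd parameter: once it is removed, the job can be submitted. But I don't know what the -yd parameter affects.
> At 2020-04-14 15:31:00, "guanyq" wrote:
> >The submission failed even though YARN still has plenty of resources. Why does it fail?
> >
> >Submit script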
> >./bin/flink run -m yarn-cluster \
> >-ynm TestDataProcess \
> >-yd \
> >-yn 2 \
> >-ytm 1024 \
> >-yjm 1024 \
> >-c com.data.processing.unconditionalacceptance.TestDataProcess \
> >./tasks/UnconditionalAcceptanceDataProcess.jar \
> >
> >
> >YARN resources
> >Apps Submitted Apps PendingApps RunningApps Completed  Containers
> Running  Memory Used Memory TotalMemory Reserved VCores Used
>  VCores TotalVCores Reserved Active NodesDecommissioned Nodes
> Lost Nodes  Unhealthy Nodes Rebooted Nodes
> >2390   12  227 173 334 GB  1.42 TB 0 B 173
>  288 0   9   0   0   0   0
> >
> >
> >
> >2020-04-14 15:14:19,002 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:19,253 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:19,504 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:19,755 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:20,006 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:20,257 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:20,508 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:20,759 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:21,011 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:21,262 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:21,513 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:21,764 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:22,015 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:22,265 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:22,517 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:22,768 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:23,019 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:23,270 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:23,521 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. 

Re: Submission with flink run -m yarn fails (Flink 1.9)

2020-04-14 Thread guanyq
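The submission failed. In my tests this is related to the -yd parameter: once it is removed, the job can be submitted. But I don't know what the -yd parameter affects.
At 2020-04-14 15:31:00, "guanyq" wrote:
>The submission failed even though YARN still has plenty of resources. Why does it fail?
>
>Submit script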
>./bin/flink run -m yarn-cluster \
>-ynm TestDataProcess \
>-yd \
>-yn 2 \
>-ytm 1024 \
>-yjm 1024 \
>-c com.data.processing.unconditionalacceptance.TestDataProcess \
>./tasks/UnconditionalAcceptanceDataProcess.jar \
>
>
>YARN resources
>Apps Submitted Apps PendingApps RunningApps Completed  Containers 
>Running  Memory Used Memory TotalMemory Reserved VCores Used 
>VCores TotalVCores Reserved Active NodesDecommissioned NodesLost 
>Nodes  Unhealthy Nodes Rebooted Nodes
>2390   12  227 173 334 GB  1.42 TB 0 B 173 288 
>0   9   0   0   0   0
>
>
>
>2020-04-14 15:14:19,002 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:19,253 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:19,504 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:19,755 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:20,006 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:20,257 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:20,508 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:20,759 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:21,011 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:21,262 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:21,513 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:21,764 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:22,015 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:22,265 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:22,517 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:22,768 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:23,019 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:23,270 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:23,521 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 
>available in the YARN cluster
>2020-04-14 15:14:23,772 INFO  
>org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment 
>took more than 60 seconds. Please check if the requested resources are 

Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

2020-04-14 Thread Kaan Sancak
Thanks for the useful information! It seems like a good and fun idea to 
experiment. I will definitely give it a try.

I have a very close upcoming deadline and I have already implemented the 
Scatter-Gather iteration algorithm.

I have another question on whether we can chain Scatter-Gather or 
Vertex-Centric iterations.
Let’s say that we have an initial batch/dataset; we run a Scatter-Gather iteration and 
obtain a graph.
Using another batch, we add/delete vertices in the graph we obtained. 
Now we run another Scatter-Gather iteration on the modified graph.

This is not streaming, but a naive way to simulate batch updates that are 
happening concurrently.
Do you think this is a feasible approach?

Best
Kaan

> On Apr 13, 2020, at 11:16 PM, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi,
> 
> As you mentioned, Gelly Graphs are backed by Flink DataSets, and therefore
> work primarily on static graphs. I don't think it'll be possible to
> implement incremental algorithms described in your SO question.
> 
> Have you tried looking at Stateful Functions, a recent new API added to
> Flink?
> It supports arbitrary messaging between functions, which may allow you to
> build what you have in mind.
> Take a look at Seth's and Igal's comments here [1], where there seems to be a
> similar incremental graph-processing use case for sessionization.
> 
> Cheers,
> Gordon
> 
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-graph-based-sessionization-potential-use-for-stateful-functions-td34000.html#a34017
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Processing Message after emitting to Sink

2020-04-14 Thread KristoffSC
Hi all,
I have a special use case that I'm not sure how I can fulfill.

The use case is:
I have my main business processing pipeline that has an MQ source,
processFunction1, processFunction2 and an MQ sink.

processFunction1, apart from processing the main business message, also
emits some side effects using side outputs. Those side outputs are sent
to SideOutputMqSink, which sends them to the queue.

The requirement is that processFunction1 must not send the main business
message on to processFunction2 until the side output from processFunction1
has been sent to the queue via SideOutputMqSink.

In general I don't have to use side outputs, although I do some extra
processing on them before sending them to the sink, so having a side output
stream is nice to have. Nevertheless, the key requirement is that we should
wait with further processing until the side output has been sent to the queue.

I could achieve this by having processFunction1 call MQ directly in its
processElement method before sending out the main message, although I
don't like that idea.
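
For reference, the direct-call variant mentioned above could look roughly like the following plain-Java sketch. It deliberately uses no Flink types: `BlockingMqClient` and `sendAndWaitForAck` are hypothetical names standing in for whatever blocking MQ client is available, and the `Consumer` stands in for the collector that forwards to processFunction2. It only illustrates the ordering (side effect acknowledged first, main message emitted afterwards), not how this would interact with checkpointing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java stand-in for the "call MQ directly" variant; no Flink types are used. */
public class SideEffectFirst {

    /** Hypothetical blocking client: returns only once the queue has accepted the message. */
    public interface BlockingMqClient {
        void sendAndWaitForAck(String sideEffect);
    }

    /**
     * Sends the side effect first and emits the main message only after the
     * blocking send has returned. If the send throws, nothing is emitted.
     */
    public static void processElement(String input, BlockingMqClient mq, Consumer<String> mainOut) {
        String sideEffect = "side:" + input;
        mq.sendAndWaitForAck(sideEffect);   // blocks until the queue acknowledges
        mainOut.accept("main:" + input);    // forwarded to the next step only afterwards
    }

    public static void main(String[] args) {
        List<String> order = new ArrayList<>();
        processElement("m1", msg -> order.add("mq<-" + msg), msg -> order.add("out<-" + msg));
        System.out.println(order); // prints [mq<-side:m1, out<-main:m1]
    }
}
```

The obvious cost of this shape is that every element waits for the queue round trip, which is presumably part of why it feels unattractive.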

I was wondering whether there is a way to have a Sink function that is also a
FlatMap function.

The best solution would be to process the two streams (main and side
effect) in some nice way but with some barrier, so that the main pipeline
waits until the side output has been sent.
Both streams can be keyed.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Upgrading Flink

2020-04-14 Thread Chesnay Schepler
The only guarantee that Flink provides is that any jar working against 
Public APIs will continue to work without recompilation.


There are no compatibility guarantees between clients<->server of 
different versions.


On 14/04/2020 20:02, David Anderson wrote:
@Chesnay Flink doesn't seem to guarantee client-jobmanager 
compatibility, even for bug-fix releases. For example, some jobs 
compiled with 1.9.0 don't work with a cluster running 1.9.2. See 
https://github.com/ververica/sql-training/issues/8#issuecomment-590966210 for 
an example of a case when recompiling was necessary.


Does the Flink project have an explicit policy as to when recompiling 
can be required?



On Tue, Apr 14, 2020 at 2:38 PM Sivaprasanna wrote:


Ideally if the underlying cluster where the job is being deployed
changes (1.8.x to 1.10.x ), it is better to update your project
dependencies to the new version (1.10.x), and hence you need to
recompile the jobs.

On Tue, Apr 14, 2020 at 3:29 PM Chesnay Schepler wrote:

@Robert Why would he have to recompile the jobs? Shouldn't he
be fine so long as he isn't using any API for which we broke
binary-compatibility?

On 09/04/2020 09:55, Robert Metzger wrote:

Hey Stephen,

1. You should be able to migrate from 1.8 to 1.10:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/upgrading.html#compatibility-table

2. Yes, you need to recompile (but ideally you don't need to
change anything).



On Mon, Apr 6, 2020 at 10:19 AM Stephen Connolly wrote:

Quick questions on upgrading Flink.

All our jobs are compiled against Flink 1.8.x

We are planning to upgrade to 1.10.x

1. Is the recommended path to upgrade one minor at a
time, i.e. 1.8.x -> 1.9.x and then 1.9.x -> 1.10.x as a
second step or is the big jump supported, i.e. 1.8.x ->
1.10.x in one change

2. Do we need to recompile the jobs against the newer
Flink version before upgrading? Coordinating multiple
teams can be tricky, so - short of spinning up a second
flink cluster - our continuous deployment infrastructure
will try to deploy the topologies compiled against 1.8.x
for an hour or two after we have upgraded the cluster







Re: [Stateful Functions] Using Flink CEP

2020-04-14 Thread Oytun Tez
Hi Igal,

I have use cases such as "when a translator translates 10 words within 30
seconds". Typically, it is beautiful to express these with CEP.
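
As a rough illustration of how a rule like this might be kept as per-key state instead of a CEP pattern, here is a minimal plain-Java sketch. It uses no Flink or StateFun types; the class name `WordBurstDetector` is made up for this example, and it assumes timestamps arrive in non-decreasing order and that "within 30 seconds" means strictly less than 30 seconds apart.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Per-translator state: timestamps of recently translated words. */
public class WordBurstDetector {
    private final int threshold;
    private final long windowMillis;
    private final Deque<Long> recent = new ArrayDeque<>();

    public WordBurstDetector(int threshold, long windowMillis) {
        this.threshold = threshold;
        this.windowMillis = windowMillis;
    }

    /** Records one translated word; returns true once the threshold is reached within the window. */
    public boolean onWord(long timestampMillis) {
        recent.addLast(timestampMillis);
        // Drop timestamps that have fallen out of the window ending at this event.
        while (!recent.isEmpty() && timestampMillis - recent.peekFirst() >= windowMillis) {
            recent.removeFirst();
        }
        return recent.size() >= threshold;
    }
}
```

One instance per translator, e.g. `new WordBurstDetector(10, 30_000L)`, would play the role of the function's state; how that state is persisted is left open here.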

Yet, these are exploration questions where I try to replicate our Flink
application in Statefun. Rephrasing problems better might be what's needed
to fit our problems within Statefun (e.g., from a CEP-like expression to
the ridesharing example you shared)

Oytun


 --

Oytun Tez
M O T A W O R D | CTO & Co-Founder
oy...@motaword.com

  


On Tue, Apr 14, 2020 at 5:41 AM Igal Shilman  wrote:

> Hi,
>
> I'm not familiar with the other library that you have mentioned, and
> indeed using Flink CEP from within a stateful function is not possible
> within a single Flink job,  as Gordon mentioned.
>
> I'm wondering what aspects of CEP are you interested in?
> Because essentially a stateful function can be considered as a state
> machine with auxiliary state.
> You can take a look at the ride sharing example [1] for instance, where
> the drivers, and the rides are cooperative state machines.
>
> [1] -
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-ridesharing-example/statefun-ridesharing-example-functions/src/main/java/org/apache/flink/statefun/examples/ridesharing
>
> Good luck!
> Igal.
>
>
> On Tue, Apr 14, 2020 at 5:07 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi!
>>
>> It isn't possible to use Flink CEP within Stateful Functions.
>>
>> That could be an interesting primitive, to add CEP-based function
>> constructs.
>> Could you briefly describe what you are trying to achieve?
>>
>> On the other hand, there are plans to integrate Stateful Functions more
>> closely with the Flink APIs.
>> One direction we've been thinking about is to, for example, support Flink
>> DataStreams as StateFun ingress / egresses. In this case, you'll be able
>> to
>> use Flink CEP to detect patterns, and use the results as an ingress which
>> invokes functions within a StateFun app.
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: [Stateful Functions] Using Flink CEP

2020-04-14 Thread Oytun Tez
Hi Gordon,

Getting a little closer to Flink API could be helpful here with
integration. DataStreams as ingress/egress would be AMAZING. Deploying
regular Flink API code and statefun together as a single job is also
something I will explore soon.

With CEP, I simply want to keep a Function-specific pattern engine. I could
use a library like Siddhi and try to persist its state via Function's
PersistValue or something.

Also, one thing to consider in Flink API integration is protobuf
enforcement in statefun. We had to use protobuf within the statefun code,
but nowhere else we are using protobuf. Even if we talk via DataStreams,
this would still bring us some level of separation between our Flink API
code and statefun.


 --

Oytun Tez
M O T A W O R D | CTO & Co-Founder
oy...@motaword.com

  


On Mon, Apr 13, 2020 at 11:07 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi!
>
> It isn't possible to use Flink CEP within Stateful Functions.
>
> That could be an interesting primitive, to add CEP-based function
> constructs.
> Could you briefly describe what you are trying to achieve?
>
> On the other hand, there are plans to integrate Stateful Functions more
> closely with the Flink APIs.
> One direction we've been thinking about is to, for example, support Flink
> DataStreams as StateFun ingress / egresses. In this case, you'll be able to
> use Flink CEP to detect patterns, and use the results as an ingress which
> invokes functions within a StateFun app.
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Upgrading Flink

2020-04-14 Thread David Anderson
@Chesnay Flink doesn't seem to guarantee client-jobmanager compatibility,
even for bug-fix releases. For example, some jobs compiled with 1.9.0 don't
work with a cluster running 1.9.2. See
https://github.com/ververica/sql-training/issues/8#issuecomment-590966210 for
an example of a case when recompiling was necessary.

Does the Flink project have an explicit policy as to when recompiling can
be required?


On Tue, Apr 14, 2020 at 2:38 PM Sivaprasanna 
wrote:

> Ideally if the underlying cluster where the job is being deployed changes
> (1.8.x to 1.10.x ), it is better to update your project dependencies to the
> new version (1.10.x), and hence you need to recompile the jobs.
>
> On Tue, Apr 14, 2020 at 3:29 PM Chesnay Schepler 
> wrote:
>
>> @Robert Why would he have to recompile the jobs? Shouldn't he be fine so
>> long as he isn't using any API for which we broke binary-compatibility?
>>
>> On 09/04/2020 09:55, Robert Metzger wrote:
>>
>> Hey Stephen,
>>
>> 1. You should be able to migrate from 1.8 to 1.10:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/upgrading.html#compatibility-table
>>
>> 2. Yes, you need to recompile (but ideally you don't need to change
>> anything).
>>
>>
>>
>> On Mon, Apr 6, 2020 at 10:19 AM Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> Quick questions on upgrading Flink.
>>>
>>> All our jobs are compiled against Flink 1.8.x
>>>
>>> We are planning to upgrade to 1.10.x
>>>
>>> 1. Is the recommended path to upgrade one minor at a time, i.e. 1.8.x ->
>>> 1.9.x and then 1.9.x -> 1.10.x as a second step or is the big jump
>>> supported, i.e. 1.8.x -> 1.10.x in one change
>>>
>>> 2. Do we need to recompile the jobs against the newer Flink version
>>> before upgrading? Coordinating multiple teams can be tricky, so - short of
>>> spinning up a second flink cluster - our continuous deployment
>>> infrastructure will try to deploy the topologies compiled against 1.8.x for
>>> an hour or two after we have upgraded the cluster
>>>
>>
>>


post-checkpoint watermark out of sync with event stream?

2020-04-14 Thread Cliff Resnick
We have an event-time pipeline that uses a ProcessFunction to accept events
with an allowed lateness of a number of days. We use a
BoundedOutOfOrdernessTimestampExtractor, and our event stream has a long
tail that occasionally exceeds our allowed lateness, in which case we drop
the events.

The logic is simple:
1. In processElement, we compare (element's event time + allowed lateness)
against the current watermark.
2. If the element is within the time bound, we register a timer for
(element's event time + allowed lateness). We call this "endTime".
3. During the above window we collect and assimilate all the data for the
key and regularly UPSERT the data to a data store.
4. In onTimer for the above "endTime", we clear state for the key.
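
Steps 1 and 2 can be modelled in a few lines of plain Java. This is only a sketch without Flink types: the class and method names are invented for illustration, and whether the real comparison is strict (`>`) or inclusive (`>=`) is an assumption.

```java
/** Plain-Java model of the lateness check (step 1) and the timer timestamp (step 2). */
public class LatenessCheck {

    /** Step 2: the timer timestamp, called "endTime" in the text. */
    public static long endTime(long eventTime, long allowedLateness) {
        return eventTime + allowedLateness;
    }

    /** Step 1: accept the element only while its endTime is still ahead of the watermark. */
    public static boolean isWithinBound(long eventTime, long allowedLateness, long currentWatermark) {
        return endTime(eventTime, allowedLateness) > currentWatermark;
    }
}
```

Note that the outcome depends only on the watermark seen at the moment the element arrives: the same element is accepted under a lower watermark and dropped under a higher one.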

The above has worked well for the past 1-2 years. Last week, however, we
had a bug that introduced DEBUG logging to the job config, and this caused
several failures/restarts (S3 DEBUG logging is extremely verbose!). Within a
day or two, our monitoring system restarted the pipeline several times,
sometimes from a Savepoint over an hour or two old. For some reason, during
this period we noticed that a few long-tail records that should have been
dropped made it into our data store. These records did not contain assimilated
Flink state, meaning they passed through after the endTime key purge (4.)
and ended up compromising the data store by replacing assimilated values with
tail-end values.

I'm wondering how this could be possible. The only explanation I can think
of is:

4. On the "endTime" timer, key state is purged.
5. The job fails.
6. The job is restarted from a 2.5-hour-old Savepoint.
7. The watermark regresses (?) from the "endTime" watermark.
8. A long-tail event squeaks through under the temporarily backdated watermark.
9. Data store data for the key is replaced with long-tail data.
Is the above possible, or perhaps there is another possible scenario? Any
opinions appreciated!

-Cliff


Re: [1.10.0] flink-dist source jar is empty

2020-04-14 Thread Steven Wu
Chesnay, sorry, it was my mistake. Yes, we did have a local change for
the shade plugin that I missed when porting local changes from 1.9 to 1.10.

true

On Tue, Apr 14, 2020 at 6:29 AM Chesnay Schepler  wrote:

> I just built the 1.8 and 1.9 flink-dist jars and neither contain the
> sources of any bundled modules.
>
> How were you building the jars, and were you making any modifications to
> the Flink source?
>
> On 14/04/2020 15:07, Steven Wu wrote:
>
> flink-dist is a uber/shadow jar. before 1.10, its source jar contains the
> source files for the flink modules that it bundles.
>
> On Tue, Apr 14, 2020 at 1:34 AM Chesnay Schepler 
> wrote:
>
>> That should not be a problem since the flink-dist module does not
>> contain any java sources
>>
>> On 14/04/2020 06:42, Steven Wu wrote:
>> >
>> > We build and publish flink-dist locally. But the source jar turns out
>> > empty. Other source jars (like flink-core) are good. Anyone else
>> > experienced similar problem?
>> >
>> > Thanks,
>> > Steven
>>
>>
>>
>


Re: Registering UDAF in blink batch app

2020-04-14 Thread godfrey he
Hi Dmytro,

Currently, TableEnvironment does not support registering AggregationFunction
and TableFunction, because the type extractor has not been unified for Java and
Scala.

One approach is to use "TableEnvironment#createFunction", which will
register the UDF in the catalog.
I find that "createTemporarySystemFunction" does not work now. cc @Zhenghua Gao


Best,
Godfrey

Zhenghua Gao wrote on Tue, Apr 14, 2020 at 6:40 PM:

> `StreamTableEnvironment.create()` yields a `StreamTableEnvironmentImpl`
> object,
> which has several `registerFunction` methods for
> ScalarFunction/TableFunction/AggregateFunction/TableAggregateFunction.
>
> `TableEnvironment.create()` yields a `TableEnvironmentImpl` object, which
> is a unified entry point for Table/SQL programs.
> It only has a deprecated `registerFunction` method for
> ScalarFunction.  You should use `createTemporarySystemFunction` instead.
>
> A workaround for batch mode of the blink planner: you can use the public
> constructor of `StreamTableEnvironmentImpl` to create
> the TableEnvironment and use the `registerFunction` methods. Please make sure you pass
> in the correct `isStreamingMode = false`.
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Tue, Apr 14, 2020 at 5:58 PM Dmytro Dragan 
> wrote:
>
>> Hi All,
>>
>>
>>
>> Could you please tell how to register custom Aggregation function in
>> blink batch app?
>>
>> In case of streaming mode:
>>
>> We create
>>
>> EnvironmentSettings bsSettings =
>>     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
>>
>>
>>
>> Which has:
>>
>>  void registerFunction(String name, AggregateFunction
>> aggregateFunction);
>>
>>
>>
>> But in case of batchMode, we need to create TableEnvironment:
>>
>>
>>
>> EnvironmentSettings bsSettings =
>>     EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>> tEnv = TableEnvironment.create(bsSettings);
>>
>>
>>
>> Which does not have this function to register AggregationFunction, only
>> Scalar one.
>>
>>
>>
>> Details: Flink 1.10, Java API
>>
>>
>>
>>
>>
>


Re: [1.10.0] flink-dist source jar is empty

2020-04-14 Thread Chesnay Schepler
I just built the 1.8 and 1.9 flink-dist jars and neither contain the 
sources of any bundled modules.


How were you building the jars, and were you making any modifications to 
the Flink source?


On 14/04/2020 15:07, Steven Wu wrote:
flink-dist is a uber/shadow jar. before 1.10, its source jar contains 
the source files for the flink modules that it bundles.


On Tue, Apr 14, 2020 at 1:34 AM Chesnay Schepler wrote:


That should not be a problem since the flink-dist module does not
contain any java sources

On 14/04/2020 06:42, Steven Wu wrote:
>
> We build and publish flink-dist locally. But the source jar
turns out
> empty. Other source jars (like flink-core) are good. Anyone else
> experienced similar problem?
>
> Thanks,
> Steven






Re: [1.10.0] flink-dist source jar is empty

2020-04-14 Thread Steven Wu
flink-dist is an uber/shadow jar. Before 1.10, its source jar contains the
source files for the flink modules that it bundles.

On Tue, Apr 14, 2020 at 1:34 AM Chesnay Schepler  wrote:

> That should not be a problem since the flink-dist module does not
> contain any java sources
>
> On 14/04/2020 06:42, Steven Wu wrote:
> >
> > We build and publish flink-dist locally. But the source jar turns out
> > empty. Other source jars (like flink-core) are good. Anyone else
> > experienced similar problem?
> >
> > Thanks,
> > Steven
>
>
>


Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

2020-04-14 Thread Flavio Pompermaier
From what I see, Gelly is not really maintained or used anymore. Do you
think it could make sense to deprecate it and write a guide (in the
documentation) about how to rewrite a Gelly app into a Statefun one?

On Tue, Apr 14, 2020 at 5:16 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> As you mentioned, Gelly Graphs are backed by Flink DataSets, and therefore
> work primarily on static graphs. I don't think it'll be possible to
> implement the incremental algorithms described in your SO question.
>
> Have you tried looking at Stateful Functions, a recent new API added to
> Flink?
> It supports arbitrary messaging between functions, which may allow you to
> build what you have in mind.
> Take a look at Seth's and Igal's comments here [1], where there seems to be
> a similar incremental graph-processing use case for sessionization.
>
> Cheers,
> Gordon
>
> [1]
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-graph-based-sessionization-potential-use-for-stateful-functions-td34000.html#a34017
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Upgrading Flink

2020-04-14 Thread Sivaprasanna
Ideally if the underlying cluster where the job is being deployed changes
(1.8.x to 1.10.x ), it is better to update your project dependencies to the
new version (1.10.x), and hence you need to recompile the jobs.

On Tue, Apr 14, 2020 at 3:29 PM Chesnay Schepler  wrote:

> @Robert Why would he have to recompile the jobs? Shouldn't he be fine so
> long as he isn't using any API for which we broke binary-compatibility?
>
> On 09/04/2020 09:55, Robert Metzger wrote:
>
> Hey Stephen,
>
> 1. You should be able to migrate from 1.8 to 1.10:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/upgrading.html#compatibility-table
>
> 2. Yes, you need to recompile (but ideally you don't need to change
> anything).
>
>
>
> On Mon, Apr 6, 2020 at 10:19 AM Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> Quick questions on upgrading Flink.
>>
>> All our jobs are compiled against Flink 1.8.x
>>
>> We are planning to upgrade to 1.10.x
>>
>> 1. Is the recommended path to upgrade one minor at a time, i.e. 1.8.x ->
>> 1.9.x and then 1.9.x -> 1.10.x as a second step or is the big jump
>> supported, i.e. 1.8.x -> 1.10.x in one change
>>
>> 2. Do we need to recompile the jobs against the newer Flink version
>> before upgrading? Coordinating multiple teams can be tricky, so - short of
>> spinning up a second flink cluster - our continuous deployment
>> infrastructure will try to deploy the topologies compiled against 1.8.x for
>> an hour or two after we have upgraded the cluster
>>
>
>


Flink sql Session window

2020-04-14 Thread snack white
Hi,
In a Flink SQL session window, is there a way to close the window other than 
by the session gap? For example, when the session window size reaches a limit.
Thanks,
white

Re: flink sql ddl does not support primary key

2020-04-14 Thread Jingsong Li
Hi,

Yes, it is not supported right now. The old sink does not use this primary key
for upserts, but in 1.11 the new sink interface will be wired up to the DDL
primary key. [1]

[1]https://issues.apache.org/jira/browse/FLINK-17030

Best,
Jingsong Lee

On Tue, Apr 14, 2020 at 5:38 PM 叶贤勋  wrote:

> Hi all:
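> Looking at the source, when the sqlNode is converted to a
> CreateTableOperator [1], the primary key configuration is still not
> supported, even though the SQL parser can already parse it. Why hasn't this
> restriction been lifted?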
>
>
>
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java#L178
>
>
>
>
> 叶贤勋
> yxx_c...@163.com
>
>

-- 
Best, Jingsong Lee


Objects with fields that are not serializable

2020-04-14 Thread Dominik Wosiński
Hey,
I have a question about using classes with fields that are not serializable
in a DataStream. Basically, I would like to use Java's Optional in a
DataStream. Say I have a class Data that has several Optional fields
and I would like to have a DataStream<Data>. I don't expect this to
be a problem, but I thought it would be good to ask whether it can
cause any issues with Flink jobs.

Thanks,
Best,
Dom.
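
One Java-level detail relevant to the question above: `java.util.Optional` does not implement `java.io.Serializable`, so a class that stores `Optional` fields cannot go through plain Java serialization. A common workaround, sketched below under the assumption that the class is under your control, is to keep the field nullable and expose `Optional` only from the getter. The class name `OptionalFieldData` and the fields `id` and `comment` are hypothetical, and how Flink's own type extraction treats such a class is not shown here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Optional;

// Hypothetical stand-in for the "Data" class from the question.
final class OptionalFieldData implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String id;
    // Stored as a plain nullable reference instead of Optional<String>.
    private final String comment;

    OptionalFieldData(String id, String comment) {
        this.id = id;
        this.comment = comment;
    }

    String getId() {
        return id;
    }

    // Optional appears only at the API boundary, never as a field.
    Optional<String> getComment() {
        return Optional.ofNullable(comment);
    }

    // Reports whether java.util.Optional itself implements Serializable.
    static boolean optionalIsSerializable() {
        return Serializable.class.isAssignableFrom(Optional.class);
    }

    // Writes the value with plain Java serialization and returns the byte count.
    static int serializedSize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println("Optional is Serializable: " + optionalIsSerializable());
        System.out.println("Serialized bytes: " + serializedSize(new OptionalFieldData("a", null)));
    }
}
```

The getter-only approach keeps the convenience of `Optional` for callers while the stored state stays a plain nullable field.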


Re: Registering UDAF in blink batch app

2020-04-14 Thread Zhenghua Gao
`StreamTableEnvironment.create()` yields a `StreamTableEnvironmentImpl`
object, which has several `registerFunction` overloads for
ScalarFunction/TableFunction/AggregateFunction/TableAggregateFunction.

`TableEnvironment.create()` yields a `TableEnvironmentImpl` object, which
is the unified entry point for Table/SQL programs.
It only has a deprecated `registerFunction` method for
ScalarFunction.  You should use `createTemporarySystemFunction` instead.

A workaround for batch mode of the Blink planner: use the public
constructor of `StreamTableEnvironmentImpl` to create
the TableEnvironment and call its `registerFunction` overloads. Please make
sure you pass in `isStreamingMode = false`.

*Best Regards,*
*Zhenghua Gao*


On Tue, Apr 14, 2020 at 5:58 PM Dmytro Dragan 
wrote:

> Hi All,
>
>
>
> Could you please tell me how to register a custom aggregate function in a
> Blink batch app?
>
> In case of streaming mode:
>
> We create
>
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
> bsSettings);
>
>
>
> Which has:
>
>  void registerFunction(String name, AggregateFunction
> aggregateFunction);
>
>
>
> But in case of batchMode, we need to create TableEnvironment:
>
>
>
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> tEnv = TableEnvironment.create(bsSettings);
>
>
>
> Which does not have this function to register AggregationFunction, only
> Scalar one.
>
>
>
> Details: Flink 1.10, Java API
>
>
>
>
>


Re: Upgrading Flink

2020-04-14 Thread Chesnay Schepler
@Robert Why would he have to recompile the jobs? Shouldn't he be fine 
so long as he isn't using any API for which we broke binary-compatibility?


On 09/04/2020 09:55, Robert Metzger wrote:

Hey Stephen,

1. You should be able to migrate from 1.8 to 1.10: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/upgrading.html#compatibility-table


2. Yes, you need to recompile (but ideally you don't need to change 
anything).




On Mon, Apr 6, 2020 at 10:19 AM Stephen Connolly wrote:


Quick questions on upgrading Flink.

All our jobs are compiled against Flink 1.8.x

We are planning to upgrade to 1.10.x

1. Is the recommended path to upgrade one minor at a time, i.e.
1.8.x -> 1.9.x and then 1.9.x -> 1.10.x as a second step or is the
big jump supported, i.e. 1.8.x -> 1.10.x in one change

2. Do we need to recompile the jobs against the newer Flink
version before upgrading? Coordinating multiple teams can be
tricky, so - short of spinning up a second flink cluster - our
continuous deployment infrastructure will try to deploy the
topologies compiled against 1.8.x for an hour or two after we have
upgraded the cluster





Registering UDAF in blink batch app

2020-04-14 Thread Dmytro Dragan
Hi All,

Could you please tell me how to register a custom aggregate function in a 
Blink batch app?
In streaming mode, we create:

EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
bsSettings);

Which has:
 void registerFunction(String name, AggregateFunction 
aggregateFunction);

But in case of batchMode, we need to create TableEnvironment:


EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
tEnv = TableEnvironment.create(bsSettings);

Which does not have this function to register AggregationFunction, only Scalar 
one.

Details: Flink 1.10, Java API




Re: Possible memory leak in JobManager (Flink 1.10.0)?

2020-04-14 Thread Marc LEGER
Hello,

Actually, I agree I do not need such an aggressive checkpoint
period for my jobs, so I increased the checkpoint period from 1 to 10s, and
JobManager memory consumption has now been quite stable for 3 days in my Flink
1.10.0 cluster.

Thanks a lot for your help.

Best regards,
Marc
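
As a rough cross-check of Yun Tang's estimate quoted below: 585 MB of retained metadata at about 30 KB per checkpoint corresponds to roughly 20,000 completed checkpoints waiting to be discarded, so the checkpoint count in the archived text appears to be truncated. The sketch assumes binary units (1 MB = 1024 KB); the class and method names are illustrative only.

```java
// Back-of-the-envelope arithmetic for retained checkpoint metadata.
final class CheckpointBacklogEstimate {

    // Number of checkpoints whose metadata adds up to the given heap usage.
    static long checkpointsFor(long retainedMegabytes, long metadataKilobytes) {
        return retainedMegabytes * 1024 / metadataKilobytes;
    }

    // Heap usage in MB for a given number of retained checkpoints.
    static double retainedMegabytes(long checkpoints, long metadataKilobytes) {
        return checkpoints * metadataKilobytes / 1024.0;
    }

    public static void main(String[] args) {
        System.out.println("checkpoints for 585 MB: " + checkpointsFor(585, 30));
        System.out.println("MB for 20000 checkpoints: " + retainedMegabytes(20000, 30));
    }
}
```

At one checkpoint per second, 20,000 checkpoints is about five and a half hours of backlog.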

On Fri, Apr 10, 2020 at 11:54, Till Rohrmann wrote:

> What you could also try out is whether the same problem occurs with Flink
> 1.7.3. We did the executor change in this bug fix release. This could help
> us validate my suspicion.
>
> Cheers,
> Till
>
> On Thu, Apr 9, 2020 at 4:24 PM Till Rohrmann  wrote:
>
>> For further reference, I've created this issue [1] to track the problem.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-17073
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 9, 2020 at 1:20 PM Yun Tang  wrote:
>>
>>> Hi Marc
>>>
>>> The leftover 'chk-X' folders, which should have been discarded when the
>>> checkpoints were removed, also suggest that the metadata of completed
>>> checkpoints not yet discarded is what occupies the memory.
>>>
>>> If we treat your average checkpoint meta size as 30KB, 2
>>> not-discarded complete checkpoints would occupy about 585MB memory, which
>>> is close to your observed scenario.
>>>
>>> From my point of view, a checkpoint interval of one second is really
>>> too frequent and would not make much sense in a production environment.
>>>
>>> Best
>>> Yun Tang
>>> --
>>> *From:* Till Rohrmann 
>>> *Sent:* Thursday, April 9, 2020 17:41
>>> *To:* Marc LEGER 
>>> *Cc:* Yun Tang ; user@flink.apache.org <
>>> user@flink.apache.org>
>>> *Subject:* Re: Possible memory leak in JobManager (Flink 1.10.0)?
>>>
>>> Thanks for reporting this issue Marc. From what you've reported, I think
>>> Yun is right and that the large memory footprint is caused by
>>> CompletedCheckpoints which cannot be removed fast enough. One way to verify
>>> this is to enable TRACE logging because then Flink will log for every
>>> CompletedCheckpoint when it gets discarded. The line should look like this
>>> "Executing discard procedure for Checkpoint". The high number of chk-X
>>> folders on S3 could be the result of the slow discard operations.
>>>
>>> If you want then we can also take a look at the logs and ideally also
>>> the heap dump if you can share them with us.
>>>
>>> I think one difference between Flink 1.10.0 and 1.7.2 is that we are
>>> using a fixed thread pool for running the io operations in 1.10.0. The
>>> number of threads equals the number of cores. In contrast, in Flink 1.7.2
>>> we used a fork join pool with a max parallelism of 64. This difference
>>> could explain the lower throughput of discard operations because fewer can
>>> happen in parallel.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 9, 2020 at 10:09 AM Marc LEGER  wrote:
>>>
>>> Hello Yun,
>>>
>>> Thank you for your feedback, please find below my answers to your
>>> questions:
>>>
>>> 1. I am using incremental state checkpointing with RocksDB backend and
>>> AWS S3 as a distributed file system, everything is configured in
>>> flink-conf.yaml as follows:
>>>
>>> state.backend: rocksdb
>>> state.backend.incremental: true
>>> # placeholders are replaced at deploy time
>>> state.checkpoints.dir: s3://#S3_BUCKET#/#SERVICE_ID#/flink/checkpoints
>>> state.backend.rocksdb.localdir: /home/data/flink/rocksdb
>>>
>>> Size of the _metadata file in a checkpoint folder for the 3 running jobs:
>>> - job1: 64KB
>>> - job2: 1K
>>> - job3: 10K
>>>
>>> By the way, I have between 1 and 2 "chk-X" folders per job in S3.
>>>
>>> 2. Checkpointing is configured to be triggered every second for all the
>>> jobs. Only the interval is set, otherwise everything is kept as default:
>>>
>>> executionEnvironment.enableCheckpointing(1000);
>>>
>>> Best Regards,
>>> Marc
>>>
>>> On Wed, Apr 8, 2020 at 20:48, Yun Tang wrote:
>>>
>>> Hi Marc
>>>
>>> I think the occupied memory is due to the to-remove complete checkpoints
>>> which are stored in the workQueue of io-executor [1] in
>>> ZooKeeperCompletedCheckpointStore [2]. One clue to prove this is that
>>> Executors#newFixedThreadPool would create a ThreadPoolExecutor with a
>>> LinkedBlockingQueue to store runnables.
>>>
>>> To figure out the root cause, would you please check the information
>>> below:
>>>
>>>    1. How large is your checkpoint metadata? You can check the size of
>>>    {checkpoint-dir}/chk-X/_metadata; telling us which state backend you
>>>    use would also help.
>>>    2. What is your checkpoint interval? A smaller checkpoint
>>>    interval might accumulate many completed checkpoints to subsume once a
>>>    newer checkpoint completes.
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L260
>>> [2]
>>> 
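
The mechanism Yun Tang and Till describe above can be reproduced with the JDK alone. `Executors.newFixedThreadPool` returns a `ThreadPoolExecutor` backed by an unbounded `LinkedBlockingQueue`, so when tasks are submitted faster than the fixed number of threads can finish them, the pending tasks (and whatever they reference) stay on the heap. The sketch below is not Flink code: the "discard" task is a stand-in that simply blocks on a latch, and the class and method names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class DiscardBacklogDemo {

    // Submits `tasks` blocking jobs to a pool with `threads` workers and
    // returns how many of them are still waiting in the pool's queue.
    static int queuedAfterSubmitting(int threads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch started = new CountDownLatch(Math.min(threads, tasks));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await(); // simulates a slow discard operation
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(); // wait until every worker thread is busy
            return ((ThreadPoolExecutor) pool).getQueue().size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("queued tasks: " + queuedAfterSubmitting(2, 10));
    }
}
```

With fewer worker threads, more tasks sit in the queue at any moment, which matches the suspicion that a pool sized to the number of cores drains discard operations more slowly than a pool with a parallelism of 64.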

Re: [Stateful Functions] Using Flink CEP

2020-04-14 Thread Igal Shilman
Hi,

I'm not familiar with the other library that you have mentioned, and indeed
using Flink CEP from within a stateful function is not possible within a
single Flink job,  as Gordon mentioned.

I'm wondering what aspects of CEP are you interested in?
Because essentially a stateful function can be considered as a state
machine with auxiliary state.
You can take a look at the ride sharing example [1] for instance, where the
drivers, and the rides are cooperative state machines.

[1] -
https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-ridesharing-example/statefun-ridesharing-example-functions/src/main/java/org/apache/flink/statefun/examples/ridesharing

Good luck!
Igal.


On Tue, Apr 14, 2020 at 5:07 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi!
>
> It isn't possible to use Flink CEP within Stateful Functions.
>
> That could be an interesting primitive, to add CEP-based function
> constructs.
> Could you briefly describe what you are trying to achieve?
>
> On the other hand, there are plans to integrate Stateful Functions more
> closely with the Flink APIs.
> One direction we've been thinking about is to, for example, support Flink
> DataStreams as StateFun ingress / egresses. In this case, you'll be able to
> use Flink CEP to detect patterns, and use the results as an ingress which
> invokes functions within a StateFun app.
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


flink sql ddl does not support primary key

2020-04-14 Thread 叶贤勋
Hi all:
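Looking at the source, when the sqlNode is converted to a CreateTableOperator [1],
the primary key configuration is still not supported, even though the SQL
parser can already parse it. Why hasn't this restriction been lifted?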


https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java#L178




叶贤勋
yxx_c...@163.com



Re: Can I use Apache-Flink for Android API-Level < 26?

2020-04-14 Thread Chesnay Schepler
I agree with your conclusion that you cannot use Flink on an API Level 
below 26.


I do not know whether it will work even with Level 26 though, as I'm not 
aware of anyone having tried it.


On 14/04/2020 11:03, Alexander Borgschulze wrote:
I am trying to use Apache-Flink in my Android-Project with 
"minSdkVersion 24".

Unfortunately, the following code causes an error:
    val env: StreamExecutionEnvironment = LocalStreamEnvironment.getExecutionEnvironment()
    env.streamTimeCharacteristic = TimeCharacteristic.ProcessingTime
    val integerStream: DataStream<Int> = env.fromCollection(mutableListOf(1,2,3,4,5))

    integerStream.print()
    env.execute()

java.lang.NoClassDefFoundError: Failed resolution of: Ljava/time/Instant;
        at org.apache.flink.api.common.typeinfo.BasicTypeInfo.<clinit>(BasicTypeInfo.java:89)
        at org.apache.flink.api.common.typeinfo.BasicTypeInfo.getInfoFor(BasicTypeInfo.java:214)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1705)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1643)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForObject(TypeExtractor.java:2038)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getForObject(TypeExtractor.java:1983)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:782)

The java.time API was introduced in API Level 26. So is it impossible 
to run Flink on older Android versions?





Re: [Stateful Functions] Using statefun for E2E testing

2020-04-14 Thread Igal Shilman
Hi,

I'm glad to hear that your PoC with StateFun functions has turned out to be
successful, even
if it is for verifying external systems are integrating with each other
correctly.

I hope that eventually StateFun would replace the 3 external systems :-)

Good luck,
Igal.

On Fri, Apr 10, 2020 at 3:45 AM Oytun Tez  wrote:

> Hi there,
>
> Today we were designing a test for a workflow that involved 3 different
> systems talking to each other async. My colleague came up with the idea that
> we could use Flink for E2E, which we got excited about.
>
> We came up with a quick implementation within our existing Flink
> application, and after some hours of debugging this and that, everything
> actually worked very nicely. We triggered the initial actions within
> Functions, other Functions kept state for CEP-like logic (can we use Flink
> CEP directly?), some events triggered validation assortments via async API
> calls and such...
>
> Has anyone used a similar approach? This is just a general question to see
> resources about integration testing via Flink.
>
>
>
>  --
>
> [image: MotaWord]
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com
>
>   
>


Re: Javadocs Broken?

2020-04-14 Thread Chesnay Schepler

I'm looking into it.

On 10/04/2020 11:27, tison wrote:

Hi guys,

Right now when I click "JavaDocs" in our docsite[1] it jumps to a 
page[2] that I think is definitely not our API documentation. Any thoughts?

Best,
tison.

[1] https://ci.apache.org/projects/flink/flink-docs-master/
[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/






Can I use Apache-Flink for Android API-Level < 26?

2020-04-14 Thread Alexander Borgschulze
I am trying to use Apache-Flink in my Android-Project with "minSdkVersion 24".

Unfortunately, the following code causes an error:

    val env: StreamExecutionEnvironment = LocalStreamEnvironment.getExecutionEnvironment()
    env.streamTimeCharacteristic = TimeCharacteristic.ProcessingTime
    val integerStream: DataStream<Int> = env.fromCollection(mutableListOf(1,2,3,4,5))

    integerStream.print()
    env.execute()

java.lang.NoClassDefFoundError: Failed resolution of: Ljava/time/Instant;
        at org.apache.flink.api.common.typeinfo.BasicTypeInfo.<clinit>(BasicTypeInfo.java:89)
        at org.apache.flink.api.common.typeinfo.BasicTypeInfo.getInfoFor(BasicTypeInfo.java:214)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1705)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1643)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForObject(TypeExtractor.java:2038)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getForObject(TypeExtractor.java:1983)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:782)

The java.time API was introduced in API Level 26. So is it impossible to run Flink on older Android versions?
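
The stack trace above fails inside Flink's `BasicTypeInfo` because `java.time.Instant` cannot be resolved. A small pre-flight check like the following sketch makes that dependency explicit before any Flink class is touched. It only tests whether a class can be loaded; it is an illustration rather than a workaround, and the class name `JavaTimeAvailability` is made up for this example.

```java
// Checks whether a class can be loaded without initializing it.
final class JavaTimeAvailability {

    static boolean isClassPresent(String className) {
        try {
            Class.forName(className, false, JavaTimeAvailability.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (isClassPresent("java.time.Instant")) {
            System.out.println("java.time is available");
        } else {
            System.out.println("java.time is missing; classes that reference it will fail to load");
        }
    }
}
```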


Re: About connecting PyFlink to RabbitMQ

2020-04-14 Thread Dian Fu
PyFlink currently supports only the Python Table API, and RabbitMQ does not yet
have a Table/SQL connector. If you want to use RabbitMQ in PyFlink, you can
proceed as follows:

1) Implement a Java TableSource/TableSink for RabbitMQ. You can refer to the
implementation of connectors such as Kafka; basically you only need to wrap
the existing implementation.
2) Use the RabbitMQ source/sink in the PyFlink job. There are currently two
ways to register a TableSource/TableSink in PyFlink:
2.1) If you register the RabbitMQ connector via
TableEnvironment.register_table_source/register_table_sink, you need to write
a Python wrapper for the RabbitMQ TableSource/TableSink; see how
CsvTableSink/CsvTableSource are implemented [1][2].
2.2) If you use the TableEnvironment.connect interface, you need to write a
Java TableSourceSinkFactory for RabbitMQ and then use
CustomConnectorDescriptor [3]. For how to use CustomConnectorDescriptor, see
the test case [4].

[1] https://github.com/apache/flink/blob/master/flink-python/pyflink/table/sinks.py#L40
[2] https://github.com/apache/flink/blob/master/flink-python/pyflink/table/sources.py#L35
[3] https://github.com/apache/flink/blob/master/flink-python/pyflink/table/descriptors.py#L1199
[4] https://github.com/apache/flink/blob/master/flink-python/pyflink/table/tests/test_descriptor.py#L347

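> On Apr 10, 2020, at 5:09 PM, Ella SUN wrote:
> 
> Hello~
> 
> I am a Flink beginner. My current requirement is to read a stream from
> RabbitMQ, process it, and store the result in MySQL.
> Since everyone's tech stack at my company is Python, and I recently saw the
> Chinese community promoting PyFlink (apache-flink), I gave it a try. But I
> found that for connecting to RabbitMQ, Java has a lot of package support,
> for example org.apache.flink.streaming.connectors.rabbitmq.*
> Does PyFlink have similar connector support?
> 
> Or is there a recommended solution, such as connecting with pika?
> 
> Many thanks~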
> -- 
> Ella Sun



Re: [1.10.0] flink-dist source jar is empty

2020-04-14 Thread Chesnay Schepler
That should not be a problem since the flink-dist module does not 
contain any java sources


On 14/04/2020 06:42, Steven Wu wrote:


We build and publish flink-dist locally. But the source jar turns out 
empty. Other source jars (like flink-core) are good. Anyone else 
experienced similar problem?


Thanks,
Steven





Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

2020-04-14 Thread godfrey he
Hi Jiahui,

I think this is a consequence of the multiple-sinks optimization. If we optimize
each sink eagerly (that is, we optimize the query when `writeToSink` or
`insertInto` is called), `TableConfig#setIdleStateRetentionTime` is
functionally equivalent to QueryConfig, which requires calling
`TableConfig#setIdleStateRetentionTime` before calling `writeToSink` or
`insertInto`. With the multiple-sinks optimization, however, it is hard to
map the value of `TableConfig#setIdleStateRetentionTime` to each query. I
think this is a common issue for per-query configuration under the
multiple-sinks optimization.

For the `toRetractStream` method, however, we keep the eager optimization
strategy, so you can call `TableConfig#setIdleStateRetentionTime` before
`toRetractStream`.

Best,
Godfrey
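
The workaround Jiahui describes further down the thread, a custom SQL hint that the framework parses before configuring the table environment, could look roughly like the sketch below. It only extracts `minTime` and `maxTime` from the hint text into `java.time.Duration` values. The hint name and the `'5m'` / `'11m'` syntax are taken from the quoted example; the class name, the supported unit suffixes (s, m, h, d) and the regular expression are assumptions. Passing the parsed values on to `TableConfig#setIdleStateRetentionTime` before each query is translated is left out.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for the custom hint shown in the quoted example.
final class IdleStateRetentionHint {
    private static final Pattern HINT = Pattern.compile(
        "/\\*\\+\\s*IDLE_STATE_RETENTION_TIME\\(\\s*"
            + "minTime\\s*=\\s*'(\\d+)([smhd])'\\s*,\\s*"
            + "maxTime\\s*=\\s*'(\\d+)([smhd])'\\s*\\)\\s*\\*/");

    final Duration minTime;
    final Duration maxTime;

    private IdleStateRetentionHint(Duration minTime, Duration maxTime) {
        this.minTime = minTime;
        this.maxTime = maxTime;
    }

    // Returns the parsed hint, or an empty Optional if the SQL text has none.
    static Optional<IdleStateRetentionHint> parse(String sql) {
        Matcher m = HINT.matcher(sql);
        if (!m.find()) {
            return Optional.empty();
        }
        Duration min = toDuration(Long.parseLong(m.group(1)), m.group(2).charAt(0));
        Duration max = toDuration(Long.parseLong(m.group(3)), m.group(4).charAt(0));
        return Optional.of(new IdleStateRetentionHint(min, max));
    }

    // Maps a unit suffix to a Duration; the suffix set is an assumption.
    static Duration toDuration(long amount, char unit) {
        switch (unit) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            case 'd': return Duration.ofDays(amount);
            default: throw new IllegalArgumentException("Unsupported unit: " + unit);
        }
    }

    public static void main(String[] args) {
        String sql = "SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ * "
            + "FROM `/input1` a INNER JOIN `/input2` b ON a.column_name = b.column_name";
        parse(sql).ifPresent(h -> System.out.println(h.minTime + " .. " + h.maxTime));
    }
}
```

Because the values are resolved per query, a framework using this kind of parser would need to apply them before each query is translated, which is exactly the ordering constraint discussed above.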

On Tue, Apr 14, 2020 at 12:15 PM, Jiahui Jiang wrote:

> Hey Godfrey, in some of the use cases our users have, there are a couple
> of complex join queries where the key domains keep evolving - we definitely
> want some sort of state retention for those queries; but there are others
> where the key domain doesn't evolve over time, but there isn't really a
> guarantee on the maximum gap between 2 records of the same key
> appearing in the stream, so we don't want to accidentally invalidate the
> state for those keys in these streams.
>
> Because queries with different requirements can both exist in the
> pipeline, I think we have to configure `IDLE_STATE_RETENTION_TIME` per
> operator.
>
> Just wondering, has similar requirement not come up much for SQL users
> before? (being able to set table / query configuration inside SQL queries)
>
> We are also a little bit concerned because right now since
> 'toRetractStream(Table, Class, QueryConfig)' is deprecated, relying on the
> fact that TableConfig is read during toDataStream feels like relying on an
> implementation details that just happens to work, and there is no guarantee
> that it will keep working in the future versions...
>
> Thanks!
> --
> *From:* godfrey he 
> *Sent:* Monday, April 13, 2020 9:51 PM
> *To:* Jiahui Jiang 
> *Cc:* Jark Wu ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> Query hint is a way for fine-grained configuration.
>  just out of curiosity, is it a strong requirement
>  that users need to config different IDLE_STATE_RETENTION_TIME for each
> operator?
>
> Best,
> Godfrey
>
> On Tue, Apr 14, 2020 at 2:07 AM, Jiahui Jiang wrote:
>
> Also for some more context, we are building a framework to help users
> build their Flink pipeline with SQL. Our framework handles all the setup
> and configuration, so that users only need to write the SQL queries without
> having to have any Flink knowledge.
>
> One issue we encountered was, for some of the streams, the key domain
> keeps evolving and we want to expire the states for older keys. But there
> is no easy ways to allow users configure their state timeout directly
> through SQL APIs.
> Currently we are asking users to configure idleStateRetentionTime in a
> custom SQL hint, then our framework will parse it and set it up during
> table registration time.
>
> An example query that users can be writing right now looks like,
>
> CREATE TABLE `/output` AS
> SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ *
> FROM `/input1` a
> INNER JOIN `/input2` b
> ON a.column_name = b.column_name;
>
> Is this something Flink SQL may want to support out of the box? (Starting
> from Calcite 1.22.0, it started to provide first class hint parsing)
>
>
> --
> *From:* Jiahui Jiang 
> *Sent:* Sunday, April 12, 2020 4:30 PM
> *To:* Jark Wu 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hey Jark, thank you so much for confirming!
>
> Out of curiosity, even though I agree that having too many config classes
> is confusing, not knowing when the config values are used during pipeline
> setup is also pretty confusing. For example, the name 'TableConfig'
> makes me feel it's global to the whole tableEnvironment (which is true) but is
> only read once at execution (which is not true). Can we try to surface or
> add some documentation on when these configs are read?
>
> Thank you so much!
> --
> *From:* Jark Wu 
> *Sent:* Saturday, April 11, 2020 8:45 AM
> *To:* Jiahui Jiang 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Yes, that's right. Set idleStateRetentionTime on TableConfig before
> translation should work.
>
> On Sat, 11 Apr 2020 at 14:46, Jiahui Jiang 
> wrote:
>
> Thank you for answering! I 

Re: Re: Flink 1.10 reading Kafka data: after packaging the jar and submitting the job to the cluster for testing, hit java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase; asking for help, thanks!

2020-04-14 Thread wangweigu...@stevegame.cn

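I tested this yesterday. Besides flink-connector-kafka_2.11-1.10.0.jar,
flink-connector-kafka-base_2.11-1.10.0.jar is also needed. It feels like Flink
does not handle adding dependency jars very well, and adding them is not
flexible enough!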

From: zhisheng
Sent: 2020-04-14 15:24
To: user-zh
Subject: Re: Re: Flink 1.10 reading Kafka data: after packaging the jar and
submitting the job to the cluster for testing, hit
java.lang.NoClassDefFoundError:
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase; asking for help, thanks!
Adding flink-connector-kafka_2.11-1.10.0.jar should be enough.

On Mon, Apr 13, 2020 at 3:09 PM, wangweigu...@stevegame.cn wrote:
 
>
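> Thanks to the fellow Flink users for the answers!
>
>
> I am developing the Flink program with Maven; it was only missing the Kafka
> dependency jars when built, packaged, and run on the cluster:
> flink-connector-kafka_2.11-1.10.0.jar,
> flink-connector-kafka-base_2.11-1.10.0.jar,
> kafka-clients-1.0.1-kafka-3.1.1.jar
> After adding these to lib, the program ran successfully!
>
> From: 刘宇宝
> Sent: 2020-04-13 14:59
> To: user-zh@flink.apache.org
> Subject: Re: Flink 1.10 reading Kafka data: after packaging the jar and
> submitting the job to the cluster for testing, hit
> java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase; asking for help, thanks!
> Start from the official project template:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/java_api_quickstart.html
>
> Don't add jars to Flink itself; add this to your project's pom.xml:
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
>   </dependency>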
>
> From: "wangweigu...@stevegame.cn" 
> Reply-To: "user-zh@flink.apache.org" 
> Date: Monday, April 13, 2020 at 2:32 PM
> To: user-zh 
> Subject: Flink 1.10 reading Kafka data: after packaging the jar and
> submitting the job to the cluster for testing, hit
> java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase; asking for help, thanks!
>
>
> Hello,
>
> I am using Flink 1.10 to read Kafka data. It runs fine in my local IDEA
> environment, but when I compile and package the code (not a fat jar) and run
> it on the cluster for testing, it fails with: java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.
> I added the Kafka dependency jar flink-connector-kafka_2.11-1.10.0.jar under
> /lib on every node of the Flink 1.10 cluster.
> The commands I ran:
> First I started a YARN session:
> yarn-session.sh -yd -jm 2048m -tm 2048m -s 10
> Then I submitted the job to the session for testing:
> flink run -d -p 2 -m yarn-cluster -c
> com.sdf.flink.streaming.BroadcastOrderJoinGoodsName -yid
> application_1585277813790_0006 ./flink-project_1.10.0-1.0.jar
> It fails on startup with the following error:
> [image]
>
> Dependency jars under /lib:
> [image]
>
> Code snippet:
> [image]
>
> It simply reads the data and prints it for testing!
>
> 史蒂夫软件(深圳)有限公司
> 技术部   王卫光
> wangweigu...@stevegame.cn
> 地址/Add:深圳南山科区科技园高新南十二道康佳研发大厦A座
> 手机/Mob:13128970998
> http://www.stevengame.com/
>


Re: flink array query parsing problem

2020-04-14 Thread Benchao Li
Hi,

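Your UDF should explicitly specify its parameter types by overriding
ScalarFunction's getParameterTypes method.
Type inference for complex types in UDFs is limited, so for complex types like
this one you can specify the parameter types explicitly.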

On Tue, Apr 14, 2020 at 3:37 PM, 出发 <573693...@qq.com> wrote:

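> 1. When I define a DDL that parses an array field, selecting that field
> directly parses fine. 2. When I define my own function, I get null; does
> Flink simply skip parsing the array for that function?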
> CREATE TABLE sourceTable (
> event_time_line array `rule_name` VARCHAR,
> `count` VARCHAR
> )
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.startup-mode' = 'earliest-offset',
> 'connector.topic' = 'topic_test_1',
> 'connector.properties.zookeeper.connect' = 'localhost:2181',
> 'connector.properties.bootstrap.servers' = 'localhost:9092',
> 'update-mode' = 'append',
> 'format.type' = 'json',
> 'format.derive-schema' = 'true'
> );
> -- this returns data
> select event_time_line from sourceTable ;
> -- with my own function, the parameter values are not passed through, but the array size is
> select type_change(event_time_line) from sourceTable ;
>
>
> public class TypeChange extends ScalarFunction {
> /**
>  * The elements are null, but the array has a length
>  * @param rows
>  * @return
>  */
> public String eval(Row [] rows){
> return JSONObject.toJSONString(rows);
> }
>
> }



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


About connecting PyFlink to RabbitMQ

2020-04-14 Thread Ella SUN
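Hello~

I am a Flink beginner. My current requirement is to read a stream from
RabbitMQ, process it, and store the result in MySQL.
Since everyone's tech stack at my company is Python, and I recently saw the
Chinese community promoting PyFlink (apache-flink), I gave it a try. But I
found that for connecting to RabbitMQ, Java has a lot of package support,
for example org.apache.flink.streaming.connectors.rabbitmq.*
Does PyFlink have similar connector support?

Or is there a recommended solution, such as connecting with pika?

Many thanks~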
-- 
Ella Sun


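flink array query parsing problem

2020-04-14 Thread 出发
1. When I define a DDL that parses an array field, selecting that field directly parses fine.
2. When I define my own function, I get null; does Flink simply skip parsing the array for that function?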
CREATE TABLE sourceTable (
event_time_line array

Re: About connecting to the Kafka connector via Python

2020-04-14 Thread 秦寒
This problem has been pinned down.

I was using two Kafka jars, and the one marked in red is not needed. Thank you very much for your help.

flink-connector-kafka_2.11-1.10.0.jar

flink-sql-connector-kafka_2.11-1.10.0.jar

From: 秦寒
Sent: April 10, 2020 10:15
To: 'Hequn Cheng' ; 'user-zh'
Subject: Re: About connecting to the Kafka connector via Python

This one is solved: I reinstalled apache-flink with pip3 and added the jars, which fixed it.

From: 秦寒 <han...@chinaums.com>
Sent: April 9, 2020 16:41
To: 'Hequn Cheng' <he...@apache.org>; 'user-zh' <user-zh@flink.apache.org>
Subject: Re: About connecting to the Kafka connector via Python

Hello,

Following your instructions I made the configuration below; I am using Flink 1.10.

1. Added kafka-clients-2.2.0.jar under pyflink/lib

2. Added flink-sql-connector-kafka_2.11-1.10.0.jar,
flink-connector-kafka_2.11-1.10.0.jar and flink-json-1.10.0-sql-jar.jar under build-target/lib

3. Built the PyFlink distribution package and installed it

cd flink-python; python setup.py sdist 
pip install dist/*.tar.gz

4. Running the test program tumble_window.py fails as follows. Have you seen this error before? I would appreciate any help.

[yy1s@rbtnode1 project]$ python3 tumble_window.py

From: Hequn Cheng <he...@apache.org>
Sent: April 9, 2020 10:08
To: user-zh <user-zh@flink.apache.org>
Cc: han...@chinaums.com
Subject: Re: About connecting to the Kafka connector via Python

Hi 秦寒,

Dian has covered it thoroughly. In addition, 金竹's blog [1] has a post on "How to use Kafka in the Python API" that may help; take a look.

Best, Hequn

[1] 
https://enjoyment.cool/2019/08/28/Apache%20Flink%20%E8%AF%B4%E9%81%93%E7%B3%BB%E5%88%97-%20Python%20API%20%E4%B8%AD%E5%A6%82%E4%BD%95%E4%BD%BF%E7%94%A8%20Kafka/

On Thu, Apr 9, 2020 at 9:34 AM Dian Fu <dian0511...@gmail.com> wrote:

Do you mean an example of how to use the Kafka connector in the Python Table API? There is one [1].

As for how to make the Kafka client jar available in the Python environment, there are two cases, each with a current solution:
1) For local execution, copy the Kafka client jar into the lib directory of pyflink in your Python environment.
2) For remote execution, add it via the CLI's -j option.

Neither approach is very convenient for Python users, so there is already a JIRA [3] considering a more Python-friendly way; you are welcome to join the discussion there.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html 

[3] https://issues.apache.org/jira/browse/FLINK-16943 

> On Apr 9, 2020, at 8:45 AM, zhisheng wrote:
> 
> hi, 秦寒
> 
> There is no Python API for this yet; you can file a suggestion in the community JIRA.
> 
> Best
> 
> zhisheng
> 
> On Wed, Apr 8, 2020 at 4:10 PM, 秦寒 <han...@chinaums.com> wrote:
> 
>> Hello,
>> 
>>   The Flink Kafka connector documentation only has Java and Scala examples.
>> Could you add a Python example of calling Kafka, including how to add the
>> Kafka connector and how to configure the Kafka client jar in the Python
>> environment? Thanks.
>> 
>> 
>> 
>> 
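
The local-run step in this thread (copying the Kafka jars into pyflink's lib directory) can be scripted. The following is only a minimal sketch: `install_jars` is a hypothetical helper name, not part of PyFlink, and the location of the lib directory is passed in by the caller rather than discovered.

```python
import shutil
from pathlib import Path


def install_jars(jar_paths, pyflink_lib_dir):
    """Copy each jar into the given pyflink lib directory.

    Hypothetical helper for illustration; jars that are already present
    (same file name) are left untouched. Returns the names that were copied.
    """
    lib_dir = Path(pyflink_lib_dir)
    if not lib_dir.is_dir():
        raise FileNotFoundError(f"pyflink lib directory not found: {lib_dir}")

    copied = []
    for jar in map(Path, jar_paths):
        if jar.suffix != ".jar":
            raise ValueError(f"not a jar file: {jar}")
        target = lib_dir / jar.name
        if not target.exists():
            shutil.copy2(jar, target)  # keep timestamps along with the content
            copied.append(target.name)
    return copied
```

With a pip-installed apache-flink, the directory is usually the `lib` folder next to the `pyflink` package (for example `Path(pyflink.__file__).parent / "lib"`), but that path is an assumption to verify on your own installation.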



Re: Connecting to the Kafka connector from Python

2020-04-14 Thread 秦寒
This is solved: I reinstalled apache-flink with pip3 and added the jars, and that fixed it.

 




Re: Help with creating an array in DDL

2020-04-14 Thread 出发
Thanks

-- Original message --
From: Benchao Li

flink run -m yarn submission fails (Flink 1.9)

2020-04-14 Thread guanyq
The submission fails even though YARN still has plenty of resources. Why would it fail?

Submission script:
./bin/flink run -m yarn-cluster \
-ynm TestDataProcess \
-yd \
-yn 2 \
-ytm 1024 \
-yjm 1024 \
-c com.data.processing.unconditionalacceptance.TestDataProcess \
./tasks/UnconditionalAcceptanceDataProcess.jar \


YARN resources (cluster metrics):
  Apps Submitted:        239
  Apps Pending:          0
  Apps Running:          12
  Apps Completed:        227
  Containers Running:    173
  Memory Used:           334 GB
  Memory Total:          1.42 TB
  Memory Reserved:       0 B
  VCores Used:           173
  VCores Total:          288
  VCores Reserved:       0
  Active Nodes:          9
  Decommissioned Nodes:  0
  Lost Nodes:            0
  Unhealthy Nodes:       0
  Rebooted Nodes:        0
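
To make "plenty of resources" concrete, the cluster-wide headroom can be worked out from the figures above. This is a rough sketch that assumes the YARN UI reports binary units (1 TB = 1024 GB); `to_gb` and `headroom` are illustrative names. It only reflects cluster totals, so it says nothing about per-queue limits, which these figures do not show.

```python
# Conversion factors to GB, assuming binary units as an approximation.
UNITS_IN_GB = {
    "B": 1 / 1024**3,
    "KB": 1 / 1024**2,
    "MB": 1 / 1024,
    "GB": 1.0,
    "TB": 1024.0,
}


def to_gb(text):
    """Parse a size such as '334 GB' or '1.42 TB' into gigabytes."""
    value, unit = text.split()
    return float(value) * UNITS_IN_GB[unit.upper()]


def headroom(metrics):
    """Return unused memory (GB) and vcores from YARN cluster metrics."""
    return {
        "free_memory_gb": to_gb(metrics["Memory Total"])
        - to_gb(metrics["Memory Used"])
        - to_gb(metrics["Memory Reserved"]),
        "free_vcores": metrics["VCores Total"]
        - metrics["VCores Used"]
        - metrics["VCores Reserved"],
    }


# Figures copied from the cluster metrics posted in this message.
metrics = {
    "Memory Used": "334 GB",
    "Memory Total": "1.42 TB",
    "Memory Reserved": "0 B",
    "VCores Used": 173,
    "VCores Total": 288,
    "VCores Reserved": 0,
}

free = headroom(metrics)
print(f"free memory: {free['free_memory_gb']:.2f} GB, free vcores: {free['free_vcores']}")
```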



2020-04-14 15:14:19,002 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:19,253 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:19,504 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:19,755 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:20,006 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:20,257 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:20,508 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:20,759 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:21,011 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:21,262 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:21,513 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:21,764 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:22,015 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:22,265 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:22,517 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:22,768 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:23,019 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:23,270 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:23,521 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:23,772 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2020-04-14 15:14:24,025 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested

Re: Re: Flink job submission parameters not taking effect

2020-04-14 Thread zhisheng
Try setting -ytm 2048m and see whether it still happens.

wangweigu...@stevegame.cn wrote on Tue, Apr 14, 2020 at 2:16 PM:

>
> Most likely the -ytm and -yjm memory sizes you set are both smaller than the minimum YARN container memory.
> YARN minimum container memory parameter: yarn.scheduler.minimum-allocation-mb
> Container memory increment: yarn.scheduler.increment-allocation-mb
>
> From: guanyq
> Sent: 2020-04-14 14:05
> To: user-zh
> Subject: Re:Re: Flink job submission parameters not taking effect
> ./bin/flink run -m yarn-cluster \
> -ynm TestDataProcess \
> -ytm 666 \
> -yjm 666 \
> -c com.data.processing.unconditionalacceptance.TestDataProcess \
> ./tasks/UnconditionalAcceptanceDataProcess.jar \
> --group.id Test001 \
> --checkpoint.interval 5000
> At 2020-04-14 14:00:59, "Xintong Song" wrote:
> > The image in your email did not come through.
> > Please paste the full launch command.
> >
> > Thank you~
> >
> > Xintong Song
> >
> > On Tue, Apr 14, 2020 at 1:11 PM guanyq wrote:
> >
> >> When submitting a jar with Flink, specifying -ytm has no effect. What could be the reason?
> >>
> >>
> >>
> >>
>


Re: Re: Flink 1.10 reading Kafka data: after packaging a jar and submitting the job to the cluster for testing, I hit java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase. Any help appreciated, thanks!

2020-04-14 Thread zhisheng
Adding flink-connector-kafka_2.11-1.10.0.jar should be enough.

wangweigu...@stevegame.cn wrote on Mon, Apr 13, 2020 at 3:09 PM:

>
> Thanks to fellow Flink users for the answers!
>
> I'm developing the Flink program with Maven; it was just missing the Kafka
> dependency jars when the packaged build ran on the cluster:
> flink-connector-kafka_2.11-1.10.0.jar,
> flink-connector-kafka-base_2.11-1.10.0.jar, and
> kafka-clients-1.0.1-kafka-3.1.1.jar.
> After adding these to lib, the program ran successfully!
>
> From: 刘宇宝
> Sent: 2020-04-13 14:59
> To: user-zh@flink.apache.org
> Subject: Re: Flink 1.10 reading Kafka data: after packaging a jar and
> submitting the job to the cluster for testing, I hit
> java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase. Any help appreciated, thanks!
> Start from the official project template:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/java_api_quickstart.html
>
> Don't add jars to Flink itself; add this to your project's pom.xml:
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
> From: "wangweigu...@stevegame.cn"
> Reply-To: "user-zh@flink.apache.org"
> Date: Monday, April 13, 2020 at 2:32 PM
> To: user-zh
> Subject: Flink 1.10 reading Kafka data: after packaging a jar and
> submitting the job to the cluster for testing, I hit
> java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase. Any help appreciated, thanks!
>
>
> Hello,
>
> I'm using Flink 1.10 to read Kafka data. It runs fine in my local IDEA
> environment, but when I compile and package the code (not a fat jar) and
> run it on the cluster, it fails with java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.
> On every node of the Flink 1.10 cluster I added the Kafka dependency
> under /lib: flink-connector-kafka_2.11-1.10.0.jar
> My launch commands:
> First I started a YARN session:
> yarn-session.sh -yd -jm 2048m -tm 2048m -s 10
> Then I submitted the job to the session for testing:
> flink run -d -p 2 -m yarn-cluster -c
> com.sdf.flink.streaming.BroadcastOrderJoinGoodsName -yid
> application_1585277813790_0006 ./flink-project_1.10.0-1.0.jar
> It fails on startup with the following error:
> [image]
>
> Dependency jars under /lib:
> [image]
>
> Code snippet:
> [image]
>
> It just reads the data and prints it, as a test!
>
> 
>
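
When chasing a NoClassDefFoundError like the one in this thread, it can help to check which jars in Flink's lib directory actually contain the missing class. In the thread, the error went away once flink-connector-kafka-base_2.11-1.10.0.jar was added alongside the connector jar, which is consistent with FlinkKafkaConsumerBase living in the base jar. The sketch below is a generic stdlib check; `jars_containing` is an illustrative name, not a Flink tool.

```python
import zipfile
from pathlib import Path


def jars_containing(class_name, lib_dir):
    """List jars in lib_dir that contain the given class.

    class_name may use dots or slashes, e.g. the form printed in a
    NoClassDefFoundError message.
    """
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        # A jar is a zip archive, so its entries can be listed directly.
        with zipfile.ZipFile(jar) as zf:
            if entry in zf.namelist():
                hits.append(jar.name)
    return hits
```

For example, calling `jars_containing("org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase", "/path/to/flink/lib")` (the path is a placeholder) returns an empty list if no jar on that node provides the class.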


????checkpoint??hdfs??kerberos????????????

2020-04-14 Thread ??????
??flink1.9.1hdfs??keberos
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /home/abc.keytab
security.kerberos.login.principal: abc/ad...@test.com



2020-04-14 11:14:20,650 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 288 @ 1586834050629 for job 45e07f380ba6607cb93b01bbbcd45729.
2020-04-14 11:14:40,936 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 288 for job 45e07f380ba6607cb93b01bbbcd45729 (3705 bytes in 30224 ms).
2020-04-14 11:19:25,501 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 289 @ 1586834350629 for job 45e07f380ba6607cb93b01bbbcd45729.
2020-04-14 11:19:35,579 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 289 by task 7efc00b038a3f6a89892f83851ec2fde of job 45e07f380ba6607cb93b01bbbcd45729 at container_1585822587175_2427_01_04 @ host68 (dataPort=11329).
2020-04-14 11:19:35,580 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 289 of job 45e07f380ba6607cb93b01bbbcd45729.
java.lang.Exception: Could not materialize checkpoint 289 for operator Source: canaltest_sourceKafka - SourceConversion(table=[Unregistered_DataStream_1], fields=[_TOPIC, _MESSAGEKEY, _MESSAGE, _PARTITION, _OFFSET]) - SinkConversionToTuple2 - Map - SourceConversion(table=[Unregistered_DataStream_4], fields=[_TOPIC, _MESSAGEKEY, _MESSAGE, _PARTITION, _OFFSET, PROCTIME]) - Correlate(invocation=[PARSE_CANAL($cor0._MESSAGE)], correlate=[table(PARSE_CANAL($cor0._MESSAGE))], select=[_TOPIC,_MESSAGEKEY,_MESSAGE,_PARTITION,_OFFSET,PROCTIME,table_name,op_type,op_ts,current_ts,pos,primary_keys,data,before_data], rowType=[RecordType(VARCHAR(2147483647) _TOPIC, VARCHAR(2147483647) _MESSAGEKEY, VARCHAR(2147483647) _MESSAGE, INTEGER _PARTITION, BIGINT _OFFSET, TIME ATTRIBUTE(PROCTIME) PROCTIME, VARCHAR(2147483647) table_name, VARCHAR(2147483647) op_type, VARCHAR(2147483647) op_ts, VARCHAR(2147483647) current_ts, VARCHAR(2147483647) pos, VARCHAR(2147483647) primary_keys, VARCHAR(2147483647) data, VARCHAR(2147483647) before_data)], joinType=[INNER]) - Calc(select=[op_type, primary_keys, data, before_data]) - SinkConversionToTuple2 - Sink: HBase (1/3).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to null in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.

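Re:Re: Re: With RocksDB as the state backend, job recovery fails on restart: Could not restore keyed state backend for KeyedProcessOperator

2020-04-14 Thread chenxyz
Hi Congxian,
Sorry, your email got lost in my inbox...
Here is how I reproduce it: I directly restart the TM running this job, and the KeyedProcessFunction then fails to restore. The error only occurs with the RocksDB StateBackend; with HDFS as an FsBackend the job recovers normally. At first I thought the custom state inside the KeyedProcessFunction was failing to restore, but even an empty KeyedProcessFunction cannot be restored successfully. A simple demo follows.

import java.io.Serializable;
import java.util.Random;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class App {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every two minutes.
        env.enableCheckpointing(2 * 60 * 1000);

        // Emits 100 random students, then idles until the job is cancelled.
        DataStreamSource<Student> source = env.addSource(new SourceFunction<Student>() {

            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Student> ctx) throws Exception {
                Random rand = new Random();
                for (int i = 0; i < 100; i++) {
                    int id = rand.nextInt();
                    ctx.collect(new Student(id, "Tom" + id));
                }
                synchronized (this) {
                    while (running) {
                        this.wait();
                    }
                }
            }

            @Override
            public void cancel() {
                synchronized (this) {
                    running = false;
                    this.notifyAll();
                }
            }
        });

        // Pass-through KeyedProcessFunction with no custom state.
        source.keyBy("id").process(new KeyedProcessFunction<Tuple, Student, Student>() {
            @Override
            public void processElement(Student value, Context ctx, Collector<Student> out)
                    throws Exception {
                out.collect(value);
            }
        }).addSink(new SinkFunction<Student>() {
            @Override
            public void invoke(Student value, Context context) throws Exception {
                System.out.println(value);
            }
        });

        env.execute("test keyed process operator state restore");
    }

    @Getter
    @Setter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student implements Serializable {
        private static final long serialVersionUID = 3909702675393996601L;
        private Integer id;
        private String name;
    }
}

Here is the log with DEBUG enabled: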

2020-04-14 11:42:44,679 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Establish JobManager connection for job 6fd13de6e9c84a51425f7cc34ce94940.
2020-04-14 11:42:44,684 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Offer reserved slots to the leader of job 6fd13de6e9c84a51425f7cc34ce94940.
2020-04-14 11:42:44,727 DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Registered new allocation id ed04b5323aa885406201e85c9f8b7c78 for local state stores for job 6fd13de6e9c84a51425f7cc34ce94940.
2020-04-14 11:42:44,729 DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Registered new local state store with configuration LocalRecoveryConfig{localRecoveryMode=false, localStateDirectories=LocalRecoveryDirectoryProvider{rootDirectories=[/data/flink1_10/tmp/localState/aid_ed04b5323aa885406201e85c9f8b7c78], jobID=6fd13de6e9c84a51425f7cc34ce94940, jobVertexID=bc764cd8ddf7a0cff126f51c16239658, subtaskIndex=0}} for 6fd13de6e9c84a51425f7cc34ce94940 - bc764cd8ddf7a0cff126f51c16239658 - 0 under allocation id ed04b5323aa885406201e85c9f8b7c78.
2020-04-14 11:42:44,742 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionFactory - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Initialized org.apache.flink.runtime.io.network.partition.ResultPartitionFactory@41801faf
2020-04-14 11:42:44,747 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Source: Custom Source (1/1).
2020-04-14 11:42:44,748 INFO  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) switched from CREATED to DEPLOYING.
2020-04-14 11:42:44,748 INFO  org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for task Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) [DEPLOYING]
2020-04-14 11:42:44,751 INFO  org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) [DEPLOYING].
2020-04-14 11:42:44,752 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot ed04b5323aa885406201e85c9f8b7c78.
2020-04-14 11:42:44,772 DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Registered new local state store with configuration LocalRecoveryConfig{localRecoveryMode=false, localStateDirectories=LocalRecoveryDirectoryProvider{rootDirectories=[/data/flink1_10/tmp/localState/aid_ed04b5323aa885406201e85c9f8b7c78], jobID=6fd13de6e9c84a51425f7cc34ce94940, jobVertexID=20ba6b65f97481d5570070de90e4e791, subtaskIndex=0}} for 6fd13de6e9c84a51425f7cc34ce94940 - 20ba6b65f97481d5570070de90e4e791 - 0 under allocation id ed04b5323aa885406201e85c9f8b7c78.
2020-04-14 11:42:44,786 DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory - KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d): Created 1 input channels (local: 1, remote: 0, unknown: 0).
2020-04-14 11:42:44,788 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task KeyedProcess -> Sink:

Re: Re: Flink job submission parameters not taking effect

2020-04-14 Thread wangweigu...@stevegame.cn

Most likely the -ytm and -yjm memory sizes you set are both smaller than the minimum YARN container memory.
YARN minimum container memory parameter: yarn.scheduler.minimum-allocation-mb
Container memory increment: yarn.scheduler.increment-allocation-mb

From: guanyq
Sent: 2020-04-14 14:05
To: user-zh
Subject: Re:Re: Flink job submission parameters not taking effect
./bin/flink run -m yarn-cluster \
-ynm TestDataProcess \
-ytm 666 \
-yjm 666 \
-c com.data.processing.unconditionalacceptance.TestDataProcess \
./tasks/UnconditionalAcceptanceDataProcess.jar \
--group.id Test001 \
--checkpoint.interval 5000
At 2020-04-14 14:00:59, "Xintong Song" wrote:
> The image in your email did not come through.
> Please paste the full launch command.
>
> Thank you~
>
> Xintong Song
>
> On Tue, Apr 14, 2020 at 1:11 PM guanyq wrote:
>
>> When submitting a jar with Flink, specifying -ytm has no effect. What could be the reason?
>>
>>
>>
>>
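
The explanation at the top of this message can be illustrated with a small calculation. This is a rough sketch of how a memory request is commonly normalized on YARN: raised to the minimum allocation, rounded up to a multiple of the increment, and capped at the maximum. The defaults below are YARN's stock values (1024 MB minimum, 8192 MB maximum), which are an assumption about this cluster; whether a separate increment applies depends on the scheduler in use, and `effective_container_mb` is an illustrative name only.

```python
import math


def effective_container_mb(requested_mb,
                           min_allocation_mb=1024,
                           increment_mb=None,
                           max_allocation_mb=8192):
    """Estimate the container size YARN grants for a memory request.

    Assumed defaults, not values read from the poster's cluster.
    If no increment is given, the minimum allocation is used as the step.
    """
    step = increment_mb or min_allocation_mb
    floored = max(requested_mb, min_allocation_mb)  # never below the minimum
    rounded = math.ceil(floored / step) * step      # round up to the next step
    return min(rounded, max_allocation_mb)          # never above the maximum


print(effective_container_mb(666))    # request like -ytm 666 / -yjm 666
print(effective_container_mb(2048))   # request like -ytm 2048m
```

Under these assumed defaults a 666 MB request ends up as a 1024 MB container, which would match the 1024 reported elsewhere in this thread.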


Re:Re: Re: Flink job submission parameters not taking effect

2020-04-14 Thread guanyq
The script sets -ytm 666, but on the Flink UI page, under Job Manager, taskmanager.heap.size shows 1024.
At 2020-04-14 14:10:31, "Xintong Song" wrote:
> The launch command looks right.
> What exactly do you observe when you say it has no effect?
>
> Thank you~
>
> Xintong Song
>
> On Tue, Apr 14, 2020 at 2:05 PM guanyq wrote:
>
>> ./bin/flink run -m yarn-cluster \
>> -ynm TestDataProcess \
>> -ytm 666 \
>> -yjm 666 \
>> -c com.data.processing.unconditionalacceptance.TestDataProcess \
>> ./tasks/UnconditionalAcceptanceDataProcess.jar \
>> --group.id Test001 \
>> --checkpoint.interval 5000
>> At 2020-04-14 14:00:59, "Xintong Song" wrote:
>> > The image in your email did not come through.
>> > Please paste the full launch command.
>> >
>> > Thank you~
>> >
>> > Xintong Song
>> >
>> > On Tue, Apr 14, 2020 at 1:11 PM guanyq wrote:
>> >
>> >> When submitting a jar with Flink, specifying -ytm has no effect. What could be the reason?
>> >>
>> >>
>> >>
>> >>
>>


Re: Re: Flink job submission parameters not taking effect

2020-04-14 Thread Xintong Song
The launch command looks right.
What exactly do you observe when you say it has no effect?

Thank you~

Xintong Song



On Tue, Apr 14, 2020 at 2:05 PM guanyq  wrote:

> ./bin/flink run -m yarn-cluster \
> -ynm TestDataProcess \
> -ytm 666 \
> -yjm 666 \
> -c com.data.processing.unconditionalacceptance.TestDataProcess \
> ./tasks/UnconditionalAcceptanceDataProcess.jar \
> --group.id Test001 \
> --checkpoint.interval 5000
> At 2020-04-14 14:00:59, "Xintong Song" wrote:
> > The image in your email did not come through.
> > Please paste the full launch command.
> >
> > Thank you~
> >
> > Xintong Song
> >
> > On Tue, Apr 14, 2020 at 1:11 PM guanyq wrote:
> >
> >> When submitting a jar with Flink, specifying -ytm has no effect. What could be the reason?
> >>
> >>
> >>
> >>
>


Re:Re: Flink job submission parameters not taking effect

2020-04-14 Thread guanyq
./bin/flink run -m yarn-cluster \
-ynm TestDataProcess \
-ytm 666 \
-yjm 666 \
-c com.data.processing.unconditionalacceptance.TestDataProcess \
./tasks/UnconditionalAcceptanceDataProcess.jar \
--group.id Test001 \
--checkpoint.interval 5000
At 2020-04-14 14:00:59, "Xintong Song" wrote:
> The image in your email did not come through.
> Please paste the full launch command.
>
> Thank you~
>
> Xintong Song
>
> On Tue, Apr 14, 2020 at 1:11 PM guanyq wrote:
>
>> When submitting a jar with Flink, specifying -ytm has no effect. What could be the reason?
>>
>>
>>
>>


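Re: Flink job submission parameters not taking effect

2020-04-14 Thread Xintong Song
The image in your email did not come through.
Please paste the full launch command.

Thank you~

Xintong Song



On Tue, Apr 14, 2020 at 1:11 PM guanyq  wrote:

> When submitting a jar with Flink, specifying -ytm has no effect. What could be the reason?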
>
>
>
>