Re: Flink SQL Count Distinct performance optimization

2020-01-07 Thread Kurt Young
Hi,

Could you try to find out what the bottleneck of your current job is? That
would lead to different optimizations, depending for example on whether the
job is CPU bound, or whether its local state is so big that it gets stuck on
too many slow I/Os.

Best,
Kurt


On Wed, Jan 8, 2020 at 3:53 PM 贺小令  wrote:

> hi sunfulin,
> you can try the blink planner (available since 1.9), which optimizes
> distinct aggregation. You can also try enabling
> *table.optimizer.distinct-agg.split.enabled* if the data is skewed.
>
> best,
> godfreyhe
>
> On Wed, Jan 8, 2020 at 3:39 PM sunfulin wrote:
>
>> Hi, community,
>> I'm using Apache Flink SQL to build some of my realtime streaming apps.
>> In one scenario I'm trying to count(distinct deviceID) over a data set of
>> about 100GB in realtime, and sink the aggregated results to an
>> ElasticSearch index. I hit a severe performance issue when running my
>> Flink job and would like some help from the community.
>>
>>
>> Flink version: 1.8.2
>> Running on YARN with 4 YARN slots per task manager. My Flink task
>> parallelism is set to 10, which equals the number of my Kafka source
>> partitions. After starting the job, I can observe high backpressure in
>> the Flink dashboard. Any suggestions or help would be highly appreciated.
>>
>>
>> running sql is like the following:
>>
>>
>> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>> SELECT aggId, pageId, statkey AS ts, SUM(cnt) AS expoCnt, COUNT(cnt) AS clkCnt
>> FROM (
>>   SELECT
>>     aggId,
>>     pageId,
>>     statkey,
>>     COUNT(DISTINCT deviceId) AS cnt
>>   FROM (
>>     SELECT
>>       'ZL_005' AS aggId,
>>       'ZL_UV_PER_MINUTE' AS pageId,
>>       deviceId,
>>       ts2Date(recvTime) AS statkey
>>     FROM kafka_zl_etrack_event_stream
>>   )
>>   GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>> ) AS t1
>> GROUP BY aggId, pageId, statkey
>>
>> Best
>
>


Re: Flink SQL Count Distinct performance optimization

2020-01-07 Thread 贺小令
hi sunfulin,
you can try the blink planner (available since 1.9), which optimizes
distinct aggregation. You can also try enabling
*table.optimizer.distinct-agg.split.enabled* if the data is skewed.

best,
godfreyhe

On Wed, Jan 8, 2020 at 3:39 PM sunfulin wrote:

> Hi, community,
> I'm using Apache Flink SQL to build some of my realtime streaming apps.
> In one scenario I'm trying to count(distinct deviceID) over a data set of
> about 100GB in realtime, and sink the aggregated results to an
> ElasticSearch index. I hit a severe performance issue when running my
> Flink job and would like some help from the community.
>
>
> Flink version: 1.8.2
> Running on YARN with 4 YARN slots per task manager. My Flink task
> parallelism is set to 10, which equals the number of my Kafka source
> partitions. After starting the job, I can observe high backpressure in
> the Flink dashboard. Any suggestions or help would be highly appreciated.
>
>
> running sql is like the following:
>
>
> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
> SELECT aggId, pageId, statkey AS ts, SUM(cnt) AS expoCnt, COUNT(cnt) AS clkCnt
> FROM (
>   SELECT
>     aggId,
>     pageId,
>     statkey,
>     COUNT(DISTINCT deviceId) AS cnt
>   FROM (
>     SELECT
>       'ZL_005' AS aggId,
>       'ZL_UV_PER_MINUTE' AS pageId,
>       deviceId,
>       ts2Date(recvTime) AS statkey
>     FROM kafka_zl_etrack_event_stream
>   )
>   GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
> ) AS t1
> GROUP BY aggId, pageId, statkey
>
> Best


Flink SQL Count Distinct performance optimization

2020-01-07 Thread sunfulin
Hi, community,
I'm using Apache Flink SQL to build some of my realtime streaming apps. In
one scenario I'm trying to count(distinct deviceID) over a data set of about
100GB in realtime, and sink the aggregated results to an ElasticSearch index.
I hit a severe performance issue when running my Flink job and would like
some help from the community.


Flink version: 1.8.2
Running on YARN with 4 YARN slots per task manager. My Flink task parallelism
is set to 10, which equals the number of my Kafka source partitions. After
starting the job, I can observe high backpressure in the Flink dashboard. Any
suggestions or help would be highly appreciated.


running sql is like the following:


INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
SELECT aggId, pageId, statkey AS ts, SUM(cnt) AS expoCnt, COUNT(cnt) AS clkCnt
FROM (
  SELECT
    aggId,
    pageId,
    statkey,
    COUNT(DISTINCT deviceId) AS cnt
  FROM (
    SELECT
      'ZL_005' AS aggId,
      'ZL_UV_PER_MINUTE' AS pageId,
      deviceId,
      ts2Date(recvTime) AS statkey
    FROM kafka_zl_etrack_event_stream
  )
  GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
) AS t1
GROUP BY aggId, pageId, statkey

Best
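
The query above splits COUNT(DISTINCT deviceId) into two levels: a distinct count per hash bucket, then a sum over the buckets. As a minimal sketch of why that gives the same number as one global distinct count, the plain-Java snippet below assigns each device ID to one of 1024 buckets, counts distinct IDs per bucket, and sums the bucket counts. This is not Flink code: the class and method names are made up for illustration, and `Math.floorMod` is used only to keep the bucket index non-negative.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class BucketedDistinct {
    // Same bucket count as MOD(hashCode(deviceId), 1024) in the query.
    static final int BUCKETS = 1024;

    // Level 1: one set of device IDs per bucket. Level 2: sum of the set sizes.
    static long countDistinct(List<String> deviceIds) {
        Map<Integer, Set<String>> perBucket = new HashMap<>();
        for (String id : deviceIds) {
            int bucket = Math.floorMod(id.hashCode(), BUCKETS);
            perBucket.computeIfAbsent(bucket, b -> new HashSet<>()).add(id);
        }
        long total = 0;
        for (Set<String> ids : perBucket.values()) {
            total += ids.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("d1", "d2", "d1", "d3", "d2");
        System.out.println(countDistinct(ids)); // prints 3
    }
}
```

An ID always hashes to the same bucket, so no ID is counted in two buckets and the sum of the per-bucket counts equals the global distinct count.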

Re: Using redis cache in flink

2020-01-07 Thread Navneeth Krishnan
Hi Yun,

Thanks. I want to use redis as a cache, not as a state backend. I would
still keep the rocksdb state backend for other state. The reason to use a
cache instead of managed state is that I'd get around 10k msgs per task
slot, and with a cache I would not have to fetch the state from rocksdb for
each lookup. An in-memory cache would be fine, but I want to use redis to
rebuild it.

Regards

On Tue, Jan 7, 2020 at 11:21 PM Yun Tang  wrote:

> Hi Navneeth
>
> If you wrap redis as a state backend, you cannot easily share data across
> slots, because Flink constructs a state backend per operator that is
> accessed by the local thread only.
>
> If you use a redis cluster as an external service to store your data, you
> can share data across slots easily. However, compared with the saved cost
> of serialization, the cost of the added network communication cannot be
> ignored. There is a trade-off here, and we cannot guarantee a performance
> gain. Actually, I would expect the CPU time spent on serialization to be
> much less than the time spent on the network.
>
> Best
> Yun Tang
> --
> *From:* Navneeth Krishnan 
> *Sent:* Wednesday, January 8, 2020 12:33
> *To:* user 
> *Subject:* Using redis cache in flink
>
> Hi All,
>
> I want to use redis as a near far cache to store data that is common
> across slots, i.e. to share data across slots. This data is required for
> processing every single message, and it seems better to keep it in an
> in-memory cache backed by redis than in rocksdb, since with rocksdb it has
> to be serialized for every single get call. Do you think this is a good
> solution, or is there a better one? Also, is there any reference on how I
> can create a centralized near far cache, given that the job and operators
> are distributed by the job manager?
>
> Thanks
>
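
A rough sketch of the in-memory cache with a remote fallback discussed in this thread, under stated assumptions: `NearCache` is a hypothetical class, the `remoteLookup` function only stands in for a Redis GET (no Redis client is used here), and eviction is a simple LRU built on `LinkedHashMap`. It is not thread-safe, so it assumes one instance per operator subtask.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class NearCache<K, V> {
    private final Function<K, V> remoteLookup; // hypothetical stand-in for a Redis GET
    private final Map<K, V> local;             // bounded in-memory LRU map
    private int remoteCalls = 0;

    NearCache(final int maxEntries, Function<K, V> remoteLookup) {
        this.remoteLookup = remoteLookup;
        // accessOrder=true makes iteration order least-recently-used first.
        this.local = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Serves from memory when possible; otherwise asks the remote store and remembers the answer.
    V get(K key) {
        V value = local.get(key);
        if (value == null) {
            value = remoteLookup.apply(key);
            remoteCalls++;
            if (value != null) {
                local.put(key, value);
            }
        }
        return value;
    }

    int remoteCalls() {
        return remoteCalls;
    }

    public static void main(String[] args) {
        NearCache<String, String> cache = new NearCache<>(2, k -> "value-of-" + k);
        cache.get("a");
        cache.get("a"); // served from memory
        cache.get("b");
        System.out.println(cache.remoteCalls()); // prints 2
    }
}
```

Only misses pay the network cost raised in the reply; how often that happens depends on the hit rate, which this sketch does not try to estimate.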


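Re:Re: FLINK: ProcessWindowFunction differences across StateBackends

2020-01-07 Thread USERNAME
Thank you, Mr. Tang, for the explanation!

At 2020-01-07 19:46:06, "Yun Tang" wrote:
>Hi
>
>Using iterator.remove() to drop already-processed data from state is not a standard practice; the standard approach is to clear the corresponding state [1].
>As for why the data is removed with MemoryStateBackend: the result returned by get is the very object stored on heap in the backend [2], so modifying it has a side effect.
>With the RocksDB state backend, get returns a deserialized list rather than the data RocksDB itself stores [3], so modifying it has no side effect.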
>
>
>[1] 
>https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
>[2] 
>https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java#L57
>[3] 
>https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L119
>
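>Best regards,
>Yun Tang
>
>
>From: USERNAME
>Sent: Tuesday, January 7, 2020 17:54
>To: user-zh@flink.apache.org
>Subject: FLINK: ProcessWindowFunction differences across StateBackends
>
>Hello everyone!
>Happy New Year!
>
>
>-- Version
>FLINK 1.9.1 ON YARN
>
>
>-- Steps
>1. Define an EventTimeSessionWindows.withGap(Time.hours(1)) window
>2. Define a new Trigger() that fires a computation at a fixed interval and emits the result
>3. Define a new ProcessWindowFunction() that computes at a fixed interval, emits the result, and does not keep data that has already been computed
>-- Problem
>In the new ProcessWindowFunction(), iter.remove(); is used to drop data that has already been computed.
>With MemoryStateBackend this works as intended: each fixed-interval computation excludes the values from the previous one.
>With RocksDBStateBackend the iter.remove(); step has no effect, and the next computation still contains the historical data.
>Is there a better way to handle this kind of computation?
>
>
>-- Partial code
>final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
>new ProcessWindowFunction{
>    public void process(Tuple tuple, Context context, Iterable elements, Collector out) throws Exception {
>        for (Iterator iter = elements.iterator(); iter.hasNext(); ) {
>            iter.next(); // read the element (processing omitted)
>            iter.remove();
>        }
>    }
>}
>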
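
The difference described in Yun Tang's reply can be reproduced without Flink. The sketch below is only an illustration under assumptions: `LiveReferenceState` and `CopyOnReadState` are hypothetical stand-ins for "get returns the stored on-heap object" and "get returns a freshly deserialized list", not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class StateCopySemantics {
    /** Stand-in for an on-heap backend: get() hands out the stored list itself. */
    static final class LiveReferenceState {
        final List<String> stored = new ArrayList<>(Arrays.asList("a", "b", "c"));
        Iterable<String> get() { return stored; }
    }

    /** Stand-in for a serializing backend: every get() builds a fresh copy. */
    static final class CopyOnReadState {
        final List<String> stored = new ArrayList<>(Arrays.asList("a", "b", "c"));
        Iterable<String> get() { return new ArrayList<>(stored); }
    }

    /** Removes every element through the iterator, as the posted window function does. */
    static void drain(Iterable<String> elements) {
        for (Iterator<String> it = elements.iterator(); it.hasNext(); ) {
            it.next();   // remove() is only legal after next()
            it.remove();
        }
    }

    static int remainingWithLiveReference() {
        LiveReferenceState state = new LiveReferenceState();
        drain(state.get());
        return state.stored.size();
    }

    static int remainingWithCopyOnRead() {
        CopyOnReadState state = new CopyOnReadState();
        drain(state.get());
        return state.stored.size();
    }

    public static void main(String[] args) {
        System.out.println(remainingWithLiveReference()); // prints 0: the stored list was mutated
        System.out.println(remainingWithCopyOnRead());    // prints 3: only the copy was emptied
    }
}
```

Code that depends on the first behavior is relying on a side effect of one backend, which is why the reply recommends clearing the state explicitly instead.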


Re:Re:Re: FLINK: ProcessWindowFunction differences across StateBackends

2020-01-07 Thread USERNAME
TTL does not seem to support TimeCharacteristic.EventTime.



At 2020-01-08 14:17:11, "USERNAME" wrote:
>My example needs TriggerResult.FIRE_AND_PURGE in the trigger to clear the data of the window being computed, so that the computation is incremental. That is somewhat different from TTL.
>
>
>At 2020-01-07 19:51:57, "huoguo" wrote:
>>
>>
>>Can the stale data be expired by configuring a TTL?
>>
>>> On Jan 7, 2020, at 17:54, USERNAME wrote:
>>> 
>>> Hello everyone!
>>> Happy New Year!
>>> 
>>> 
>>> -- Version
>>> FLINK 1.9.1 ON YARN
>>> 
>>> 
>>> -- Steps
>>> 1. Define an EventTimeSessionWindows.withGap(Time.hours(1)) window
>>> 2. Define a new Trigger() that fires a computation at a fixed interval and emits the result
>>> 3. Define a new ProcessWindowFunction() that computes at a fixed interval, emits the result, and does not keep data that has already been computed
>>> -- Problem
>>> In the new ProcessWindowFunction(), iter.remove(); is used to drop data that has already been computed.
>>> With MemoryStateBackend this works as intended: each fixed-interval computation excludes the values from the previous one.
>>> With RocksDBStateBackend the iter.remove(); step has no effect, and the next computation still contains the historical data.
>>> Is there a better way to handle this kind of computation?
>>> 
>>> 
>>> -- Partial code
>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> 
>>> 
>>> new ProcessWindowFunction{
>>>     public void process(Tuple tuple, Context context, Iterable elements, Collector out) throws Exception {
>>>         for (Iterator iter = elements.iterator(); iter.hasNext(); ) {
>>>             iter.next(); // read the element (processing omitted)
>>>             iter.remove();
>>>         }
>>>     }
>>> }
>>> 
>>


Re: How to verify if checkpoints are asynchronous or sync

2020-01-07 Thread Congxian Qiu
If you want to track down the performance problem, async-profiler [1] may
be helpful.
[1] https://github.com/jvm-profiling-tools/async-profiler
Best,
Congxian


On Wed, Jan 8, 2020 at 11:37 AM William C wrote:

> Hallo
>
> on 2020/1/8 11:31, RKandoji wrote:
> > I'm running my job on an EC2 instance with 32 cores and, following the
> > documentation, I tried to use as many task slots as there are cores:
> > numOfTaskSlots=32 and parallelism=32. But I noticed that performance
> > degrades slightly when I use 32 task slots. Performance seems better
> > at 26 task slots than with more than 26. So I was trying to understand
> > whether the additional CPU cores are being used by checkpointing or
> > other async (or background) operations, and in the process I was
> > trying to verify whether the checkpointing is async.
>
> How did you evaluate the performance?
> It may be due to busy I/O, thread contention, or something similar.
> You'd better dig into more details via logging.
>
> Regards.
>


Re: Using redis cache in flink

2020-01-07 Thread Yun Tang
Hi Navneeth

If you wrap redis as a state backend, you cannot easily share data across
slots, because Flink constructs a state backend per operator that is accessed
by the local thread only.

If you use a redis cluster as an external service to store your data, you can
share data across slots easily. However, compared with the saved cost of
serialization, the cost of the added network communication cannot be ignored.
There is a trade-off here, and we cannot guarantee a performance gain.
Actually, I would expect the CPU time spent on serialization to be much less
than the time spent on the network.

Best
Yun Tang

From: Navneeth Krishnan 
Sent: Wednesday, January 8, 2020 12:33
To: user 
Subject: Using redis cache in flink

Hi All,

I want to use redis as a near far cache to store data that is common across
slots, i.e. to share data across slots. This data is required for processing
every single message, and it seems better to keep it in an in-memory cache
backed by redis than in rocksdb, since with rocksdb it has to be serialized
for every single get call. Do you think this is a good solution, or is there
a better one? Also, is there any reference on how I can create a centralized
near far cache, given that the job and operators are distributed by the job
manager?

Thanks


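Re: When will the Flink Plan Visualizer be updated to the 1.9 style?

2020-01-07 Thread tison
Could you describe what the 1.9 style you mention looks like? I remember some
recent discussion related to the visualizer, but there is no issue for this
specifically. You can open an issue directly in JIRA.

Best,
tison.


 wrote on Wed, Jan 8, 2020 at 12:56 PM:

> Can anyone knowledgeable help answer this?
>
> -Original Message-
> From: slle...@aliyun.com.INVALID
> Sent: January 6, 2020 11:15
> To: user-zh@flink.apache.org
> Subject: When will the Flink Plan Visualizer be updated to the 1.9 style?
>
> Link: https://flink.apache.org/visualizer/index.html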
>
>


Re: When will the Flink Plan Visualizer be updated to the 1.9 style?

2020-01-07 Thread sllence
Can anyone knowledgeable help answer this?

-Original Message-
From: slle...@aliyun.com.INVALID
Sent: January 6, 2020 11:15
To: user-zh@flink.apache.org
Subject: When will the Flink Plan Visualizer be updated to the 1.9 style?

Link: https://flink.apache.org/visualizer/index.html



Using redis cache in flink

2020-01-07 Thread Navneeth Krishnan
Hi All,

I want to use redis as a near far cache to store data that is common across
slots, i.e. to share data across slots. This data is required for processing
every single message, and it seems better to keep it in an in-memory cache
backed by redis than in rocksdb, since with rocksdb it has to be serialized
for every single get call. Do you think this is a good solution, or is there
a better one? Also, is there any reference on how I can create a centralized
near far cache, given that the job and operators are distributed by the job
manager?

Thanks


Re: jobgraph generation

2020-01-07 Thread 张江
Sorry for the mistake. I copied the wrong email address on my phone.


Thank you for your reply.


张江
Email: zjkingdom2...@163.com

On 2020-01-08 at 11:08, tison wrote:
Hi Zhang,


I just noticed that this was sent to the user list. Please send to the
user-zh list (in cc) next time if you want to discuss in Chinese.


Best,
tison.


On Wed, Jan 8, 2020 at 11:06 AM tison wrote:

If yours is a streaming job, then once you have obtained the JobGraph by
following this page [1], you can call


JsonPlanGenerator.generatePlan(jobGraph)


to get the JobGraph as JSON. Note that this is very internal logic and comes
with no compatibility guarantees.


Best,
tison.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos


On Wed, Jan 8, 2020 at 11:01 AM 张江 wrote:

Hi all,


Calling env.getExecutionPlan returns the streamGraph in JSON format, and
pasting it into the flink visualizer gives a visualization. Now I would like
to obtain the jobGraph in a similar way and generate a JSON file for it. How
should I do that?


There seems to be no direct API for this in flink, but in the flink web UI I
can see the execution graph of the job after chaining, as shown in the figure
below. How is that generated?


Thanks


张江
Email: zjkingdom2...@163.com

Re: managedMemoryInMB failure

2020-01-07 Thread Xintong Song
Hi Fanbin,

The blink planner's batch SQL operators require managed memory, and the
amount of managed memory needed depends on your job. The failure occurs
because the slot, according to your cluster configuration, does not have
enough managed memory to fulfill the requests.

To fix the problem, you need to configure more managed memory for your task
executors. You can set the config option "taskmanager.memory.size" to
'managedMemoryPerSlot (138m in your case) * numberOfSlots'.

It's not clear to me why the exact same code works on EMR. Were you
running the same version of Flink?

Thank you~

Xintong Song
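
The sizing rule in this reply is plain arithmetic. As a small worked sketch, the snippet below multiplies the per-slot requirement by the slot count and prints the resulting config line. The helper class is hypothetical, and the slot count of 4 is only an assumed example, since the thread does not state how many slots the task executors have.

```java
final class ManagedMemorySizing {
    // managedMemoryPerSlot * numberOfSlots, formatted as a megabyte value such as "552m".
    static String taskManagerMemorySize(int managedMemoryPerSlotMb, int numberOfSlots) {
        if (managedMemoryPerSlotMb < 0 || numberOfSlots < 1) {
            throw new IllegalArgumentException("need a non-negative size and at least one slot");
        }
        return Math.multiplyExact(managedMemoryPerSlotMb, numberOfSlots) + "m";
    }

    // Renders the value as a flink-conf.yaml style line for the option named in the reply.
    static String configLine(int managedMemoryPerSlotMb, int numberOfSlots) {
        return "taskmanager.memory.size: "
                + taskManagerMemorySize(managedMemoryPerSlotMb, numberOfSlots);
    }

    public static void main(String[] args) {
        // 138 MB per slot comes from the log line in the thread; 4 slots is an assumption.
        System.out.println(configLine(138, 4)); // prints taskmanager.memory.size: 552m
    }
}
```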



On Wed, Jan 8, 2020 at 8:18 AM Fanbin Bu  wrote:

> Hi,
>
> With Flink 1.9 running in docker mode, I have a batch job that fails with
> the following error message.
>
> However, the same code works totally fine on EMR. I checked the logs and
> this is the only difference:
> managedMemoryInMB=138 (the working one has a value of 0)
>
> Has anybody seen this before?
> Thanks,
> Fanbin
>
>
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
> at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
> at org.apache.flink.runtime.concurrent.FutureUtils.whenCompleteAsyncIfNotDone(FutureUtils.java:940)
> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestSlotFromResourceManager(SlotPoolImpl.java:339)
> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestNewAllocatedSlotInternal(SlotPoolImpl.java:306)
> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestNewAllocatedBatchSlot(SlotPoolImpl.java:448)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.requestNewAllocatedSlot(SchedulerImpl.java:262)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateMultiTaskSlot(SchedulerImpl.java:542)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSharedSlot(SchedulerImpl.java:341)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.internalAllocateSlot(SchedulerImpl.java:168)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSlotInternal(SchedulerImpl.java:149)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateBatchSlot(SchedulerImpl.java:129)
> at org.apache.flink.runtime.executiongraph.SlotProviderStrategy$BatchSlotProviderStrategy.allocateSlot(SlotProviderStrategy.java:109)
> at org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$2(Execution.java:556)
> at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> at org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:554)
> at org.apache.flink.runtime.executiongraph.Execution.allocateResourcesForExecution(Execution.java:496)
> at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:439)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:674)
> at org.apache.flink.runtime.executiongraph.Execution.scheduleConsumer(Execution.java:850)
> at org.apache.flink.runtime.executiongraph.Execution.scheduleOrUpdateConsumers(Execution.java:887)
> at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1064)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1548)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1521)
> at org.apache.flink.runtime.scheduler.LegacyScheduler.updateTaskExecutionState(LegacyScheduler.java:289)
> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:377)
> at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
> at 

Re: How to verify if checkpoints are asynchronous or sync

2020-01-07 Thread William C

Hallo

on 2020/1/8 11:31, RKandoji wrote:
I'm running my job on an EC2 instance with 32 cores and, following the
documentation, I tried to use as many task slots as there are cores:
numOfTaskSlots=32 and parallelism=32. But I noticed that performance
degrades slightly when I use 32 task slots. Performance seems better
at 26 task slots than with more than 26. So I was trying to understand
whether the additional CPU cores are being used by checkpointing or
other async (or background) operations, and in the process I was
trying to verify whether the checkpointing is async.


How did you evaluate the performance?
It may be due to busy I/O, thread contention, or something similar.
You'd better dig into more details via logging.

Regards.


Re: How to verify if checkpoints are asynchronous or sync

2020-01-07 Thread RKandoji
Sorry for not providing the context earlier.

I'm running my job on an EC2 instance with 32 cores and, following the
documentation, I tried to use as many task slots as there are cores:
numOfTaskSlots=32 and parallelism=32. But I noticed that performance
degrades slightly when I use 32 task slots. Performance seems better at 26
task slots than with more than 26. So I was trying to understand whether the
additional CPU cores are being used by checkpointing or other async (or
background) operations, and in the process I was trying to verify whether
the checkpointing is async.

Thanks,
RKandoji



On Tue, Jan 7, 2020 at 10:16 PM Congxian Qiu  wrote:

> Hi
>
> RocksDB supports incremental and full snapshots, and both are async. Do
> you want to verify whether a snapshot is incremental or full? I don't know
> of an easy way to get this information currently.
>
> Best,
> Congxian
>
>
> On Wed, Jan 8, 2020 at 10:56 AM Zhijiang wrote:
>
>> Logging is a simple way to trace this, and you can grep for keywords to
>> find the messages you need instead of skimming through the whole large
>> log.
>> I am not quite sure what your specific motivation for doing this is.
>> Besides logging, you can also inspect the thread stacks to confirm
>> whether it is happening, though that may not be very convenient.
>> Another possible way is the checkpoint metrics, which record the
>> sync/async durations; maybe that also satisfies your requirements.
>>
>> Best,
>> Zhijiang
>>
>> --
>> From:RKandoji 
>> Send Time:2020 Jan. 8 (Wed.) 10:23
>> To:William C 
>> Cc:user 
>> Subject:Re: How to verify if checkpoints are asynchronous or sync
>>
>> Thanks for the reply.
>> I will check and enable debug logs specifically for the class that
>> contains this log.
>> But in general logs are already too huge and I'm trying to suppress some
>> of them, so wondering if there is any other way?
>>
>> Thanks,
>> RKandoji
>>
>>
>> On Tue, Jan 7, 2020 at 7:50 PM William C  wrote:
>> Can you enable debug log to check with that?
>>
>> regards.
>>
>> on 2020/1/8 6:36, RKandoji wrote:
>> > But I'm curious whether there is a way to verify if the checkpoints
>> > are happening asynchronously or synchronously.
>> >
>>
>>
>>


Re: How to verify if checkpoints are asynchronous or sync

2020-01-07 Thread Congxian Qiu
Hi

RocksDB supports incremental and full snapshots, and both are async. Do you
want to verify whether a snapshot is incremental or full? I don't know of an
easy way to get this information currently.

Best,
Congxian


On Wed, Jan 8, 2020 at 10:56 AM Zhijiang wrote:

> Logging is a simple way to trace this, and you can grep for keywords to
> find the messages you need instead of skimming through the whole large
> log.
> I am not quite sure what your specific motivation for doing this is.
> Besides logging, you can also inspect the thread stacks to confirm
> whether it is happening, though that may not be very convenient.
> Another possible way is the checkpoint metrics, which record the
> sync/async durations; maybe that also satisfies your requirements.
>
> Best,
> Zhijiang
>
> --
> From:RKandoji 
> Send Time:2020 Jan. 8 (Wed.) 10:23
> To:William C 
> Cc:user 
> Subject:Re: How to verify if checkpoints are asynchronous or sync
>
> Thanks for the reply.
> I will check and enable debug logs specifically for the class that
> contains this log.
> But in general logs are already too huge and I'm trying to suppress some
> of them, so wondering if there is any other way?
>
> Thanks,
> RKandoji
>
>
> On Tue, Jan 7, 2020 at 7:50 PM William C  wrote:
> Can you enable debug log to check with that?
>
> regards.
>
> on 2020/1/8 6:36, RKandoji wrote:
> > But I'm curious whether there is a way to verify if the checkpoints
> > are happening asynchronously or synchronously.
> >
>
>
>


Re: jobgraph generation

2020-01-07 Thread tison
A public way to get the JSON plan of a JobGraph, given an existing Flink
cluster, is the REST API JarPlan [1].

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#jars-jarid-plan
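
A minimal sketch of calling the `/jars/:jarid/plan` endpoint referenced in [1] from Java, under assumptions that are not from the thread: the REST endpoint is reachable at a base URL such as `http://localhost:8081`, the jar has already been uploaded to the cluster, and `example.jar` is a placeholder jar id. `JarPlanClient` is a hypothetical helper, not part of Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

final class JarPlanClient {
    // Builds <restBase>/jars/<jarId>/plan, tolerating a trailing slash on the base URL.
    static URI planUri(String restBase, String jarId) {
        String base = restBase.endsWith("/")
                ? restBase.substring(0, restBase.length() - 1)
                : restBase;
        return URI.create(base + "/jars/" + jarId + "/plan");
    }

    // Issues a GET against the plan endpoint and returns the response body as text.
    static String fetchPlan(String restBase, String jarId) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) planUri(restBase, jarId).toURL().openConnection();
        conn.setRequestMethod("GET");
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            for (int n; (n = in.read(buf)) != -1; ) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Contacts a cluster only when a base URL and a jar id are passed on the command line.
        if (args.length == 2) {
            System.out.println(fetchPlan(args[0], args[1]));
        } else {
            System.out.println(planUri("http://localhost:8081", "example.jar"));
        }
    }
}
```

Unlike the internal JsonPlanGenerator call quoted below, this route goes through a documented REST endpoint, at the cost of needing a running cluster.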


On Wed, Jan 8, 2020 at 11:08 AM tison wrote:

> Hi Zhang,
>
> I just noticed that this was sent to the user list. Please send to the
> user-zh list (in cc) next time if you want to discuss in Chinese.
>
> Best,
> tison.
>
>
> On Wed, Jan 8, 2020 at 11:06 AM tison wrote:
>
>> If yours is a streaming job, then once you have obtained the JobGraph by
>> following this page [1], you can call
>>
>> JsonPlanGenerator.generatePlan(jobGraph)
>>
>> to get the JobGraph as JSON. Note that this is very internal logic and
>> comes with no compatibility guarantees.
>>
>> Best,
>> tison.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos
>>
>>
>> On Wed, Jan 8, 2020 at 11:01 AM 张江 wrote:
>>
>>> Hi all,
>>>
>>> Calling env.getExecutionPlan returns the streamGraph in JSON format, and
>>> pasting it into the flink visualizer gives a visualization. Now I would
>>> like to obtain the jobGraph in a similar way and generate a JSON file
>>> for it. How should I do that?
>>>
>>> There seems to be no direct API for this in flink, but in the flink web
>>> UI I can see the execution graph of the job after chaining, as shown in
>>> the figure below. How is that generated?
>>>
>>>
>>> Thanks
>>>
>>> 张江
>>> Email: zjkingdom2...@163.com
>>>
>>


Re: JobGraph generation

2020-01-07 Thread tison
Hi Zhang,

I just noticed that this was sent to the user list. Please send to the
user-zh list (in cc) next time if you want to discuss in Chinese.

Best,
tison.


On Wed, Jan 8, 2020 at 11:06 AM, tison wrote:

> If yours is a streaming job, follow this page [1] to obtain the JobGraph;
> you can then call
>
> JsonPlanGenerator.generatePlan(jobGraph)
>
> to get the JSON of the JobGraph. Note that this is very internal logic
> and comes with no compatibility guarantees.
>
> Best,
> tison.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos
>
>
> On Wed, Jan 8, 2020 at 11:01 AM, 张江 wrote:
>
>> Hi all,
>>
>> By calling env.getExecutionPlan I can get the StreamGraph in JSON format,
>> and pasting it into the Flink visualizer gives a visualization. Now I
>> would like to get the JobGraph in a similar way and generate a JSON file
>> for it. How should I do that?
>>
>> There seems to be no direct API for this in Flink, but in the Flink web
>> UI I can see the job's execution graph after chaining, as shown in the
>> figure below. How is that generated?
>>
>>
>> Thanks,
>>
>> 张江
>> Email: zjkingdom2...@163.com
>>
>


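Re: JobGraph generation

2020-01-07 Thread tison
If yours is a streaming job, follow this page [1] to obtain the JobGraph;
you can then call

JsonPlanGenerator.generatePlan(jobGraph)

to get the JSON of the JobGraph. Note that this is very internal logic and
comes with no compatibility guarantees.

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos


On Wed, Jan 8, 2020 at 11:01 AM, 张江 wrote:

> Hi all,
>
> By calling env.getExecutionPlan I can get the StreamGraph in JSON format,
> and pasting it into the Flink visualizer gives a visualization. Now I
> would like to get the JobGraph in a similar way and generate a JSON file
> for it. How should I do that?
>
> There seems to be no direct API for this in Flink, but in the Flink web UI
> I can see the job's execution graph after chaining, as shown in the figure
> below. How is that generated?
>
>
> Thanks,
>
> 张江
> Email: zjkingdom2...@163.com
>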


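JobGraph generation

2020-01-07 Thread 张江
Hi all,

By calling env.getExecutionPlan I can get the StreamGraph in JSON format,
and pasting it into the Flink visualizer gives a visualization. Now I would
like to get the JobGraph in a similar way and generate a JSON file for it.
How should I do that?

There seems to be no direct API for this in Flink, but in the Flink web UI I
can see the job's execution graph after chaining, as shown in the figure
below. How is that generated?

Thanks,

张江
Email: zjkingdom2...@163.com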

Re: How to verify if checkpoints are asynchronous or sync

2020-01-07 Thread Zhijiang
Checking the logs is a simple way to trace this, and you can grep for
keywords to find the relevant messages without skimming through the whole
log. I am not quite sure what your specific motivation is for doing this.
Besides the logs, you can also inspect the thread stacks to confirm whether
it is happening, though that may not be very convenient.
Another possible way is the checkpoint metrics, which record the sync and
async durations; maybe that also satisfies your requirements.

Best,
Zhijiang 


--
From:RKandoji 
Send Time:2020 Jan. 8 (Wed.) 10:23
To:William C 
Cc:user 
Subject:Re: How to verify if checkpoints are asynchronous or sync

Thanks for the reply.
I will check and enable debug logs specifically for the class that contains 
this log.
But in general logs are already too huge and I'm trying to suppress some of 
them, so wondering if there is any other way?

Thanks,
RKandoji


On Tue, Jan 7, 2020 at 7:50 PM William C  wrote:
Can you enable debug log to check with that?

 regards.

 on 2020/1/8 6:36, RKandoji wrote:
 > But I'm curious if there is way to verify if the checkpoints are 
 > happening asynchronously or synchronously.
 > 



Re: Catalog cannot be changed when registering a table

2020-01-07 Thread JingsongLee
Hi xiyueha,

You can use DDL via TableEnv.sqlUpdate("create table ..."), which registers
the table in the current catalog.

Best,
Jingsong Lee


--
From:Kurt Young 
Send Time:2020 Jan. 8 (Wed.) 09:17
To:user-zh 
Cc:xiyueha 
Subject:Re: Catalog cannot be changed when registering a table

Temporary tables can only live in the designated catalog. Registering a
temporary table in another catalog, such as a Hive catalog, is not
recommended: in most cases temporary tables cannot be serialized, so the
code would fail.

Best,
Kurt


On Tue, Jan 7, 2020 at 9:20 PM 贺小令  wrote:

> hi,
>
> Tables registered with
> streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
> are all temporary tables.
>
> You can register a table in a specific catalog via:
> catalog = new InMemoryExternalCatalog(catalogName);
> streamTableEnvironment.registerCatalog(catalogName, catalog);
> catalog.createTable()
>
> or
> streamTableEnvironment.getCatalog().get().createTable()
>
>
> On Tue, Jan 7, 2020 at 3:20 PM, xiyu...@163.com wrote:
>
> > Hi all,
> >
> > During development, when I register a table as shown below, the catalog
> > used by default is EnvironmentSettings.DEFAULT_BUILTIN_CATALOG:
> > streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
> >
> > I tried to solve this as follows, but the registered table still ends up
> > in EnvironmentSettings.DEFAULT_BUILTIN_CATALOG:
> > streamTableEnvironment.registerCatalog(catalogName, new
> > InMemoryExternalCatalog(catalogName));
> > streamTableEnvironment.useCatalog(catalogName);
> >
> > How can I register the table in a specific catalog?
> >
> >
> > xiyu...@163.com
> >
>


Re: How to verify if checkpoints are asynchronous or sync

2020-01-07 Thread RKandoji
Thanks for the reply.
I will check and enable debug logs specifically for the class that contains
this log.
But in general logs are already too huge and I'm trying to suppress some of
them, so wondering if there is any other way?

Thanks,
RKandoji


On Tue, Jan 7, 2020 at 7:50 PM William C  wrote:

> Can you enable debug log to check with that?
>
> regards.
>
> on 2020/1/8 6:36, RKandoji wrote:
> > But I'm curious if there is way to verify if the checkpoints are
> > happening asynchronously or synchronously.
> >
>


Re: Catalog cannot be changed when registering a table

2020-01-07 Thread Kurt Young
Temporary tables can only live in the designated catalog. Registering a
temporary table in another catalog, such as a Hive catalog, is not
recommended: in most cases temporary tables cannot be serialized, so the
code would fail.

Best,
Kurt


On Tue, Jan 7, 2020 at 9:20 PM 贺小令  wrote:

> hi,
>
> Tables registered with
> streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
> are all temporary tables.
>
> You can register a table in a specific catalog via:
> catalog = new InMemoryExternalCatalog(catalogName);
> streamTableEnvironment.registerCatalog(catalogName, catalog);
> catalog.createTable()
>
> or
> streamTableEnvironment.getCatalog().get().createTable()
>
>
> On Tue, Jan 7, 2020 at 3:20 PM, xiyu...@163.com wrote:
>
> > Hi all,
> >
> > During development, when I register a table as shown below, the catalog
> > used by default is EnvironmentSettings.DEFAULT_BUILTIN_CATALOG:
> > streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
> >
> > I tried to solve this as follows, but the registered table still ends up
> > in EnvironmentSettings.DEFAULT_BUILTIN_CATALOG:
> > streamTableEnvironment.registerCatalog(catalogName, new
> > InMemoryExternalCatalog(catalogName));
> > streamTableEnvironment.useCatalog(catalogName);
> >
> > How can I register the table in a specific catalog?
> >
> >
> > xiyu...@163.com
> >
>


Re: How to verify if checkpoints are asynchronous or sync

2020-01-07 Thread William C

Can you enable debug log to check with that?

regards.

on 2020/1/8 6:36, RKandoji wrote:
But I'm curious if there is way to verify if the checkpoints are 
happening asynchronously or synchronously.




managedMemoryInMB failure

2020-01-07 Thread Fanbin Bu
Hi,

with Flink 1.9 running in docker mode, I have a batch job and got the
following error message.

However, the same code works totally fine on EMR. I checked the logs and
this is the only difference I found:
managedMemoryInMB=138 (the working setup shows 0).

Has anybody seen this before?
Thanks,
Fanbin


org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
No pooled slot available and request to ResourceManager for new slot failed
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
.slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
at java.util.concurrent.CompletableFuture.uniWhenComplete(
CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(
CompletableFuture.java:792)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture
.java:2153)
at org.apache.flink.runtime.concurrent.FutureUtils
.whenCompleteAsyncIfNotDone(FutureUtils.java:940)
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
.requestSlotFromResourceManager(SlotPoolImpl.java:339)
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
.requestNewAllocatedSlotInternal(SlotPoolImpl.java:306)
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
.requestNewAllocatedBatchSlot(SlotPoolImpl.java:448)
at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.requestNewAllocatedSlot(SchedulerImpl.java:262)
at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.allocateMultiTaskSlot(SchedulerImpl.java:542)
at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.allocateSharedSlot(SchedulerImpl.java:341)
at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.internalAllocateSlot(SchedulerImpl.java:168)
at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.allocateSlotInternal(SchedulerImpl.java:149)
at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.allocateBatchSlot(SchedulerImpl.java:129)
at org.apache.flink.runtime.executiongraph.
SlotProviderStrategy$BatchSlotProviderStrategy.allocateSlot(
SlotProviderStrategy.java:109)
at org.apache.flink.runtime.executiongraph.Execution
.lambda$allocateAndAssignSlotForExecution$2(Execution.java:556)
at java.util.concurrent.CompletableFuture.uniComposeStage(
CompletableFuture.java:995)
at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture
.java:2137)
at org.apache.flink.runtime.executiongraph.Execution
.allocateAndAssignSlotForExecution(Execution.java:554)
at org.apache.flink.runtime.executiongraph.Execution
.allocateResourcesForExecution(Execution.java:496)
at org.apache.flink.runtime.executiongraph.Execution
.scheduleForExecution(Execution.java:439)
at org.apache.flink.runtime.executiongraph.ExecutionVertex
.scheduleForExecution(ExecutionVertex.java:674)
at org.apache.flink.runtime.executiongraph.Execution.scheduleConsumer(
Execution.java:850)
at org.apache.flink.runtime.executiongraph.Execution
.scheduleOrUpdateConsumers(Execution.java:887)
at org.apache.flink.runtime.executiongraph.Execution.markFinished(
Execution.java:1064)
at org.apache.flink.runtime.executiongraph.ExecutionGraph
.updateStateInternal(ExecutionGraph.java:1548)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(
ExecutionGraph.java:1521)
at org.apache.flink.runtime.scheduler.LegacyScheduler
.updateTaskExecutionState(LegacyScheduler.java:289)
at org.apache.flink.runtime.jobmaster.JobMaster
.updateTaskExecutionState(JobMaster.java:377)
at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
AkkaRpcActor.java:279)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
AkkaRpcActor.java:194)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at 

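Re: Registering a Confluent Schema Registry Avro topic as a table in Flink SQL

2020-01-07 Thread Bowen Li
Hi 陈帅,

This is a very reasonable requirement. We need to develop a Flink
ConfluentSchemaRegistryCatalog to fetch the metadata. The user experience
the community is aiming for is that users only provide the Confluent Schema
Registry URL and Flink SQL obtains everything it needs for reading and
writing through the ConfluentSchemaRegistryCatalog, so users no longer have
to write the DDL and format by hand.

Discussion has already started within the community and we should get this
done in 1.11. Please follow
https://issues.apache.org/jira/browse/FLINK-12256


On Wed, Dec 18, 2019 at 6:46 AM 陈帅  wrote:

> Thanks for the reply. With the schema registry URL given, why do subject
> and avroSchemaStr still need to be filled in?
>
> On Wed, Dec 18, 2019 at 10:30 AM, 朱广彬 wrote:
>
> > Hi 陈帅,
> >
> > The community indeed does not currently support the Confluent Schema
> > Registry Avro format. Internally we also rely on the schema registry to
> > manage Avro schemas, so we modified the flink-avro source code to
> > support it.
> >
> > The changes mainly touch these places:
> >
> > org.apache.flink.formats.avro.{AvroRowFormatFactory,AvroRowDeserializationSchema,AvroRowSerializationSchema}
> > and org.apache.flink.table.descriptors.{Avro,AvroValidator}
> >
> > To use it, specify the following three parameters when building Avro
> > (see the parts marked in red):
> >
> > tableEnv.connect(
> > new Kafka()
> > .version("universal")
> > .topic(topic)
> > .properties(props)
> > ).withFormat(
> > new Avro()
> >   .useRegistry(true)
> >   .registryUrl(KAFKA_SCHEMA_REGISTRY_URL_ADDRESS)
> >   .registrySubject(subject)
> >   .avroSchema(avroSchemaStr)
> > )
> >
> >
> > On Wed, Dec 18, 2019 at 8:26 AM, 陈帅 wrote:
> > >
> > > Can Flink SQL register a topic whose Avro data format is registered
> > > in Confluent Schema Registry as a table?
> >
>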


Re: Duplicate tasks for the same query

2020-01-07 Thread RKandoji
hi Kurt,

Thanks for the additional info.

RK
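
Note for readers of this thread: the quoted explanation below attributes the imbalance to the shuffle by join key. A minimal sketch of that effect follows, assuming a plain `hashCode` modulo routing. Flink's actual key-group assignment differs, and the class name `KeySkewDemo` and the sample keys are made up for illustration.

```java
import java.util.Arrays;

final class KeySkewDemo {

    /** Counts how many records each subtask receives when routed by key hash. */
    static long[] distribute(String[] keys, int parallelism) {
        long[] counts = new long[parallelism];
        for (String key : keys) {
            // All records with the same key (including NULL) land on one subtask.
            int hash = (key == null) ? 0 : key.hashCode();
            counts[Math.floorMod(hash, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 90 records share one hot key, 10 records have distinct keys.
        String[] keys = new String[100];
        Arrays.fill(keys, 0, 90, "hotUser");
        for (int i = 90; i < keys.length; i++) {
            keys[i] = "user-" + i;
        }
        System.out.println(Arrays.toString(distribute(keys, 8)));
    }
}
```

Raising the parallelism does not help in this sketch, because the hot key still maps to exactly one subtask.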

On Sun, Jan 5, 2020 at 8:33 PM Kurt Young  wrote:

> Another common skew case we've seen is null handling, the value of the
> join key
> is NULL. We will shuffle the NULL value into one task even if the join
> condition
> won't stand by definition.
>
> For DeDuplication, I just want to make sure this behavior meets your
> requirement.
> Because for some other usages, users might be only interested with the
> earliest
> records because the updating for the same key is purely redundant, like
> caused by
> upstream failure and process the same data again. In that case, each key
> will only have
> at most one record and you won't face any join key skewing issue.
>
> Best,
> Kurt
>
>
> On Mon, Jan 6, 2020 at 6:55 AM RKandoji  wrote:
>
>> Hi Kurt,
>>
>> I understand what you mean, some userIds may appear more frequently than
>> the others but this distribution doesn't look in proportionate with the
>> data skew. Do you think of any other possible reasons or anything I can try
>> out to investigate this more?
>>
>> For DeDuplication, I query for the latest record. Sorry I didn't follow
>> above sentence, do you mean that for each update to user table the
>> record(s) that were updated will be sent via retract stream.I think that's
>> expected as I need to process latest records, as long as it is sending only
>> the record(s) that's been updated.
>>
>> Thanks,
>> RKandoji
>>
>> On Fri, Jan 3, 2020 at 9:57 PM Kurt Young  wrote:
>>
>>> Hi RKandoji,
>>>
>>> It looks like you have a data skew issue with your input data. Some or
>>> maybe only one "userId" appears more frequent than others. For join
>>> operator to work correctly, Flink will apply "shuffle by join key"
>>> before the
>>> operator, so same "userId" will go to the same sub-task to perform join
>>> operation. In this case, I'm afraid there is nothing much you can do for
>>> now.
>>>
>>> BTW, for the DeDuplicate, do you keep the latest record or the earliest?
>>> If
>>> you keep the latest version, Flink will trigger a retraction and then send
>>> the latest
>>> record again every time your user table changes.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Sat, Jan 4, 2020 at 5:09 AM RKandoji  wrote:
>>>
 Hi,

 Thanks a ton for the help with earlier questions, I updated code to
 version 1.9 and started using Blink Planner (DeDuplication). This is
 working as expected!

 I have a new question, but thought of asking in the same email chain as
 this has more context about my use case etc.

 Workflow:
 Currently I'm reading from a couple of Kafka topics, DeDuplicating the
 input data, performing JOINs and writing the joined data to another Kafka
 topic.

 Issue:
 I set Parallelism to 8 and on analyzing the subtasks found that the
 data is not distributed well among 8 parallel tasks for the last Join
 query. One of a subtask is taking huge load, whereas others taking pretty
 low load.

 Tried a couple of things below, but no use. Not sure if they are
 actually related to the problem as I couldn't yet understand what's the
 issue here.
 1. increasing the number of partitions of output Kafka topic.
 2. tried adding keys to output so key partitioning happens at Kafka end.

 Below is a snapshot for reference:
 [image: image.png]

 Below are the config changes I made:

 taskmanager.numberOfTaskSlots: 8
 parallelism.default: 8
 jobmanager.heap.size: 5000m
 taskmanager.heap.size: 5000m
 state.backend: rocksdb
 state.checkpoints.dir: file:///Users/username/flink-1.9.1/checkpoints
 state.backend.incremental: true

 I don't see any errors and job seems to be running smoothly (and
 slowly). I need to make it distribute the load well for faster processing,
 any pointers on what could be wrong and how to fix it would be very 
 helpful.

 Thanks,
 RKandoji


 On Fri, Jan 3, 2020 at 1:06 PM RKandoji  wrote:

> Thanks!
>
> On Thu, Jan 2, 2020 at 9:45 PM Jingsong Li 
> wrote:
>
>> Yes,
>>
>> 1.9.2 or Coming soon 1.10
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Jan 3, 2020 at 12:43 AM RKandoji  wrote:
>>
>>> Ok thanks, does it mean version 1.9.2 is what I need to use?
>>>
>>> On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li 
>>> wrote:
>>>
 Blink planner was introduced in 1.9. We recommend use blink planner
 after 1.9.
 After some bug fix, I think the latest version of 1.9 is OK. The
 production environment has also been set up in some places.

 Best,
 Jingsong Lee

 On Wed, Jan 1, 2020 at 3:24 AM RKandoji  wrote:

> Thanks Jingsong and Kurt for more details.
>
> Yes, I'm planning to try out DeDuplication when I'm done upgrading
> to version 1.9. Hopefully 

How to verify if checkpoints are asynchronous or sync

2020-01-07 Thread RKandoji
Hi,

I'm using Flink 1.9, BlinkPlanner and rocksDB for backend with
checkpointing enabled.

I understand that checkpointing is async by default for RocksDB. But I'm
curious if there is way to verify if the checkpoints are happening
asynchronously or synchronously.

Please let me know.

Thanks,
RKandoji


Re: Abstract classes in Stateful functions

2020-01-07 Thread Dan Pettersson
Thanks!

On Tue, Jan 7, 2020 at 12:49, Igal Shilman wrote:

> Hi Dan,
> Yes. You should be able to store “wheels” in “Vehicle”.
>
> Igal.
>
> On Monday, January 6, 2020, Dan Pettersson 
> wrote:
>
>> Hello again :-)
>>
>> When using an abstract class should the instance variables be stored in
>> PersistenceValues
>> to conform to the fault tolerance?
>>
>> Example: with Car, Truck and an abstract Vehicle, should wheels in
>> Vehicle be stored in a PersistenceValue?
>>
>> Thanks,
>>
>> /Dan
>>
>


Re: Flink logging issue with logback

2020-01-07 Thread Dawid Wysakowicz
A quick update. The suppression of stdout/stderr actually might soon be
dropped, see: https://issues.apache.org/jira/browse/FLINK-15504

Best,

Dawid

On 07/01/2020 07:17, Yang Wang wrote:
> Hi Bajaj,
>
> I have tested just as you say, and find that the logs in the user
> class could not show up when
> using ConsoleAppender. If using FileAppender instead, everything goes
> well. 
>
> It is so weird and i have no idea how to debug it.
> Best,
> Yang
>
> On Tue, Jan 7, 2020 at 4:28 AM, Bajaj, Abhinav wrote:
>
> Hi,
>
>  
>
> Thanks much for the responses.
>
> Let me add some more details and clarify my question.
>
>  
>
> _Setup_
>
>   * I used the WikipediaAnalysis example and added a log in main
> method.
>
> ……
>
> public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment see =
> StreamExecutionEnvironment.getExecutionEnvironment();
>   LOG.info("Info log for test");
>
>   DataStream<WikipediaEditEvent> edits = see.addSource(new
> WikipediaEditsSource());
>
> ……
>
>   * I am using the Flink 1.7.1 distribution and starting
> jobmanager and taskmanager locally using the below commands –
>   o ./bin/jobmanager.sh start-foreground
>   o ./bin/taskmanager.sh start-foreground
>   o Both jobmanager and taskmanager log in the console now
>   o JVM options are correctly set and verified from jobmanager
> & taskmanager logs
>
>  
>
>   * I submit the WikipediaAnalysis job from Flink dashboard and
> checked the jobmanager logs
>
>  
>
> _Run 1_: Flink is using the default log4j logging
>
>   * Jobmanager logs the added info log from the job
>   o 2020-01-06 11:55:37,422 INFO 
> wikiedits.WikipediaAnalysis  
> - Info log for test
>
>  
>
> _Run 2_: Flink is setup to use logback as suggested in Flink
> documentation here
> 
> 
>
>   * Jobmanger does not log the added info log from the job
>
>  
>
> So, it seems there is a logging behavior difference between using
> log4j & logback in Flink.
>
> Is this expected or a known difference?
>
>  
>
> Thanks again,
>
> Abhinav Bajaj
>
>  
>
>  
>
> _PS_: Ahh. I see how my email was confusing the first time.
> Hopefully this one is better :P
>
>  
>
>  
>
> *From: *Dawid Wysakowicz  >
> *Date: *Monday, January 6, 2020 at 5:13 AM
> *Cc: *"Bajaj, Abhinav"  >, "user@flink.apache.org
> "  >
> *Subject: *Re: Flink logging issue with logback
>
>  
>
> Hi Bajaj,
>
> I am not entirely sure what is the actual issue you are seeking
> help, but let me comment on your observations.
>
> Ad. 1
>
> If you log to the console from the main method this is an expected
> behavior in both cases (log4j, logback). The std out is being
> overwritten for the execution of the main method
>
> If you log to a file logs should work in both cases. I checked
> that myself and actually the logs appeared in the jobmanager logs
> as long as they are executed before the env.execute(). I observed
> though weird behavior of the Web UI, as it does not always update
> the logs that are displayed. How did you check the logs? If you
> checked through the Web UI could you try to check the file directly?
>
> Ad. 2 Yes this is expected. Operators are executed on taskmanager
> and that's why they log there.
>
> Ad. 3 See Ad. 1
>
> Best,
>
> Dawid
>
>  
>
> On 06/01/2020 07:07, vino yang wrote:
>
> Hi Bajaj,
>
>  
>
> >> Logs from main method(outside of job graph) do not show up
> in jobmanager logs.
>
>  
>
> IMO, it's normal phenomena.
>
>  
>
> Other ideas, please check the JVM options mentioned by Yang.
>
>  
>
> Best,
>
> Vino
>
>  
>
>  
>
> On Mon, Jan 6, 2020 at 11:18 AM, Yang Wang wrote:
>
> Hi Bajaj, Abhinav,
>
>  
>
> Could you share the start-command of jobmanager and
> taskmanager. If it is started correctly, we
>
> will have a the following jvm options.
>
>  
>
> -Dlog.file=/path/of/taskmanager.log
> -Dlogback.configurationFile=file:///path/of/logback.xml
>
>  
>
>  
>
>  
>
> Best,
>
> Yang
>
>  
>
> On Sat, Jan 4, 2020 at 7:23 AM, Bajaj, Abhinav wrote:
>
> Hi,
>
>   

Re: Flink logging issue with logback

2020-01-07 Thread Dawid Wysakowicz
Hi Bajaj,

I spent a bit more time on that and I figured out the difference. The
cause is in the implementation of log4j's ConsoleAppender.

So first of all as I said before, the code path that submits a job
through Web UI redirects the std.out and std.err to a memory buffer for
the time of the "main" method execution. This is the expected behavior.
That's why the logback does not log anything. Log4j implementation is
slightly different as by default it does not follow changes to the
System#setOut. It buffers the original System.out. It has an option
though to also follow changes to the System.out: ConsoleAppender#follow
(https://logging.apache.org/log4j/2.x/manual/appenders.html). If enabled
the log4j behaves exactly the same way as logback. After a very brief
check I could not find such property for logback (actually I like
logback's approach better).
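
To make the difference concrete, here is a minimal plain-Java sketch of the two behaviors described above. It does not use log4j or logback; `CachingAppender` and `FollowingAppender` are hypothetical stand-ins for an appender that captures System.out once and one that looks it up on every call.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

final class ConsoleFollowDemo {

    /** Captures System.out once, at construction time (no "follow"). */
    static final class CachingAppender {
        private final PrintStream target = System.out;

        void append(String message) {
            target.println(message);
        }
    }

    /** Looks up System.out on every call, so it follows System.setOut. */
    static final class FollowingAppender {
        void append(String message) {
            System.out.println(message);
        }
    }

    /** Returns the redirect buffer content after each appender has logged. */
    static String[] run() {
        PrintStream original = System.out;
        CachingAppender caching = new CachingAppender(); // created before the redirect
        FollowingAppender following = new FollowingAppender();
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        System.setOut(new PrintStream(buffer, true)); // stand-in for the Web UI redirect
        try {
            caching.append("from caching appender"); // still reaches the real console
            String afterCaching = buffer.toString();
            following.append("from following appender"); // ends up in the buffer
            String afterFollowing = buffer.toString();
            return new String[] {afterCaching, afterFollowing};
        } finally {
            System.setOut(original); // always restore the real stream
        }
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("buffer after caching appender is empty: " + result[0].isEmpty());
        System.out.println("buffer after following appender: " + result[1].trim());
    }
}
```

In this sketch the caching appender plays the role of log4j's default ConsoleAppender, and the following appender plays the role of logback, or of log4j with follow enabled.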

Hope it clarifies the difference.

Best,

Dawid

On 07/01/2020 07:17, Yang Wang wrote:
> Hi Bajaj,
>
> I have tested just as you say, and find that the logs in the user
> class could not show up when
> using ConsoleAppender. If using FileAppender instead, everything goes
> well. 
>
> It is so weird and i have no idea how to debug it.
> Best,
> Yang
>
> On Tue, Jan 7, 2020 at 4:28 AM, Bajaj, Abhinav wrote:
>
> Hi,
>
>  
>
> Thanks much for the responses.
>
> Let me add some more details and clarify my question.
>
>  
>
> _Setup_
>
>   * I used the WikipediaAnalysis example and added a log in main
> method.
>
> ……
>
> public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment see =
> StreamExecutionEnvironment.getExecutionEnvironment();
>   LOG.info("Info log for test");
>
>   DataStream<WikipediaEditEvent> edits = see.addSource(new
> WikipediaEditsSource());
>
> ……
>
>   * I am using the Flink 1.7.1 distribution and starting
> jobmanager and taskmanager locally using the below commands –
>   o ./bin/jobmanager.sh start-foreground
>   o ./bin/taskmanager.sh start-foreground
>   o Both jobmanager and taskmanager log in the console now
>   o JVM options are correctly set and verified from jobmanager
> & taskmanager logs
>
>  
>
>   * I submit the WikipediaAnalysis job from Flink dashboard and
> checked the jobmanager logs
>
>  
>
> _Run 1_: Flink is using the default log4j logging
>
>   * Jobmanager logs the added info log from the job
>   o 2020-01-06 11:55:37,422 INFO 
> wikiedits.WikipediaAnalysis  
> - Info log for test
>
>  
>
> _Run 2_: Flink is setup to use logback as suggested in Flink
> documentation here
> 
> 
>
>   * Jobmanger does not log the added info log from the job
>
>  
>
> So, it seems there is a logging behavior difference between using
> log4j & logback in Flink.
>
> Is this expected or a known difference?
>
>  
>
> Thanks again,
>
> Abhinav Bajaj
>
>  
>
>  
>
> _PS_: Ahh. I see how my email was confusing the first time.
> Hopefully this one is better :P
>
>  
>
>  
>
> *From: *Dawid Wysakowicz  >
> *Date: *Monday, January 6, 2020 at 5:13 AM
> *Cc: *"Bajaj, Abhinav"  >, "user@flink.apache.org
> "  >
> *Subject: *Re: Flink logging issue with logback
>
>  
>
> Hi Bajaj,
>
> I am not entirely sure what is the actual issue you are seeking
> help, but let me comment on your observations.
>
> Ad. 1
>
> If you log to the console from the main method this is an expected
> behavior in both cases (log4j, logback). The std out is being
> overwritten for the execution of the main method
>
> If you log to a file logs should work in both cases. I checked
> that myself and actually the logs appeared in the jobmanager logs
> as long as they are executed before the env.execute(). I observed
> though weird behavior of the Web UI, as it does not always update
> the logs that are displayed. How did you check the logs? If you
> checked through the Web UI could you try to check the file directly?
>
> Ad. 2 Yes this is expected. Operators are executed on taskmanager
> and that's why they log there.
>
> Ad. 3 See Ad. 1
>
> Best,
>
> Dawid
>
>  
>
> On 06/01/2020 07:07, vino yang wrote:
>
> Hi Bajaj,
>
>  
>
> >> Logs from main method(outside of job graph) do not show up
> in jobmanager logs.
>
>  
>
> IMO, it's normal phenomena.
>
>  
>
> Other ideas, please check the JVM options mentioned by Yang.
>
>  
>
> Best,

Re: Operators resource requirements on K8s Flink session cluster

2020-01-07 Thread Michaël Melchiore
Hi Yang,

Thanks for your quick reply.

The Flink K8s documentation distinguishes between standalone and session
deployment mode. I think I will use the latter.
Since my previous mail, I found FLIP-53, which is precisely the topic of my
original question.

So, great progress has already been made to cover my needs. Unfortunately,
I am using the DataStream API, which is currently not covered by the initial
implementation. I have asked on the dev mailing list if I could help
bridge this gap.

Regards,

Michaël

On Thu, Dec 19, 2019 at 04:58, Yang Wang wrote:

> Hi Michaël,
>
> Glad to hear that you are going to run Flink workload on Kubernetes.
> AFAIK, we have two
> deployment ways.
> 1. Running Flink standalone session/per-job cluster on K8s. You need to
> calculate how many
> taskmanagers you need and the  per taskmanager. All the
> taskmanager
> will be started by a K8s deployment. You could find more information
> here[1]. In this mode,
> you could use `kubectl scale` to change the replicas of taskmanager if the
> resources are not
> enough for your job.
> 2. Natively running Flink session/per-job on K8s. The session mode is
> already supported in the
> master branch and will be released in 1.10. The per-job mode is in
> discussion. No matter
> session or per-job, the taskmanager will be allocated dynamically on
> demand. You could
> use a simple command to start a Flink cluster on K8s. More information
> could be found
> here[2].
>
>
> Best,
> Yang
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
> [2].
> https://docs.google.com/document/d/1-jNzqGF6NfZuwVaFICoFQ5HFFXzF5NVIagUZByFMfBY/edit?usp=sharing
>
>
> On Thu, Dec 19, 2019 at 1:11 AM, Michaël Melchiore wrote:
>
>> Hello,
>>
>> I plan to run topologies on a Flink session cluster on Kubernetes.
>> In my topologies, operators will have varying resource requirements in
>> term of CPU and RAM.
>> How can I make these informations available from Flink to Kubernetes so
>> the latter takes it into account to optimize its deployment ?
>>
>> I am trying to achieve something similar to the Apache Storm/Trident
>> Resource Aware Scheduler.
>>
>> Kind regards,
>>
>> Michaël
>>
>


Re: Catalog cannot be changed when registering a table

2020-01-07 Thread 贺小令
hi,

Tables registered with
streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
are all temporary tables.

You can register a table in a specific catalog via:
catalog = new InMemoryExternalCatalog(catalogName);
streamTableEnvironment.registerCatalog(catalogName, catalog);
catalog.createTable()

or
streamTableEnvironment.getCatalog().get().createTable()


On Tue, Jan 7, 2020 at 3:20 PM, xiyu...@163.com wrote:

> Hi all,
>
> During development, when I register a table as shown below, the catalog
> used by default is EnvironmentSettings.DEFAULT_BUILTIN_CATALOG:
> streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
>
> I tried to solve this as follows, but the registered table still ends up
> in EnvironmentSettings.DEFAULT_BUILTIN_CATALOG:
> streamTableEnvironment.registerCatalog(catalogName, new
> InMemoryExternalCatalog(catalogName));
> streamTableEnvironment.useCatalog(catalogName);
>
> How can I register the table in a specific catalog?
>
>
> xiyu...@163.com
>


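Re: FLINK: differences in ProcessWindowFunction across StateBackends

2020-01-07 Thread huoguo

Can the stale data be expired by setting a TTL?

> On Jan 7, 2020, at 17:54, USERNAME wrote:
> 
> Hello everyone!
> Happy New Year to all!
> 
> 
> -- Version
> FLINK 1.9.1 ON YARN
> 
> 
> -- Steps
> 1. Define an EventTimeSessionWindows.withGap(Time.hours(1)) window
> 2. Define a new Trigger() that fires at a fixed interval and emits output
> 3. Define a new ProcessWindowFunction() that computes at a fixed interval,
> emits output, and does not keep data that has already been processed
> -- Problem
> In the new ProcessWindowFunction(), iter.remove(); is used to drop data
> that has already been processed.
> With MemoryStateBackend this achieves the expected result: each
> fixed-interval computation no longer contains the previous values.
> With RocksDBStateBackend the iter.remove(); step has no effect, and the
> next computation still sees the historical data.
> Is there a better approach for this kind of computation?
> 
> 
> -- Partial code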
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> 
> 
> new ProcessWindowFunction{
> public void process(Tuple tuple, Context context, Iterable 
> elements, Collector out) throws Exception {
> for (Iterator iter = elements.iterator(); iter.hasNext(); ) {
> 
> iter.remove();
> }
> }
> 
> }
> 
> 
> 
> 
> 
> 
> 




Abstract classes in Stateful functions

2020-01-07 Thread Igal Shilman
Hi Dan,
Yes. You should be able to store “wheels” in “Vehicle”.

Igal.

On Monday, January 6, 2020, Dan Pettersson 
wrote:

> Hello again :-)
>
> When using an abstract class, should the instance variables be stored in
> PersistedValues
> to be fault tolerant?
>
> For example, with Car, Truck and an abstract Vehicle, should wheels in
> Vehicle be stored in a PersistedValue?
>
> Thanks,
>
> /Dan
>


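Re: Differences in ProcessWindowFunction behavior across Flink StateBackends

2020-01-07 Thread Yun Tang
Hi,

Using iterator.remove() to drop already-processed elements from state is not the standard approach; the standard approach is to clear the corresponding state [1].
It appears to work with MemoryStateBackend because get() returns the very object that the backend stores on the heap [2], so modifying it has a side effect on the state.
With the RocksDB state backend, get() returns a list deserialized from the stored bytes rather than the data RocksDB itself holds [3], so modifying it has no such side effect.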


[1] 
https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
[2] 
https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java#L57
[3] 
https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L119
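
To make the difference between [2] and [3] concrete, here is a toy sketch in plain Java. It is not Flink code: HeapListState and SerializedListState are hypothetical stand-ins that only mimic the two behaviors described above (handing out the stored object versus handing out a deserialized copy), and the byte format is an arbitrary choice for the illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class StateBackendDemo {

    /** Hypothetical heap-style state: get() returns the stored list itself. */
    static class HeapListState {
        private final List<String> stored = new ArrayList<>();
        void add(String value) { stored.add(value); }
        Iterable<String> get() { return stored; }
        void clear() { stored.clear(); }
        int size() { return stored.size(); }
    }

    /** Hypothetical serializing state: keeps bytes, get() returns a fresh copy. */
    static class SerializedListState {
        private byte[] bytes = serialize(new ArrayList<>());
        void add(String value) {
            List<String> values = deserialize(bytes);
            values.add(value);
            bytes = serialize(values);
        }
        Iterable<String> get() { return deserialize(bytes); }
        void clear() { bytes = serialize(new ArrayList<>()); }
        int size() { return deserialize(bytes).size(); }
    }

    /** Writes the element count followed by each element. */
    static byte[] serialize(List<String> values) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(values.size());
            for (String value : values) {
                out.writeUTF(value);
            }
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Rebuilds a new list from the bytes; the result shares nothing with the store. */
    static List<String> deserialize(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            int count = in.readInt();
            List<String> values = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                values.add(in.readUTF());
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Removes every element through the iterator, as the window function tries to do. */
    static void removeAll(Iterable<String> elements) {
        for (Iterator<String> iter = elements.iterator(); iter.hasNext(); ) {
            iter.next();   // an iterator must advance before remove() is legal
            iter.remove();
        }
    }

    public static void main(String[] args) {
        HeapListState heap = new HeapListState();
        SerializedListState serialized = new SerializedListState();
        for (String value : new String[] {"a", "b"}) {
            heap.add(value);
            serialized.add(value);
        }

        removeAll(heap.get());
        removeAll(serialized.get());
        System.out.println("heap size after iterator.remove(): " + heap.size());
        System.out.println("serialized size after iterator.remove(): " + serialized.size());

        serialized.clear();
        System.out.println("serialized size after clear(): " + serialized.size());
    }
}
```

In this sketch the heap-style state ends up empty after the iterator removals, while the serializing state still holds both elements until clear() is called. That is why an explicit clear of the state is the approach that behaves the same on both kinds of backend.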

Best regards,
Yun Tang


From: USERNAME 
Sent: Tuesday, January 7, 2020 17:54
To: user-zh@flink.apache.org 
Subject: Differences in ProcessWindowFunction behavior across Flink StateBackends

Hi all,
Happy New Year!

-- Version
Flink 1.9.1 on YARN

-- Setup
1. Define an EventTimeSessionWindows.withGap(Time.hours(1)) window.
2. Define a new Trigger() that fires at a fixed interval and emits output.
3. Define a new ProcessWindowFunction() that computes and emits output at a fixed interval and does not retain data that has already been processed.

-- Problem
Inside the ProcessWindowFunction, I call iter.remove() to drop the elements that have already been processed.
With MemoryStateBackend this works as intended: each periodic computation excludes the values from the previous one.
With RocksDBStateBackend the iter.remove() step has no effect, and the next computation still sees the historical data.
Is there a better way to handle this kind of computation?

-- Partial code
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

new ProcessWindowFunction{
    public void process(Tuple tuple, Context context, Iterable elements, Collector out) throws Exception {
        for (Iterator iter = elements.iterator(); iter.hasNext(); ) {

            iter.remove();
        }
    }
}


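Differences in ProcessWindowFunction behavior across Flink StateBackends

2020-01-07 Thread USERNAME
Hi all,
Happy New Year!

-- Version
Flink 1.9.1 on YARN

-- Setup
1. Define an EventTimeSessionWindows.withGap(Time.hours(1)) window.
2. Define a new Trigger() that fires at a fixed interval and emits output.
3. Define a new ProcessWindowFunction() that computes and emits output at a fixed interval and does not retain data that has already been processed.

-- Problem
Inside the ProcessWindowFunction, I call iter.remove() to drop the elements that have already been processed.
With MemoryStateBackend this works as intended: each periodic computation excludes the values from the previous one.
With RocksDBStateBackend the iter.remove() step has no effect, and the next computation still sees the historical data.
Is there a better way to handle this kind of computation?

-- Partial code
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

new ProcessWindowFunction{
    public void process(Tuple tuple, Context context, Iterable elements, Collector out) throws Exception {
        for (Iterator iter = elements.iterator(); iter.hasNext(); ) {

            iter.remove();
        }
    }
}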