Limit max cpu usage per TaskManager

2019-11-05 Thread Lu Niu
Hi,

When running a Flink application in YARN mode, is there a way to limit the maximum
CPU usage per TaskManager?

I tried an application with just a source and a sink operator. The parallelism of
the source is 60 and the parallelism of the sink is 1. When running with the default
config, 60 TaskManagers are assigned. I noticed that one TaskManager process's CPU
usage could reach 200% while the rest stay below 50%.

When I set -yn = 2 (default is 1), the number of TaskManagers dropped to 30, and one
TaskManager process's CPU usage could reach 600% while the rest stay below 50%.

I tried setting yarn.containers.vcores = 2, but then all tasks stay in the starting
state forever and the application never reaches the RUNNING state.

Best
Lu


Partitioning based on key flink kafka sink

2019-11-05 Thread Vishwas Siravara
Hi all,
I am using flink 1.7.0 and using this constructor

FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema,
        Properties producerConfig)

From the doc it says this constructor uses a fixed partitioner. I want to
partition based on key, so I tried to use this:

public FlinkKafkaProducer(
   String defaultTopicId,
   KeyedSerializationSchema<IN> serializationSchema,
   Properties producerConfig,
   Optional<FlinkKafkaPartitioner<IN>> customPartitioner)

What should I pass in the optional field? From the doc it says:

@param customPartitioner A serializable partitioner for assigning messages to Kafka
                         partitions. If a partitioner is not provided, records will be
                         partitioned by the key of each record (determined by {@link
                         KeyedSerializationSchema#serializeKey(Object)}). If the keys are
                         {@code null}, then records will be distributed to Kafka partitions
                         in a round-robin fashion.

This is super confusing (contradictory, in a way), since the doc for the previous
constructor says that the FlinkFixedPartitioner will be used if a customPartitioner
is not present.
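
For what it is worth, here is a minimal, hedged sketch (not from the thread) of how the four-argument constructor could be called so that records are routed by their serialized key. The topic name and the Tuple2-based schema are placeholder assumptions, and the comments only restate my reading of the javadoc quoted above:

```
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class KeyPartitionedProducerSketch {

    public static FlinkKafkaProducer<Tuple2<String, String>> build(Properties producerConfig) {
        // Schema that exposes f0 as the Kafka message key and f1 as the value.
        KeyedSerializationSchema<Tuple2<String, String>> schema =
                new KeyedSerializationSchema<Tuple2<String, String>>() {
                    @Override
                    public byte[] serializeKey(Tuple2<String, String> element) {
                        return element.f0.getBytes(StandardCharsets.UTF_8);
                    }

                    @Override
                    public byte[] serializeValue(Tuple2<String, String> element) {
                        return element.f1.getBytes(StandardCharsets.UTF_8);
                    }

                    @Override
                    public String getTargetTopic(Tuple2<String, String> element) {
                        return null; // fall back to the default topic
                    }
                };

        // Passing Optional.empty() means no FlinkFixedPartitioner is installed, so the
        // partition should be derived from the serialized key (per the javadoc above).
        return new FlinkKafkaProducer<>(
                "my-topic", // hypothetical topic name
                schema,
                producerConfig,
                Optional.<FlinkKafkaPartitioner<Tuple2<String, String>>>empty());
    }
}
```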

Best,
Vishwas


Re: Ask timed out issue on Mac OS

2019-11-05 Thread jeff kit
When running command 1, localhost:8081 looks normal, with available TaskManagers, job slots, etc. Neither the StandaloneSession nor the TaskExecutor logs under the log directory show anything abnormal.
The dispatcher also started up normally:

2019-11-03 21:07:58,438 INFO
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Rest
endpoint listening at localhost:8081

2019-11-03 21:07:58,438 INFO
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint-
http://localhost:8081 was granted leadership with
leaderSessionID=----

2019-11-03 21:07:58,439 INFO
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Web
frontend listening at http://localhost:8081.

2019-11-03 21:07:58,507 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
RPC endpoint for
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at
akka://flink/user/resourcemanager .

2019-11-03 21:07:58,526 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher
at akka://flink/user/dispatcher .

2019-11-03 21:07:58,549 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
ResourceManager akka.tcp://flink@localhost:6123/user/resourcemanager was
granted leadership with fencing token 

2019-11-03 21:07:58,550 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  -
Starting the SlotManager.

2019-11-03 21:07:58,563 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Dispatcher
akka.tcp://flink@localhost:6123/user/dispatcher was granted leadership with
fencing token ----

2019-11-03 21:07:58,565 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Recovering
all persisted jobs.

2019-11-03 21:08:02,248 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Registering TaskManager with ResourceID ba0633856bb7670711e9e4e946168800
(akka.tcp://flink@jeff-kit-3.local:54796/user/taskmanager_0) at
ResourceManager

On Tue, Nov 5, 2019 at 8:21 PM zhisheng  wrote:

> When running command 1, I'd suggest opening localhost:8081 to check whether the Flink UI loads properly, or checking the jobmanager and taskmanager logs under the Flink log directory for anything abnormal.
>
> I tested the same commands locally and they work fine.
>
tison wrote on Tue, Nov 5, 2019 at 3:01 PM:
>
> > This problem is actually fairly common, and there are many possible causes. For example, check the cluster logs under the log directory to see whether the Dispatcher started properly, and check whether you changed flink-conf in a way that makes a timeout too short (e.g. 1 ms) or leaves insufficient resources. Some cases also stopped reproducing after upgrading to a newer minor JDK version.
> >
> > Best,
> > tison.
> >
> >
jeff kit wrote on Tue, Nov 5, 2019 at 2:43 PM:
> >
> > > Hi.
> > > My local Flink is the official binary package, not one I built myself.
> > > I believe my case is a minority; Flink runs fine on the vast majority of Macs.
> > >
> > > On Tue, Nov 5, 2019 at 2:24 PM Biao Liu  wrote:
> > >
> > > > Hi,
> > > >
> > > > Flink runs on macOS. I just tried it myself, and copying your commands works.
> > > > I'd suggest checking your local environment again. Is your local Flink built by yourself? If that does not help, try the binary package provided by Flink [1]?
> > > >
> > > > [1] https://flink.apache.org/downloads.html
> > > >
> > > > Thanks,
> > > > Biao /'bɪ.aʊ/
> > > >
> > > >
> > > >
> > > > On Tue, 5 Nov 2019 at 12:30, jeff kit  wrote:
> > > >
> > > > > Hi everyone,
> > > > > I ran into this problem while running Flink's official Quick Start. To avoid asking a silly question I first tried a lot of things, such as switching Flink versions (1.7, 1.8 and 1.9). On my own Mac OS X the problem always occurs, while on other operating systems such as Windows everything works fine.
> > > > >
> > > > > This is probably not a common problem and is more likely an issue with my local environment, but after several days of trying I still haven't found a solution, so I'm asking for help here.
> > > > >
> > > > > Steps:
> > > > > 1. ./bin/start-cluster.sh  # start Flink
> > > > > 2. ./bin/flink run examples/batch/WordCount.jar   # submit the WordCount jar
> > > > >
> > > > > Then the following exception was thrown:
> > > > > Starting execution of program
> > > > > Executing WordCount example with default input data set.
> > > > > Use --input to specify file input.
> > > > > Printing result to stdout. Use --output to specify output path.
> > > > >
> > > > > 
> > > > >  The program finished with the following exception:
> > > > >
> > > > > org.apache.flink.client.program.ProgramInvocationException: Could not
> > > > > retrieve the execution result. (JobID: 81bc8720dee57710788cc8e41079ba4d)
> > > > > at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
> > > > > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> > > > > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
> > > > > at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> > > > > at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
> > > > > at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> > > > > at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
> > > > > at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:88)
> > > > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > > at

Re: The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged

2019-11-05 Thread Till Rohrmann
Hi Regina,

I've taken another look at the problem, and I think we could improve the
situation by reordering the calls we do in
YarnResourceManager#onContainersAllocated. I've created a PR [1] for the
re-opened issue [2]. Would it be possible for you to verify the fix? What
you need to do is check this PR out, build Flink based on it, and then run
the test. You can check out the PR via `git fetch
https://github.com/apache/flink.git pull/10089/head:FLINK-12342`. This
command will fetch the PR and make it available under FLINK-12342, which you
can check out and then run `mvn clean install -DskipTests` on in order to
build Flink. If it is easier to check out the branch from my repository,
you can find it here [3]. Thanks a lot for your help!

[1] https://github.com/apache/flink/pull/10089
[2] https://issues.apache.org/jira/browse/FLINK-12342
[3] https://github.com/tillrohrmann/flink/tree/FLINK-12342

Cheers,
Till

On Fri, Nov 1, 2019 at 9:33 AM Till Rohrmann  wrote:

> Hi Regina,
>
> at the moment the community works towards the 1.10 release with a lot of
> features trying to be completed. The intended feature freeze is end of
> November. Due to this it is quite hard to tell when exactly this problem
> will be properly fixed but we'll try our best.
>
> Cheers,
> Till
>
> On Thu, Oct 31, 2019 at 4:59 PM Chan, Regina  wrote:
>
>> Yeah I saw FLINK-13184 earlier and started watching it. I can see the
>> second optimization being helpful too in a large cluster. I’ll be watching
>> this as well. Do you have an estimate of the turnaround time? That would be
>> helpful planning-wise.
>>
>>
>>
>>
>>
>> *From:* Yang Wang 
>> *Sent:* Thursday, October 31, 2019 4:08 AM
>> *To:* Chan, Regina [Engineering] 
>> *Cc:* Till Rohrmann ; user 
>> *Subject:* Re: The RMClient's and YarnResourceManagers internal state
>> about the number of pending container requests has diverged
>>
>>
>>
>> I think Till's analysis is right. I just want to share more information.
>>
>> After diving into the logs of the Flink resource manager and the YARN resource
>> manager, I found that the excess containers come from two sides.
>>
>> ** YARN container allocation mechanism **
>>
>> Receiving more containers than requested is unavoidable. Imagine that we want to
>> allocate 120 containers from YARN. The size of the container request in
>> *heartbeat1* will be 120. The YARN RM receives the request but cannot allocate any
>> container because there are not enough resources, so the allocated containers in
>> the response to *heartbeat1* will be 0. The Flink resource manager does not get
>> any containers and keeps the size of the container request in *heartbeat2* at 120.
>> However, the YARN resource manager has allocated 120 containers between
>> *heartbeat1* and *heartbeat2*. When the YARN resource manager receives
>> *heartbeat2*, it puts those 120 containers in the response to *heartbeat2* and
>> starts to allocate for the new request of 120. Since the Flink resource manager
>> has received all containers, it sets the size of the container request in
>> *heartbeat3* to 0. The YARN resource manager allocates 100 containers between
>> *heartbeat2* and *heartbeat3* and puts those 100 containers in the response to
>> *heartbeat3*. So the Flink resource manager gets 100 excess containers.
>>
>> Note: heartbeat means the heartbeat between the Flink resource manager (YARN
>> client) and the YARN resource manager.
>>
>> ** Flink resource manager allocates more than it really needs **
>>
>> Currently, in onContainersAllocated of FlinkYarnResourceManager, we iterate
>> through each container, and processing each one takes more than 50 ms. Most of
>> the time is spent uploading {uuid}-taskmanager-conf.yaml to HDFS and starting the
>> container. So if more than about 10 containers are allocated at once,
>> FlinkYarnResourceManager cannot remove the container requests in time and will
>> allocate more than it really needs.
>>
>> For the first cause, on the YARN side, we cannot do anything more from Flink.
>> However, for the second, we could reduce the time cost of each allocated
>> container so that FlinkYarnResourceManager only allocates what it really needs.
>> We could have two optimizations here. The first is to use NMClientAsync instead
>> of NMClient to reduce the container start time. [1] The second is to *not*
>> upload {uuid}-taskmanager-conf.yaml and use Java options or environment
>> variables instead. [2]
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-13184
>> [2] https://issues.apache.org/jira/browse/FLINK-14582

Re: RocksDB state on HDFS seems not being cleaned up

2019-11-05 Thread bupt_ljy
This should be sent to the user mailing list. Moving it here...


 Original Message 
Sender: bupt_ljy
Recipient: dev
Date: Tuesday, Nov 5, 2019 21:13
Subject: Re: RocksDB state on HDFS seems not being cleaned up


Hi Shuwen,

The "shared" folder means that the state files in it are shared among multiple
checkpoints, which happens when you enable incremental checkpointing [1].
Therefore, it is reasonable that the size keeps growing if you set
"state.checkpoints.num-retained" to a big value.

[1] https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html

Best,
Jiayi Liao

 Original Message 
Sender: shuwen zhou
Recipient: dev
Date: Tuesday, Nov 5, 2019 17:59
Subject: RocksDB state on HDFS seems not being cleaned up

Hi Community, I have a job running on Flink 1.9.0 on YARN, with RocksDB on HDFS and
incremental checkpointing enabled. I have some MapState in code with the following config:

val ttlConfig = StateTtlConfig
  .newBuilder(Time.minutes(30))
  .updateTtlOnCreateAndWrite()
  .cleanupInBackground()
  .cleanupFullSnapshot()
  .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)

After running for around 2 days, I observed the checkpoint folder is showing:

44.4 M   /flink-chk743e4568a70b626837b/chk-40
65.9 M   /flink-chk743e4568a70b626837b/chk-41
91.7 M   /flink-chk743e4568a70b626837b/chk-42
96.1 M   /flink-chk743e4568a70b626837b/chk-43
48.1 M   /flink-chk743e4568a70b626837b/chk-44
71.6 M   /flink-chk743e4568a70b626837b/chk-45
50.9 M   /flink-chk743e4568a70b626837b/chk-46
90.2 M   /flink-chk743e4568a70b626837b/chk-37
49.3 M   /flink-chk743e4568a70b626837b/chk-38
96.9 M   /flink-chk743e4568a70b626837b/chk-39
797.9 G  /flink-chk743e4568a70b626837b/shared

The ./shared folder size keeps increasing and the folder does not seem to be cleaned up.
However, when I disabled incremental cleanup, the expired full snapshot was removed
automatically. Is there any way to remove outdated state on HDFS to stop it from
increasing? Thanks. -- Best Wishes, Shuwen Zhou
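
For reference, a hedged Java sketch of a similar TTL configuration that also enables the RocksDB compaction-filter cleanup. The exact builder calls and the note about the 1.9 config flag are my assumptions, not something confirmed in this thread:

```
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

public class TtlStateSketch {

    public static MapStateDescriptor<String, Long> descriptorWithTtl() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(30))
                .updateTtlOnCreateAndWrite()
                // Full-snapshot cleanup only trims full snapshots/savepoints; for incremental
                // RocksDB checkpoints the compaction-filter cleanup is what should eventually
                // shrink the files under ./shared. On Flink 1.9 this may additionally require
                // state.backend.rocksdb.ttl.compaction.filter.enabled: true in flink-conf.yaml.
                .cleanupInRocksdbCompactFilter(1000) // re-check the TTL timestamp every 1000 processed entries
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .build();

        MapStateDescriptor<String, Long> descriptor =
                new MapStateDescriptor<>("my-map-state", Types.STRING, Types.LONG);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
```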

Re: Can a Trigger know the state of the data in the Window?

2019-11-05 Thread Utopia
I am using a custom Trigger, but inside the Trigger I cannot get any information about the data in the Window.

zhisheng wrote on Tue, Nov 5, 2019 at 8:16 PM:

> You can define a custom trigger that combines both conditions before firing.
>
> Utopia wrote on Tue, Nov 5, 2019 at 2:19 PM:
>
> > Sorry, I didn't describe it clearly. Our business scenario requires SessionWindows, and I don't know whether the elements of the current Window can be accessed inside the Trigger.
> >
> > Best  regards
> > Utopia
> > On Nov 5, 2019, 14:16 +0800, Biao Liu wrote:
> > > Hi,
> > >
> > > Would countWindow [1] meet your needs?
> > >
> > > [1]
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/datastream/KeyedStream.html#countWindow-long-
> > >
> > > Thanks,
> > > Biao /'bɪ.aʊ/
> > >
> > >
> > >
> > > On Tue, 5 Nov 2019 at 14:01, Utopia  wrote:
> > >
> > > > Hi all,
> > > >
> > > > I want to decide whether to fire based on information about the data in the Window, for example the number of elements. How should I implement this? Do I have to maintain such state myself?
> > > >
> > > > Best regards
> > > > Utopia
> > > >
> >
>
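
A minimal sketch of what such a combined trigger could look like in Java: it fires either when a per-window element count reaches a threshold or at the end of the window. The class name and threshold are illustrative, event-time windows are assumed, and the merge handling is my assumption of what session windows need:

```
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Fires when the window has buffered maxCount elements, otherwise at the end of the window. */
public class CountOrTimeTrigger<T> extends Trigger<T, TimeWindow> {

    private final long maxCount; // illustrative threshold

    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    public CountOrTimeTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        // also register the regular end-of-window timer
        ctx.registerEventTimeTimer(window.maxTimestamp());
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true; // needed for session windows
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.mergePartitionedState(countDesc);
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}
```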


Re: How to make a Flink trigger output only changed data?

2019-11-05 Thread ChangjunZhang
We ran into a similar problem. Before the final sink, you can store each key's result in state and compare the stored result with the new result to decide whether to collect it.
I don't think this is the best solution, though. Could you explain how to use an Evictor to remove data that hasn't been updated? It would be great if you could describe the implementation in more detail, thanks!

> On Nov 1, 2019, at 8:41 PM, Jun Zhang <825875...@qq.com> wrote:
> 
> You can specify an Evictor to remove the already-processed data from the window.
> 
> Best, Jun
> 
> 
> -- Original Message --
> From: Qi Kang  Sent: Nov 1, 2019 16:37
> To: user-zh  Subject: Re: How to make a Flink trigger output only changed data?
> 
> 
> 
> Hi,
> 
> We have a Flink job that aggregates per-site sales volume and GMV by calendar day. The code skeleton is as follows:
> 
> ```
> sourceStream
>  .map(message -> JSON.parseObject(message, OrderDetail.class))
>  .keyBy("siteId")
>  .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
>  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
>  .aggregate(new VolumeGmvAggregateFunc());
> ```
> 
> To let the dashboard refresh in real time, a computation is triggered every second. But there are nearly 1,000 sites, and outputting the full results every second is not realistic. Is there a simple way to output only the data of sites whose values changed within the last second? Thx.
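
A minimal sketch of the compare-with-state idea described above, assuming the per-site aggregate is keyed by siteId and represented here as a Tuple2<String, Long> purely for illustration:

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Forwards a (siteId, value) record only when the value differs from the last one emitted for that key. */
public class EmitOnChangeFunction
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> lastEmitted;

    @Override
    public void open(Configuration parameters) {
        lastEmitted = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-emitted", Types.LONG));
    }

    @Override
    public void processElement(Tuple2<String, Long> in, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        Long previous = lastEmitted.value();
        if (previous == null || !previous.equals(in.f1)) {
            lastEmitted.update(in.f1);
            out.collect(in); // only changed results reach the sink
        }
    }
}
```

It would sit between the windowed aggregation and the sink, e.g. aggregatedStream.keyBy(t -> t.f0).process(new EmitOnChangeFunction()), so unchanged sites are filtered out before writing.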



Re: Can a Trigger know the state of the data in the Window?

2019-11-05 Thread zhisheng
You can define a custom trigger that combines both conditions before firing.

Utopia wrote on Tue, Nov 5, 2019 at 2:19 PM:

> Sorry, I didn't describe it clearly. Our business scenario requires SessionWindows, and I don't know whether the elements of the current Window can be accessed inside the Trigger.
>
> Best  regards
> Utopia
> On Nov 5, 2019, 14:16 +0800, Biao Liu wrote:
> > Hi,
> >
> > Would countWindow [1] meet your needs?
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/datastream/KeyedStream.html#countWindow-long-
> >
> > Thanks,
> > Biao /'bɪ.aʊ/
> >
> >
> >
> > On Tue, 5 Nov 2019 at 14:01, Utopia  wrote:
> >
> > > Hi all,
> > >
> > > I want to decide whether to fire based on information about the data in the Window, for example the number of elements. How should I implement this? Do I have to maintain such state myself?
> > >
> > > Best regards
> > > Utopia
> > >
>


Re: [metrics] The Availability and Checkpointing metric groups are not shown

2019-11-05 Thread zhisheng
Are you running the job manager and taskmanager on the same machine, but only configured a single port?

You can refer to the official documentation:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter
which describes how the port parameter works.

Biao Liu wrote on Tue, Nov 5, 2019 at 11:37 AM:

> Hi,
>
> The JM's metrics should also be reported directly.
> You could narrow down the problem: is it a metrics issue or a reporter issue?
> For example, add an slf4j reporter [1] and check whether the corresponding metrics show up in the JM log; if they do, it is a reporter problem.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Tue, 22 Oct 2019 at 17:37, Blake  wrote:
>
> > I use the PrometheusReporter to report metric information.
> > I found that port 9250 does not show the Availability and Checkpointing metrics.
> > Do they need to be configured separately? I did not see anything about this in the docs.
> > I noticed that the scope of both is Job (only available on JobManager).
> > Do I need to pass extra parameters at startup?
> >
> >
> >
> >
> > The configuration is as follows:
> > flink-conf.yml
> > metrics.reporters: prom
> >
> > metrics.reporter.prom.class:
> > org.apache.flink.metrics.prometheus.PrometheusReporter
> > metrics.reporter.prom.port: 9250
> >
> > metrics.system-resource: true
> >
> >
> >
> >
> >
> >
> >
> > docker-compose.yml (excerpt):
> > services:
> >   jobmanager:
> > # image: flink:1.9.0
> > build: ./job_manager
> > container_name: jobmanager_1.9.0
> > volumes:
> >   - ./prometheus/:/etc/prometheus/
> >   - prometheus_data:/prometheus
> > ports:
> >   - "8081:8081"
> >   - "9250:9250"
> > expose:
> >   - "6123"
> > networks:
> >   - back-tier
> >   # - host-tier
> > command: jobmanager
> > environment:
> >   - JOB_MANAGER_RPC_ADDRESS=jobmanager
> >   taskmanager:
> > # image: flink:1.9.0
> > build: ./task_manager
> > container_name: taskmanager_1.9.0
> > ports:
> >   # - "9001:9001"
> >   - "9251:9251"
> > expose:
> >   - "6121"
> >   - "6122"
> > networks:
> >   - back-tier
> >   # - host-tier
> > command: taskmanager
> > depends_on:
> >   - jobmanager
> > environment:
> >   - JOB_MANAGER_RPC_ADDRESS=jobmanager
> >
> >
> >
> >
> > Docker
> > FROM flink:1.9.0
> >
> > COPY flink-conf.yaml ./conf/
> >
> > RUN cp ./opt/flink-metrics-prometheus-1.9.0.jar ./lib/
>


Re: Re: How to periodically write state to external storage

2019-11-05 Thread zhisheng
It sounds like wanglei mainly wants to get at the state data, and just needs to additionally store the state data in an external system, but doing that may not perform well.

So how about a different approach: keep using Flink's state directly, and let the external side fetch Flink's state directly, using flink-queryable-state.

Biao Liu wrote on Tue, Nov 5, 2019 at 11:10 AM:

> Hi,
>
> I have a few questions about your problem description.
>
> > Every incoming message updates the state value; if we wrote to external storage for every message, the downstream could not keep up.
> > Is there a way to periodically read the state and write it to external storage?
>
> What do you mean here? Updating the state value and writing to the external storage system should be two independent events. State is used internally by Flink; data meant for external systems is usually written out through a sink and has no direct relationship with state.
>
> From your description it only looks like writing to MySQL (through a sink?) cannot keep up. Could you batch the writes, for example handle that in the sink?
>
> If I have misunderstood your problem, please describe it in more detail.
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Fri, 1 Nov 2019 at 11:21, misaki L  wrote:
>
> > How about aggregating with a window and writing in batches?
> >
> > wangl...@geekplus.com.cn wrote on Fri, Nov 1, 2019 at 10:17 AM:
> >
> > > Hi Congxian,
> > >
> > > Writing it out through a sink still has to land somewhere in the end so that it can be queried.
> > > In our case it is written to MySQL.
> > >
> > >
> > >
> > > wangl...@geekplus.com.cn
> > >
> > > Sender: Congxian Qiu
> > > Send Time: 2019-11-01 10:10
> > > Receiver: user-zh
> > > Subject: Re: How to periodically write state to external storage
> > > Curious, why do you want to periodically write the state to external storage? Does an external system need to use this state? Then why not write the state out through a sink?
> > >
> > > Best,
> > > Congxian
> > >
> > >
> > > Jun Zhang <825875...@qq.com> wrote on Thu, Oct 31, 2019 at 10:36 AM:
> > >
> > > > Could you register a timer?
> > > >
> > > >
> > > > Take a look at this article and see whether it helps:
> > > >
> > > >
> > > > https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA
> > > > On Oct 31, 2019 at 10:16, wangl...@geekplus.com.cn wrote:
> > > >
> > > >
> > > > It is message driven with a very high QPS. Every incoming message updates the state value, and if we wrote to external storage for every message the downstream could not keep up.
> > > > Is there a way to periodically read the state and write it to external storage?
> > > > I am currently on Flink 1.7.2.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > wangl...@geekplus.com.cn
> > >
> >
>
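
A minimal sketch of the batch-the-writes-in-the-sink idea mentioned above; writeBatch() is a hypothetical placeholder for the actual MySQL write, and records buffered here would be lost on failure unless they are also snapshotted:

```
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** Buffers records and writes them to the external store in batches to reduce write pressure. */
public class BatchingSink<T> extends RichSinkFunction<T> {

    private final int batchSize;
    private transient List<T> buffer;

    public BatchingSink(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = new ArrayList<>(batchSize);
    }

    @Override
    public void invoke(T value, Context context) {
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            writeBatch(buffer); // hypothetical batched write to MySQL / the external store
            buffer.clear();
        }
    }

    @Override
    public void close() {
        // best-effort flush of whatever is left when the job shuts down
        if (buffer != null && !buffer.isEmpty()) {
            writeBatch(buffer);
            buffer.clear();
        }
    }

    private void writeBatch(List<T> batch) {
        // placeholder: issue one batched INSERT/UPDATE against the external store here
    }
}
```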


Re: How can I get the backpressure signals inside my function or operator?

2019-11-05 Thread Zhijiang
Hi Felipe,

That is an interesting idea, controlling the upstream's output based on the downstream's input.

If I understood correctly, the preAggregate operator would trigger flushing its output while the reduce operator is idle/hungry. In contrast, the preAggregate would continue aggregating data in the case of back pressure.

I think this requirement is valid, but unfortunately I guess you can not get the back pressure signal at the operator level. AFAIK only the upper task level can get the input/output state to decide whether to process or not.

If you want to get the reduce side's metric `Shuffle.Netty.Input.Buffers.inputQueueLength` on the preAggregate side, you might rely on some external metric reporter to query it, if possible.

Best,
Zhijiang


--
From:Felipe Gutierrez 
Send Time:2019 Nov. 5 (Tue.) 16:58
To:user 
Subject:How can I get the backpressure signals inside my function or operator?

Hi all,

let's say that I have a "source -> map -> preAggregate -> keyBy -> reduce -> 
sink" job and the reducer is sending backpressure signals to the preAggregate, 
map and source operator. How do I get those signals inside my operator's 
implementation?
I guess inside the function is not possible. But if I have my own operator 
implemented (preAggregate) can I get those backpressure signals?

I want to read the metric "Shuffle.Netty.Input.Buffers.inputQueueLength" [1] 
on my preAggregate operator in order to decide when I stop the pre-aggregation 
and flush tuples or when I keep pre-aggregating. It is something like the 
"credit based control on the network stack" [2].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service
[2] https://www.youtube.com/watch?v=AbqatHF3tZI

Thanks!
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com
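
As a rough illustration of the "query it from outside" idea above, this sketch polls a task metric from the JobManager's REST API. The endpoint path and the subtask-index prefix in the metric name are assumptions to verify against the Flink REST/metrics documentation, and the job/vertex ids are placeholders:

```
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

/** Polls one task metric from the JobManager REST API. */
public class InputQueueLengthPoller {

    public static String fetchMetric(String restBase, String jobId, String vertexId) throws Exception {
        // e.g. restBase = "http://localhost:8081"; jobId / vertexId are hypothetical placeholders.
        // Assumed metric id for subtask 0 of the reduce vertex:
        String metric = "0.Shuffle.Netty.Input.Buffers.inputQueueLength";
        URL url = new URL(restBase + "/jobs/" + jobId + "/vertices/" + vertexId
                + "/metrics?get=" + metric);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            return body.toString(); // expected to be a JSON array like [{"id": "...", "value": "..."}]
        } finally {
            conn.disconnect();
        }
    }
}
```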



Re: PreAggregate operator with timeout trigger

2019-11-05 Thread Piotr Nowojski
Yes you are right. Good to hear that you have solved your issue :)

Piotrek

> On 5 Nov 2019, at 09:56, Felipe Gutierrez  
> wrote:
> 
> Thanks Piotr,
> 
> the thing is that I am on Stream data and not on keyed stream data. So, I 
> cannot use the TimerService concept here. I am triggering a local window. I 
> ended up using java.util.Timer [1] and it seems to suffice my requirements.
> 
> [1] https://docs.oracle.com/javase/7/docs/api/java/util/Timer.html 
> 
> 
> Thanks!
> 
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com 
> 
> 
> On Wed, Oct 30, 2019 at 2:59 PM Piotr Nowojski  > wrote:
> Hi,
> 
> If you want to register a processing/event time trigger in your custom 
> operator, you can take a look how other operators are doing it, by calling
> AbstractStreamOperator#getInternalTimerService [1]. You can look around the 
> Flink’s code base for usages of this method, there are at least couple of 
> them (like CepOperator or IntervalJoinOperator).
> 
> Hope that helps,
> Piotrek
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html#getInternalTimerService-java.lang.String-org.apache.flink.api.common.typeutils.TypeSerializer-org.apache.flink.streaming.api.operators.Triggerable-
>  
> 
> 
>> On 28 Oct 2019, at 10:09, Felipe Gutierrez > > wrote:
>> 
>> Hi all,
>> 
>> I have my own stream operator which trigger an aggregation based on the 
>> number of items received 
>> (OneInputStreamOperator#processElement(StreamRecord)). However, it is 
>> possible to not trigger my aggregation if my operator does not receive the 
>> max items that have been set. So, I need a timeout trigger.
>> 
>> I am confused if I need to extend Trigger on 
>> MyPreAggregate-AbstractUdfStreamOperator or if I have to put a window as a 
>> parameter of the operator class MyPreAggregate-AbstractUdfStreamOperator> V, IN, OUT, W extends Window>. what is the best approach?
>> 
>> Thanks,
>> Felipe
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com 
>> 
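
For reference, a rough sketch of a non-keyed pre-aggregation operator that flushes either when a count threshold is reached or when a processing-time timeout fires, using the operator's ProcessingTimeService rather than java.util.Timer. Names are illustrative, the package of ProcessingTimeCallback may differ between Flink versions, and the buffer is not checkpointed here, so elements would be lost on failure:

```
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;

/** Buffers elements and emits them downstream either after maxCount elements or after timeoutMillis. */
public class PreAggregateOperator<T>
        extends AbstractStreamOperator<List<T>>
        implements OneInputStreamOperator<T, List<T>>, ProcessingTimeCallback {

    private final int maxCount;
    private final long timeoutMillis;
    private transient List<T> buffer;

    public PreAggregateOperator(int maxCount, long timeoutMillis) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public void open() throws Exception {
        super.open();
        buffer = new ArrayList<>();
        // schedule the first timeout flush
        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + timeoutMillis, this);
    }

    @Override
    public void processElement(StreamRecord<T> element) {
        buffer.add(element.getValue());
        if (buffer.size() >= maxCount) {
            flush();
        }
    }

    @Override
    public void onProcessingTime(long timestamp) {
        flush();
        // re-arm the timeout for the next interval
        getProcessingTimeService().registerTimer(timestamp + timeoutMillis, this);
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            output.collect(new StreamRecord<>(new ArrayList<>(buffer)));
            buffer.clear();
        }
    }
}
```

It could be wired in with something like stream.transform("pre-agg", listTypeInfo, new PreAggregateOperator<>(1000, 5000)), where listTypeInfo is the TypeInformation of the emitted list; that call site is also just a sketch.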



Re: PreAggregate operator with timeout trigger

2019-11-05 Thread Felipe Gutierrez
Ah, yep. I do create a keyed stream, but one that does not partition the data, and I
pre-aggregate key-value pairs inside my operator. But I cannot rely on the
number of keys to pre-aggregate, because I may never receive some specific
number of keys. So the primary criterion for pre-aggregating is time; on top of
that, I can aggregate earlier if I do reach a certain number of keys.

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


On Tue, Nov 5, 2019 at 10:29 AM Gyula Fóra  wrote:

> Hi!
> Sorry I should have given more context around what I was suggesting :)
> What I was suggesting is maybe you could make your non-keyed stream keyed
> by assigning random/deterministic keys with some logic.
>
> Gyula
>
>
> On Tue, Nov 5, 2019 at 10:13 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> @Gyula, I am afraid I haven't got your point.
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>>
>> On Tue, Nov 5, 2019 at 10:11 AM Gyula Fóra  wrote:
>>
>>> You might have to introduce some dummy keys for a more robust solution
>>> that integrates with the fault-tolerance mechanism.
>>>
>>> Gyula
>>>
>>> On Tue, Nov 5, 2019 at 9:57 AM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
 Thanks Piotr,

 the thing is that I am on Stream data and not on keyed stream data. So,
 I cannot use the TimerService concept here. I am triggering a local window.
 I ended up using java.util.Timer [1] and it seems to suffice my
 requirements.

 [1] https://docs.oracle.com/javase/7/docs/api/java/util/Timer.html

 Thanks!

 *--*
 *-- Felipe Gutierrez*

 *-- skype: felipe.o.gutierrez*
 *--* *https://felipeogutierrez.blogspot.com
 *


 On Wed, Oct 30, 2019 at 2:59 PM Piotr Nowojski 
 wrote:

> Hi,
>
> If you want to register a processing/event time trigger in your custom
> operator, you can take a look how other operators are doing it, by calling
> AbstractStreamOperator#getInternalTimerService [1]. You can look
> around the Flink’s code base for usages of this method, there are at least
> couple of them (like CepOperator or IntervalJoinOperator).
>
> Hope that helps,
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html#getInternalTimerService-java.lang.String-org.apache.flink.api.common.typeutils.TypeSerializer-org.apache.flink.streaming.api.operators.Triggerable-
>
> On 28 Oct 2019, at 10:09, Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
> Hi all,
>
> I have my own stream operator which trigger an aggregation based on
> the number of items received
> (OneInputStreamOperator#processElement(StreamRecord)). However, it is
> possible to not trigger my aggregation if my operator does not receive the
> max items that have been set. So, I need a timeout trigger.
>
> I am confused if I need to extend Trigger on
> MyPreAggregate-AbstractUdfStreamOperator or if I have to put a window as a
> parameter of the operator class 
> MyPreAggregate-AbstractUdfStreamOperator V, IN, OUT, W extends Window>. what is the best approach?
>
> Thanks,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>
>
>


Re: PreAggregate operator with timeout trigger

2019-11-05 Thread Felipe Gutierrez
@Gyula, I am afraid I haven't got your point.
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


On Tue, Nov 5, 2019 at 10:11 AM Gyula Fóra  wrote:

> You might have to introduce some dummy keys for a more robust solution
> that integrates with the fault-tolerance mechanism.
>
> Gyula
>
> On Tue, Nov 5, 2019 at 9:57 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Thanks Piotr,
>>
>> the thing is that I am on Stream data and not on keyed stream data. So, I
>> cannot use the TimerService concept here. I am triggering a local window. I
>> ended up using java.util.Timer [1] and it seems to suffice my requirements.
>>
>> [1] https://docs.oracle.com/javase/7/docs/api/java/util/Timer.html
>>
>> Thanks!
>>
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>>
>> On Wed, Oct 30, 2019 at 2:59 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> If you want to register a processing/event time trigger in your custom
>>> operator, you can take a look how other operators are doing it, by calling
>>> AbstractStreamOperator#getInternalTimerService [1]. You can look around
>>> the Flink’s code base for usages of this method, there are at least couple
>>> of them (like CepOperator or IntervalJoinOperator).
>>>
>>> Hope that helps,
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html#getInternalTimerService-java.lang.String-org.apache.flink.api.common.typeutils.TypeSerializer-org.apache.flink.streaming.api.operators.Triggerable-
>>>
>>> On 28 Oct 2019, at 10:09, Felipe Gutierrez 
>>> wrote:
>>>
>>> Hi all,
>>>
>>> I have my own stream operator which trigger an aggregation based on the
>>> number of items received
>>> (OneInputStreamOperator#processElement(StreamRecord)). However, it is
>>> possible to not trigger my aggregation if my operator does not receive the
>>> max items that have been set. So, I need a timeout trigger.
>>>
>>> I am confused if I need to extend Trigger on
>>> MyPreAggregate-AbstractUdfStreamOperator or if I have to put a window as a
>>> parameter of the operator class MyPreAggregate-AbstractUdfStreamOperator>> V, IN, OUT, W extends Window>. what is the best approach?
>>>
>>> Thanks,
>>> Felipe
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> *
>>>
>>>
>>>


Re: PreAggregate operator with timeout trigger

2019-11-05 Thread Gyula Fóra
You might have to introduce some dummy keys for a more robust solution that
integrates with the fault-tolerance mechanism.

Gyula

On Tue, Nov 5, 2019 at 9:57 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Thanks Piotr,
>
> the thing is that I am on Stream data and not on keyed stream data. So, I
> cannot use the TimerService concept here. I am triggering a local window. I
> ended up using java.util.Timer [1] and it seems to suffice my requirements.
>
> [1] https://docs.oracle.com/javase/7/docs/api/java/util/Timer.html
>
> Thanks!
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>
>
> On Wed, Oct 30, 2019 at 2:59 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> If you want to register a processing/event time trigger in your custom
>> operator, you can take a look how other operators are doing it, by calling
>> AbstractStreamOperator#getInternalTimerService [1]. You can look around
>> the Flink’s code base for usages of this method, there are at least couple
>> of them (like CepOperator or IntervalJoinOperator).
>>
>> Hope that helps,
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html#getInternalTimerService-java.lang.String-org.apache.flink.api.common.typeutils.TypeSerializer-org.apache.flink.streaming.api.operators.Triggerable-
>>
>> On 28 Oct 2019, at 10:09, Felipe Gutierrez 
>> wrote:
>>
>> Hi all,
>>
>> I have my own stream operator which trigger an aggregation based on the
>> number of items received
>> (OneInputStreamOperator#processElement(StreamRecord)). However, it is
>> possible to not trigger my aggregation if my operator does not receive the
>> max items that have been set. So, I need a timeout trigger.
>>
>> I am confused if I need to extend Trigger on
>> MyPreAggregate-AbstractUdfStreamOperator or if I have to put a window as a
>> parameter of the operator class MyPreAggregate-AbstractUdfStreamOperator> V, IN, OUT, W extends Window>. what is the best approach?
>>
>> Thanks,
>> Felipe
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>>
>>


How can I get the backpressure signals inside my function or operator?

2019-11-05 Thread Felipe Gutierrez
Hi all,

let's say that I have a "source -> map -> preAggregate -> keyBy -> reduce
-> sink" job and the reducer is sending backpressure signals to the
preAggregate, map and source operator. How do I get those signals inside my
operator's implementation?
I guess inside the function is not possible. But if I have my own operator
implemented (preAggregate) can I get those backpressure signals?

I want to read the metric "Shuffle.Netty.Input.Buffers.inputQueueLength"
[1] on my preAggregate operator in order to decide when I stop the
pre-aggregation and flush tuples or when I keep pre-aggregating. It is
something like the "credit based control on the network stack" [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service
[2] https://www.youtube.com/watch?v=AbqatHF3tZI

Thanks!
Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


Re: PreAggregate operator with timeout trigger

2019-11-05 Thread Felipe Gutierrez
Thanks Piotr,

the thing is that I am on Stream data and not on keyed stream data. So, I
cannot use the TimerService concept here. I am triggering a local window. I
ended up using java.util.Timer [1] and it seems to suffice my requirements.

[1] https://docs.oracle.com/javase/7/docs/api/java/util/Timer.html

Thanks!

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


On Wed, Oct 30, 2019 at 2:59 PM Piotr Nowojski  wrote:

> Hi,
>
> If you want to register a processing/event time trigger in your custom
> operator, you can take a look how other operators are doing it, by calling
> AbstractStreamOperator#getInternalTimerService [1]. You can look around
> the Flink’s code base for usages of this method, there are at least couple
> of them (like CepOperator or IntervalJoinOperator).
>
> Hope that helps,
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html#getInternalTimerService-java.lang.String-org.apache.flink.api.common.typeutils.TypeSerializer-org.apache.flink.streaming.api.operators.Triggerable-
>
> On 28 Oct 2019, at 10:09, Felipe Gutierrez 
> wrote:
>
> Hi all,
>
> I have my own stream operator which trigger an aggregation based on the
> number of items received
> (OneInputStreamOperator#processElement(StreamRecord)). However, it is
> possible to not trigger my aggregation if my operator does not receive the
> max items that have been set. So, I need a timeout trigger.
>
> I am confused if I need to extend Trigger on
> MyPreAggregate-AbstractUdfStreamOperator or if I have to put a window as a
> parameter of the operator class MyPreAggregate-AbstractUdfStreamOperator V, IN, OUT, W extends Window>. what is the best approach?
>
> Thanks,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>
>
>


Re: Flink job throws an exception when submitted to the cluster

2019-11-05 Thread Zhong venb
The pom file has been uploaded. As for the issue you mentioned, I have checked that the jar I built does include the Kafka-related packages, and I have also put the corresponding jar under Flink's lib directory.

-----Original Message-----
From: 赵 恒泰
Sent: November 5, 2019 16:35
To: user-zh@flink.apache.org
Subject: Re: Flink job throws an exception when submitted to the cluster

Hi, could you share the pom file of this Flink job? I guess you modified it directly from the official quickstart. If so, you need to activate the extra profile
add-dependencies-for-IDEA and remove the provided tag from the flink-connector-kafka-0.?_2.11 dependency, or, following that profile, add flink-connector-kafka-0.?_2.11 with compile scope. Only then will the dependency be packaged into the jar.

-----Original Message-----
From: Zhong venb
Sent: November 5, 2019 15:04
To: user-zh@flink.apache.org
Subject: Flink job throws an exception when submitted to the cluster

Hi,
I've run into a problem: a Flink job that consumes from Kafka compiles and runs fine in IDEA, but after being packaged and deployed to the cluster it fails at runtime. I have already put the corresponding jar under Flink's lib path, and submitting the job itself reports no error.
Could someone help analyze the cause? Thanks!

Environment:
Flink:1.7.2
Kafka:1.1.0
Scala:2.11.8

The error message is as follows:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
ClassLoader info: URL ClassLoader:
file: 
'/tmp/blobStore-0d69900e-5299-4be9-b3bc-060d06559034/job_e8fccb398c2d9de108051beb06ec64cc/blob_p-d1e0b6ace7b204eb42f56ce87b96bff39cc58289-d0b8e666fc70af746ebbd73ff8b38354'
 (valid JAR) Class not resolvable through given classloader.
 at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:104)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
 at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:348)
 at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
 at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
 at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
 at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
 at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
 at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
 at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
 at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
 at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
 at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)
 ... 4 more




pom.xml
Description: pom.xml


Re: Flink job throws an exception when submitted to the cluster

2019-11-05 Thread 赵 恒泰
Hi, could you share the pom file of this Flink job? I guess you modified it directly from the official quickstart. If so, you need to activate the extra profile
add-dependencies-for-IDEA and remove the provided tag from the flink-connector-kafka-0.?_2.11 dependency, or, following that profile, add flink-connector-kafka-0.?_2.11 with compile scope. Only then will the dependency be packaged into the jar.

-----Original Message-----
From: Zhong venb
Sent: November 5, 2019 15:04
To: user-zh@flink.apache.org
Subject: Flink job throws an exception when submitted to the cluster

Hi,
I've run into a problem: a Flink job that consumes from Kafka compiles and runs fine in IDEA, but after being packaged and deployed to the cluster it fails at runtime. I have already put the corresponding jar under Flink's lib path, and submitting the job itself reports no error.
Could someone help analyze the cause? Thanks!

Environment:
Flink:1.7.2
Kafka:1.1.0
Scala:2.11.8

The error message is as follows:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
ClassLoader info: URL ClassLoader:
file: 
'/tmp/blobStore-0d69900e-5299-4be9-b3bc-060d06559034/job_e8fccb398c2d9de108051beb06ec64cc/blob_p-d1e0b6ace7b204eb42f56ce87b96bff39cc58289-d0b8e666fc70af746ebbd73ff8b38354'
 (valid JAR) Class not resolvable through given classloader.
 at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:104)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
 at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:348)
 at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
 at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
 at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
 at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
 at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
 at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
 at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
 at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
 at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
 at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)
 ... 4 more