Re: flink checkpoint HDFS configuration question

2019-12-24 Thread 唐军亮
Don't write the NameNode address; write the nameservice.
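For reference, a minimal flink-conf.yaml sketch of this, assuming the HA nameservice defined in hdfs-site.xml (dfs.nameservices) is called "mycluster"; use your own nameservice name and make sure the Hadoop configuration is on Flink's classpath (for example via HADOOP_CONF_DIR) so the logical name can be resolved:

# flink-conf.yaml: point checkpoints at the logical nameservice, not a single NameNode
state.checkpoints.dir: hdfs://mycluster/flink/flink-checkpoints/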

-- Original message --
From: "LJY "

flink checkpoint HDFS configuration question

2019-12-24 Thread LJY
hi, everyone:
HDFS is currently configured for high availability; the NameNodes are nn1 (active) and nn2 (standby).
The flink-conf.yaml configuration is as follows:
state.checkpoints.dir: hdfs://<nn1 ip>:8020/flink/flink-checkpoints/
I then submit a job, wait for a checkpoint to be written, and kill the nn1 NameNode.
nn2 automatically becomes active and nn1 is no longer reachable.
The job then restarts endlessly.
   The log shows "failed on connection exception: java.net.ConnectException: Connection 
refused"; it cannot connect to nn1.
   How should HDFS high availability be configured for checkpoints, and what should state.checkpoints.dir be set to?
   




Problems implementing a two-phase-commit ETL from Kafka to MySQL

2019-12-24 Thread 卢伟楠

Project summary: an ETL that pulls data from Kafka in 10-second batches and sinks it to MySQL.


Environment:
Flink run mode: local
MySQL global variable wait_timeout = 28800
MySQL client mysql-connector-java version 5.1.42



Error:
org.apache.flink.streaming.runtime.tasks.TimerException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[na:1.8.0_191]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[na:1.8.0_191]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 ~[na:1.8.0_191]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 ~[na:1.8.0_191]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[na:1.8.0_191]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[na:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_191]
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at com.feiyu.help.AWF.apply(AWF.java:23) ~[classes/:na]
at com.feiyu.help.AWF.apply(AWF.java:14) ~[classes/:na]
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:44)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:32)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
 ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0]
... 7 common frames omitted
Caused by: java.sql.SQLException: Could not retrieve transaction read-only 
status from server
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:897) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:886) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:877) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:873) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3536) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3505) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
at 
com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1230)
 ~[mysql-connector-java-5.1.42.jar:5.1.42]
at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) 
~[mysql-connector-java-5.1.42.jar:5.1.42]
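A likely cause of "Could not retrieve transaction read-only status from server" is that the MySQL connection was silently dropped (for example by a timeout or the network) before the batch write. A rough sketch of revalidating the connection before writing, assuming a hand-managed JDBC connection; the URL, credentials and timeout below are placeholders, not the poster's code:

// Sketch only: reopen the connection if it is no longer alive before writing a batch.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

class MySqlConnectionHolder {
    private static final String URL = "jdbc:mysql://localhost:3306/etl";   // placeholder
    private Connection connection;

    Connection getValidConnection() throws SQLException {
        // isValid() pings the server; the argument is a timeout in seconds.
        if (connection == null || connection.isClosed() || !connection.isValid(5)) {
            connection = DriverManager.getConnection(URL, "user", "password");  // placeholder credentials
            connection.setAutoCommit(false);  // transactions are committed explicitly by the sink
        }
        return connection;
    }
}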
   

Re: using thin jar to replace fat jar on yarn cluster mode

2019-12-24 Thread Jingsong Li
Hi zjfplayer,

First, your user fat jar should not contain any Flink dependencies; they
should be "provided".
If your goal is to reduce the deployment time of the task, and you are
currently using session mode, you can consider putting the jars directly under
lib.

Best,
Jingsong Lee

On Mon, Dec 23, 2019 at 3:24 PM Rui Li  wrote:

> Hi,
>
> I think you can try specifying dependent jars with the -C option[1] when
> you submit the job, and see if that meets your needs.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#usage
>
> On Mon, Dec 23, 2019 at 10:09 AM zjfpla...@hotmail.com <
> zjfpla...@hotmail.com> wrote:
>
>> Hi,
>> Does flink on yarn support using thin jar to replace fat jar?
>> I don't want the jar of each flink task to have hundreds of MB. I
>> want to put all the dependent packages in a single directory,and then
>> the size of each flink task jar will be tens of KB.
>>
>> --
>> zjfpla...@hotmail.com
>>
>
>
> --
> Best regards!
> Rui Li
>


-- 
Best, Jingsong Lee
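As a concrete illustration of both suggestions, a hedged sketch; the artifact, version and paths below are placeholders, not from this thread:

<!-- pom.xml: keep Flink's own modules out of the fat jar -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>

# ship a shared dependency at submission time instead of bundling it
# (the URL given to -C must be reachable from all nodes)
flink run -C file:///opt/flink/shared-libs/my-dep.jar -c com.example.MyJob my-thin-job.jar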


Re: Apache Flink - Flink Metrics collection using Prometheus on EMR from streaming mode

2019-12-24 Thread vino yang
Hi Mans,

IMO, the mechanism of metrics reporter does not depend on any deployment
mode.

>> is there any Prometheus configuration or service discovery option
available that will dynamically pick up the metrics from the Filnk job and
task managers running in cluster ?

Can you share more information about your scenario?

>> I believe for a batch job I can configure flink config to use Prometheus
gateway configuration but I think this is not recommended for a streaming
job.

What does this mean? Why would the Prometheus gateway configuration used for a
Flink batch job not be recommended for a streaming job?

Best,
Vino
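For the pull-based setup, a minimal flink-conf.yaml sketch of the PrometheusReporter (as opposed to the PushGateway reporter); each JobManager and TaskManager then exposes its own scrape endpoint on the first free port in the configured range, and the range below is only an example:

# requires the flink-metrics-prometheus jar to be available to Flink (e.g. copied into lib/)
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260

Prometheus still has to discover those endpoints; on EMR/YARN this usually means some form of service discovery or a file_sd target list generated from the running containers, since the task managers can come up on any node.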

M Singh wrote on Tue, Dec 24, 2019 at 4:02 PM:

> Hi:
>
> I wanted to find out what's the best way of collecting Flink metrics using
> Prometheus in a streaming application on EMR/Hadoop.
>
> Since the Flink streaming jobs could be running on any node - is there any
> Prometheus configuration or service discovery option available that will
> dynamically pick up the metrics from the Flink job and task managers
> running in cluster ?
>
> I believe for a batch job I can configure flink config to use Prometheus
> gateway configuration but I think this is not recommended for a streaming
> job.
>
> Please let me know if you have any advice.
>
> Thanks
>
> Mans
>


Re: Window ProcessFunction data loss question

2019-12-24 Thread Dian Fu
The window operator drops late data; check whether that is the cause. If it is, you can adjust the watermark generation strategy to add a bit more delay. See BoundedOutOfOrdernessTimestampExtractor.
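A minimal sketch of that extractor, assuming the element type and the timestamp accessor (both placeholders), with a 30-second allowed out-of-orderness:

// org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
DataStream<MyEvent> withWatermarks = input
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(30)) {
            @Override
            public long extractTimestamp(MyEvent event) {
                return event.getEventTimeMillis();   // placeholder accessor
            }
        });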

> On Dec 25, 2019, at 10:39 AM, 1530130567 <1530130...@qq.com> wrote:
> 
> Hi experts:
> 
> Recently I have been processing data with a window plus a process function, with a watermark delay added.
> 
> When the input reaches a certain peak, data starts getting lost. Is this caused by the process function not being able to keep up?
> The exact figures are shown in the image below (if the image does not display, open this link):
> https://www.imageoss.com/image/sTn2U
> 
> I checked the metrics, and recordsIn is indeed greater than recordsOut.
> The code only uses a window followed by a process function; there is no filter operation at all.
> The code is as follows:
> .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(1)))
> .process(new ProcessWindowFunction<Row, Row, Integer, TimeWindow>() {
>     @Override
>     public void process(Integer integer, Context context, Iterable<Row> elements, Collector<Row> out) {
>         for (Row element : elements) {
>             out.collect(element);
>         }
>     }
> })



Re: Flink task node shut itself off.

2019-12-24 Thread Zhijiang
If you use the RocksDB state backend, it might consume extra native memory. 
Some resource-framework clusters like YARN will kill the container if its 
memory usage exceeds a threshold. You can also double check whether that 
applies in your case.


--
From:John Smith 
Send Time:2019 Dec. 25 (Wed.) 03:40
To:Zhijiang 
Cc:user 
Subject:Re: Flink task node shut itself off.

The shutdown happened after the massive IO wait. I don't use any state. 
Checkpoints are disk based...
On Mon., Dec. 23, 2019, 1:42 a.m. Zhijiang,  wrote:

Hi John,

Thanks for the positive comments on Flink usage. Whether you use at-least-once or 
exactly-once for checkpointing, it should never lose a message during 
failure recovery.

Unfortunately I cannot view the logs you posted. Generally speaking, a 
longer checkpoint interval means replaying more source data after failure 
recovery.
In my experience the 5-second checkpoint interval is too frequent, 
and you might increase it to 1 minute or so. You can also monitor 
how long a checkpoint takes to finish in your application, and then adjust 
the interval accordingly.
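A sketch of the corresponding settings, with example values in the spirit of the advice above:

// Checkpoint roughly once a minute instead of every 5 seconds, and leave a pause between checkpoints.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);                                  // 60 s interval
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);  // at least 30 s between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(600_000);          // fail a checkpoint that takes longer than 10 min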

Concerning the node shutdown you mentioned, I am not quite sure whether it 
is related to your short checkpoint interval. Are you configured to use the heap state 
backend?  The hs_err file really indicates that your job encountered a 
memory issue, so it is better to somehow increase your task manager memory. 
But if you can analyze the hs_err dump file with a profiler tool to check 
the memory usage, that might be more helpful for finding the root cause.

Best,
Zhijiang 

--
From:John Smith 
Send Time:2019 Dec. 21 (Sat.) 05:26
To:user 
Subject:Flink task node shut itself off.

Hi, using Flink 1.8.0

1st off I must say Flink resiliency is very impressive, we lost a node and 
never lost one message by using checkpoints and Kafka. Thanks!

The cluster is a self hosted cluster and we use our own zookeeper cluster. We 
have...
3 zookeepers: 4 cpu, 8GB (each)
3 job nodes: 4 cpu, 8GB (each)
3 task nodes: 4 cpu, 8GB (each)
The nodes also share GlusterFS for storing savepoints and checkpoints, 
GlusterFS is running on the same machines.

Yesterday a node shut itself off with the following log messages...
- Stopping TaskExecutor 
akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0.
- Stop job leader service.
- Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
- Shutting down TaskExecutorLocalStateStoresManager.
- Shutting down BLOB cache
- Shutting down BLOB cache
- removed file cache directory 
/tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000
- I/O manager removed spill file directory 
/tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
- Shutting down the network environment and its components.

Prior to the node shutting off we noticed massive IOWAIT of 140% and a 1-minute CPU load 
of 15. We also got an hs_err file which says we should increase the 
memory.

I'm attaching the logs here: 
https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0

I wonder if my 5 second checkpointing is too much for gluster.

Any thoughts?








Re: Effect of -yn and -ys on Flink 1.9 batch jobs

2019-12-24 Thread Xintong Song
Hi faaron,
In Flink 1.9 the -yn parameter no longer takes effect, and it has been removed in later versions.
Based on your parameters, with each TM's memory unchanged at 30 GB, the number of slots per TM
(-ys) went from 5 to 10, which means the average memory per slot was cut in half.
Flink 1.9's SQL batch operators have fixed requirements on Flink managed memory, so it is very
likely that this change left a single slot's managed memory unable to satisfy the operators' resource needs.
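As a rough illustration of the per-slot arithmetic (ignoring JVM overhead and the exact managed-memory fraction, which are left out here as simplifications):

-ytm 30g, -ys 5   ->  30 GB / 5 slots  = about 6 GB per slot
-ytm 30g, -ys 10  ->  30 GB / 10 slots = about 3 GB per slot

With the same per-TaskManager memory, doubling the slots halves what each slot (and therefore each hash-join instance) can draw from managed memory.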

Thank you~

Xintong Song



On Wed, Dec 25, 2019 at 11:09 AM faaron zheng  wrote:

> Running TPC-DS query1: flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g
> -ytm 30g runs fine, but flink run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm
> 60g -ytm 30g fails during the hash join with the error "No pooled slot available and request to
> ResourceManager for new slot failed". I don't understand what the connection is; please advise. faaron zheng
> Email: faaronzh...@gmail.com


Effect of -yn and -ys on Flink 1.9 batch jobs

2019-12-24 Thread faaron zheng
Running TPC-DS query1: flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g -ytm 
30g runs fine, but flink run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm 60g -ytm 
30g fails during the hash join with the error "No pooled slot available and request to 
ResourceManager for new slot failed". I don't understand what the connection is; please advise. faaron zheng 
Email: faaronzh...@gmail.com

Re: Question about whether flink windows are closed correctly

2019-12-24 Thread Jary Zhen
Processing data with event-time watermarks usually runs into these two situations:
1. Data that is badly out of order and arrives very late gets dropped; see the Side Output API [1]
(a minimal sketch follows after the link below).
2. Data whose event time is ahead of the current system time, which I call "supernatural data". For example, the
current system time is 10:37:55 but an event's event time may be 10:38:55; once such data arrives it makes the
window fire early, so normal data is treated as late and dropped. The way to handle this is to filter such records
out before the watermark is assigned.
3. Suggestion: if it is a simple ETL, try not to process the data with event time.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html
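A minimal sketch of point 1, capturing the records that would otherwise be dropped as late via a side output; the element type, key selector and window/process function names are placeholders:

final OutputTag<MyEvent> lateTag = new OutputTag<MyEvent>("late-events") {};

SingleOutputStreamOperator<MyResult> main = events
    .keyBy(e -> e.getKey())                                   // placeholder key selector
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .allowedLateness(Time.seconds(10))
    .sideOutputLateData(lateTag)
    .process(new MyProcessWindowFunction());                  // placeholder window function

DataStream<MyEvent> late = main.getSideOutput(lateTag);
late.print();   // or route to a separate sink to inspect what is being dropped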

On Wed, 25 Dec 2019 at 09:57, 1530130567 <1530130...@qq.com> wrote:

> Hi:
> Yesterday I looked at the metrics, and recordsIn is indeed greater than recordsOut.
> The code only uses a window followed by a process function; there is no filter operation at all.
> The code is as follows:
>
> .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(1)))
> .process(new ProcessWindowFunction<Row, Row, Integer, TimeWindow>() {
>     @Override
>     public void process(Integer integer, Context context, Iterable<Row> elements, Collector<Row> out) {
>         for (Row element : elements) {
>             out.collect(element);
>         }
>     }
> })
>
>
>
>
> -- Original message --
> From: "jingjing bai" Sent: Tuesday, December 24, 2019, 9:18 PM
> To: "user-zh"
> Subject: Re: Question about whether flink windows are closed correctly
>
>
>
> The window will not close early. Please check the metrics to see whether any data is being dropped.
>
>
> 1530130567 <1530130...@qq.com> wrote on Tue, Dec 24, 2019 at 8:46 PM:
>
>  Hi experts:
>  Recently I have been using the flink stream API to process data; the logic is a very simple ETL.
>
> I defined a 1-minute tumbling window with a 10-second watermark. During peak traffic, data is lost downstream.
>  For example: my upstream topic does 5000/s, but the downstream topic that receives the data only gets 4000/s.
>
> During off-peak hours the problem does not occur, and it also disappears if I remove the window. Is the window
> being closed early, before my downstream process function has finished processing?
>  PS: increasing the parallelism did not help.


Window ProcessFunction data loss question

2019-12-24 Thread 1530130567
Hi experts:


Recently I have been processing data with a window plus a process function, with a watermark delay added.


When the input reaches a certain peak, data starts getting lost. Is this caused by the process function not being able to keep up?
The exact figures are shown in the image below (if the image does not display, open this link):
https://www.imageoss.com/image/sTn2U

I checked the metrics, and recordsIn is indeed greater than recordsOut.
The code only uses a window followed by a process function; there is no filter operation at all.
The code is as follows:
.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(1)))
.process(new ProcessWindowFunction<Row, Row, Integer, TimeWindow>() {
    @Override
    public void process(Integer integer, Context context, Iterable<Row> elements, Collector<Row> out) {
        for (Row element : elements) {
            out.collect(element);
        }
    }
})

Re: The assigned slot bae00218c818157649eb9e3c533b86af_11 was removed

2019-12-24 Thread Xintong Song
This is probably not the root cause. A slot being removed is usually caused by the TaskManager going down; you need to find the corresponding TM log and check why it went down.

Thank you~

Xintong Song



On Tue, Dec 24, 2019 at 10:06 PM hiliuxg <736742...@qq.com> wrote:

> Occasionally an already-assigned slot is suddenly removed, causing the job to restart, and I cannot tell what caused it. There is no CPU spike or full GC. The exception is as follows:
>
> org.apache.flink.util.FlinkException: The assigned slot
> bae00218c818157649eb9e3c533b86af_11 was removed.
> at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
> at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
> at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
> at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
> at
> org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:847)
> at
> org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1161)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at
> akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at
> akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at
> akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at
> akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Re: Question about whether flink windows are closed correctly

2019-12-24 Thread 1530130567
Hi:
Yesterday I looked at the metrics, and recordsIn is indeed greater than recordsOut.
The code only uses a window followed by a process function; there is no filter operation at all.
The code is as follows:
.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(1)))
.process(new ProcessWindowFunction<Row, Row, Integer, TimeWindow>() { ... })

Re: Flink task node shut itself off.

2019-12-24 Thread John Smith
The shutdown happened after the massive IO wait. I don't use any state.
Checkpoints are disk based...

On Mon., Dec. 23, 2019, 1:42 a.m. Zhijiang, 
wrote:

> Hi John,
>
> Thanks for the positive comments on Flink usage. Whether you use at-least-once
> or exactly-once for checkpointing, it should never lose a message
> during failure recovery.
>
> Unfortunately I cannot view the logs you posted. Generally speaking, a
> longer checkpoint interval means replaying more source data after
> failure recovery.
> In my experience the 5-second checkpoint interval is too frequent,
> and you might increase it to 1 minute or so. You can also
> monitor how long a checkpoint takes to finish in your application, and then
> adjust the interval accordingly.
>
> Concerning the node shutdown you mentioned, I am not quite sure whether
> it is related to your short checkpoint interval. Are you configured to use the heap
> state backend?  The hs_err file really indicates that your job
> encountered a memory issue, so it is better to somehow increase your
> task manager memory. But if you can analyze the hs_err dump file with a
> profiler tool to check the memory usage, that might be more helpful for
> finding the root cause.
>
> Best,
> Zhijiang
>
> --
> From:John Smith 
> Send Time:2019 Dec. 21 (Sat.) 05:26
> To:user 
> Subject:Flink task node shut itself off.
>
> Hi, using Flink 1.8.0
>
> 1st off I must say Flink resiliency is very impressive, we lost a node and
> never lost one message by using checkpoints and Kafka. Thanks!
>
> The cluster is a self hosted cluster and we use our own zookeeper cluster.
> We have...
> 3 zookeepers: 4 cpu, 8GB (each)
> 3 job nodes: 4 cpu, 8GB (each)
> 3 task nodes: 4 cpu, 8GB (each)
> The nodes also share GlusterFS for storing savepoints and checkpoints,
> GlusterFS is running on the same machines.
>
> Yesterday a node shut itself off with the following log messages...
> - Stopping TaskExecutor akka.tcp://fl...@xxx.xxx.xxx.73
> :34697/user/taskmanager_0.
> - Stop job leader service.
> - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> - Shutting down TaskExecutorLocalStateStoresManager.
> - Shutting down BLOB cache
> - Shutting down BLOB cache
> - removed file cache directory
> /tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000
> - I/O manager removed spill file directory
> /tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
> - Shutting down the network environment and its components.
>
> Prior to the node shutting off we noticed massive IOWAIT of 140% and a 1-minute
> CPU load of 15. We also got an hs_err file which says we should
> increase the memory.
>
> I'm attaching the logs here:
> https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0
>
> I wonder if my 5 second checkpointing is too much for gluster.
>
> Any thoughts?
>
>
>
>
>
>


The assigned slot bae00218c818157649eb9e3c533b86af_11 was removed

2019-12-24 Thread hiliuxg
Occasionally an already-assigned slot is suddenly removed, causing the job to restart, and I cannot tell what caused 
it. There is no CPU spike or full GC. The exception is as follows:

org.apache.flink.util.FlinkException: The assigned slot 
bae00218c818157649eb9e3c533b86af_11 was removed.
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:847)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1161)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at 
akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at 
akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

2019-12-24 Thread jingjing bai
Typically, *NoClassDefFoundError* is caused by jar conflicts; it
means there are two copies of the same class, from different versions of a jar, on your
classpath.
I suggest you check which jar this class comes from, and then confirm
whether there is more than one such jar on your classpath. If so, remove the unused jar.

I hope this can help you.

best
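For this particular class, org.apache.kafka.common.serialization.ByteArrayDeserializer comes from kafka-clients, which is normally pulled in transitively by the Flink Kafka connector. A hedged sketch of what the job's pom.xml could declare for Flink 1.7.2 with FlinkKafkaConsumer09; the exact connector artifact depends on the broker/client version in use, so treat this as an example rather than the confirmed fix for this thread:

<!-- bundle the Kafka connector (and transitively kafka-clients) into the job jar -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>1.7.2</version>
</dependency>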


Re: CEP matching out-of-order data

2019-12-24 Thread jingjing bai
In CEP SQL with ORDER BY, out-of-order data will not cause the match to fail.
I haven't used this through the DataStream API; you could check whether there is a corresponding API.
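In the DataStream API, when the CEP input runs on event time the operator buffers elements and processes them in timestamp order once the watermark passes, so moderately out-of-order rides can still match. A rough sketch, where the 10-second out-of-orderness bound and the TaxiRide timestamp accessor are assumptions:

DataStream<TaxiRide> rides = input
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<TaxiRide>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(TaxiRide ride) {
                return ride.getEventTime();   // assumption: event-time accessor on TaxiRide
            }
        });

PatternStream<TaxiRide> matches =
    CEP.pattern(rides.keyBy("rideId"), completedRides.within(Time.hours(2)));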

qishang zhong wrote on Mon, Dec 23, 2019 at 9:37 PM:

> HI, everyone.
>
> I have a question about the flink-training-exercises project:
> com.ververica.flinktraining.solutions.datastream_java.cep.LongRidesSolution
>
> Pattern<TaxiRide, TaxiRide> completedRides =
> Pattern.<TaxiRide>begin("start")
> .where(new SimpleCondition<TaxiRide>() {
> @Override
> public boolean filter(TaxiRide ride) throws Exception {
> return ride.isStart;
> }
> })
> .next("end")
> .where(new SimpleCondition<TaxiRide>() {
> @Override
> public boolean filter(TaxiRide ride) throws Exception {
> return !ride.isStart;
> }
> });
>
> I now have a similar monitoring scenario that also needs to output unmatched data after a timeout, but the stream data may arrive out of order.
> Does that mean the Pattern in the example can no longer be matched?
> If I want the out-of-order data to also be matched, instead of being emitted as timed-out output, is there a corresponding solution?
>


Question about whether flink windows are closed correctly

2019-12-24 Thread 1530130567
Hi experts:
  Recently I have been using the flink stream API to process data; the logic is a very simple ETL.
  I defined a 1-minute tumbling window with a 10-second watermark. During peak traffic, data is lost downstream.
  For example: my upstream topic does 5000/s, but the downstream topic that receives the data only gets 4000/s.
  During off-peak hours the problem does not occur, and it also disappears if I remove the window. Is the window being
closed early, before my downstream process function has finished processing?
  PS: increasing the parallelism did not help.

Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

2019-12-24 Thread Congxian Qiu
Hi

I encountered the same problem before and found that the `NoClassDefFoundError`
was not the root cause; maybe you can check the JM/TM logs to see if this is
your case.

Best,
Congxian


syed wrote on Tue, Dec 24, 2019 at 7:30 PM:

> Hi; I am trying to run a simple wordcount application using the kafka
> consumer so that the data sources remain available and I can trigger
> checkpoints. When deploying the application over a flink 1.7.2 cluster, I am
> facing the *java.lang.NoClassDefFoundError:
> org/apache/kafka/common/serialization/ByteArrayDeserializer* error. The
> complete trace of error is as follows; java.lang.NoClassDefFoundError:
> org/apache/kafka/common/serialization/ByteArrayDeserializer at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.setDeserializer(FlinkKafkaConsumer09.java:299)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.(FlinkKafkaConsumer09.java:218)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.(FlinkKafkaConsumer09.java:156)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.(FlinkKafkaConsumer09.java:140)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.(FlinkKafkaConsumer09.java:107)at
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:72)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498) at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.kafka.common.serialization.ByteArrayDeserializer at
> java.net.URLClassLoader.findClass(URLClassLoader.java:382) at
> java.lang.ClassLoader.loadClass(ClassLoader.java:418) at
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) at
> java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 20 more I am
> using kafka_2.11-2.3.0, and flink 1.7.2. The application code compiles and
> jar is created, but the error occurs when running the jar over flink
> cluster. The code snippet of word count for kafka sources is as follows;
> DataStream text; if (params.has("topic") &&
> params.has("bootstrap.servers") && params.has("zookeeper.connect") &&
> params.has("group.id")){ text = env.addSource(new FlinkKafkaConsumer08(
> params.get("topic"), new SimpleStringSchema(), params.getProperties()
> ).setStartFromEarliest()); } else { System.out.println("Executing WordCount
> example with default input data set."); System.out.println("Use --topic 
> --bootstrap.servers
> " +"--zookeeper.connect --group.id specify the topic info."); text =
> env.fromElements(WordCountData.WORDS); Please guide me how to fix this
> error. I also tried to use FlinkKafkaConsumer09, but not successful. Thank
> you. Looking forward; Syed
> --
> Sent from the Apache Flink User Mailing List archive. mailing list archive
>  at
> Nabble.com.
>


java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

2019-12-24 Thread syed
Hi; I am trying to run a simple wordcount application using the kafka consumer so
that the data sources remain available and I can trigger checkpoints. When
deploying the application over a flink 1.7.2 cluster, I am facing the
*java.lang.NoClassDefFoundError:
org/apache/kafka/common/serialization/ByteArrayDeserializer* error. The
complete trace of error is as follows: java.lang.NoClassDefFoundError:
org/apache/kafka/common/serialization/ByteArrayDeserializer at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.setDeserializer(FlinkKafkaConsumer09.java:299)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.(FlinkKafkaConsumer09.java:218)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.(FlinkKafkaConsumer09.java:156)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.(FlinkKafkaConsumer09.java:140)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.(FlinkKafkaConsumer09.java:107)at
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:72)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at
java.lang.reflect.Method.invoke(Method.java:498) at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)Caused
by: java.lang.ClassNotFoundException:
org.apache.kafka.common.serialization.ByteArrayDeserializerat
java.net.URLClassLoader.findClass(URLClassLoader.java:382)at
java.lang.ClassLoader.loadClass(ClassLoader.java:418)at
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)at
java.lang.ClassLoader.loadClass(ClassLoader.java:351)   ... 20 more

I am using kafka_2.11-2.3.0, and flink 1.7.2. The application code compiles and the jar is
created, but the error occurs when running the jar over the flink cluster. The
code snippet of word count for kafka sources is as follows:

DataStream<String> text;
if (params.has("topic") && params.has("bootstrap.servers")
        && params.has("zookeeper.connect") && params.has("group.id")) {
    text = env.addSource(new FlinkKafkaConsumer08<>(
            params.get("topic"),
            new SimpleStringSchema(),
            params.getProperties()).setStartFromEarliest());
} else {
    System.out.println("Executing WordCount example with default input data set.");
    System.out.println("Use --topic --bootstrap.servers "
            + "--zookeeper.connect --group.id to specify the topic info.");
    text = env.fromElements(WordCountData.WORDS);
}

Please guide me how to fix this error. I also tried to use FlinkKafkaConsumer09, but was not successful.
Thank you. Looking forward;
Syed



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Get consumer group offset

2019-12-24 Thread qq
Hi all,

   I use Kafka 0.10.0 and Flink 1.9.0. Why can't I see the consumer group I configured for the Flink Kafka 0.10 
consumer? When I use a plain KafkaConsumer (not through Flink) to consume the same topic, I can get the 
consumer group metadata. Thanks.

Kafka/bin/kafka-run-class kafka.admin.ConsumerGroupCommand --bootstrap-server 
t4:9092,t5:9092,t6:9092 --new-consumer --list|grep consumer_group_1



  AlexFu
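One thing worth noting, as an assumption about the setup rather than a confirmed diagnosis: the Flink Kafka consumer does not use Kafka's group management for partition assignment, and it only commits offsets back to Kafka either via Kafka's auto-commit or when a Flink checkpoint completes, so the group may not be visible to consumer-group tooling until offsets have actually been committed. A minimal sketch, with the topic name as a placeholder:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "t4:9092,t5:9092,t6:9092");
props.setProperty("group.id", "consumer_group_1");

FlinkKafkaConsumer010<String> source =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);   // topic is a placeholder
source.setCommitOffsetsOnCheckpoints(true);   // commit offsets to Kafka when a checkpoint completes (the default)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
env.addSource(source).print();
env.execute("offset-commit-sketch");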

Re: Query on state of ValueState getting lost

2019-12-24 Thread Congxian Qiu
Hi

Could you please check whether the key and namespace are the same? When
getting a result from the value state, Flink will use the current key and
current namespace to retrieve the result.

Best,
Congxian
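To illustrate the namespace point for state used inside a custom Trigger: state obtained through the TriggerContext is scoped to the current key and the current window (the namespace), so a value written while processing one window is not visible from another window's onEventTime call. A minimal sketch, with types and names as placeholders:

// Inside a custom Trigger<Object, TimeWindow>
private final ValueStateDescriptor<Long> countDesc =
        new ValueStateDescriptor<>("count", Long.class);

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    ValueState<Long> count = ctx.getPartitionedState(countDesc);   // scoped to key + window
    Long current = count.value();
    count.update(current == null ? 1L : current + 1L);
    return TriggerResult.CONTINUE;
}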


Parth Sarathy wrote on Sun, Dec 22, 2019 at 9:54 PM:

> Hi,
>
> I am using flink 1.8.2, event time stream for sliding window transformation
> with custom trigger. Window size is 15 seconds, slide size is also 15
> seconds with 30 seconds allowed lateness. In the trigger implementation
> couple of class level ValueStateDescriptors are used, the corresponding
> ValueState variables are updated/initialized in the onElement method and
> these are used in the onEventTime method. On sending two records 90 seconds
> apart, with the current time set in the timestamp field of the messages, the
> onEventTime method was called when the second record arrived, for the window
> with the first record, and in this method the values in the corresponding
> ValueState variables were found to be null. When the two records were sent
> with time difference of 75 seconds the values in the ValueStates were
> present.
>
> What could be the reason for the values in ValueStates becoming null when
> time difference between the messages was around 90 seconds or higher? I
> checked that the TTL in the default StateTtlConfig is very high, so I am
> not
> sure why the states expired.
>
> Thanks,
> Parth Sarathy
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Apache Flink - Flink Metrics collection using Prometheus on EMR from streaming mode

2019-12-24 Thread M Singh
Hi:
I wanted to find out what's the best way of collecting Flink metrics using 
Prometheus in a streaming application on EMR/Hadoop.
Since the Flink streaming jobs could be running on any node - is there any 
Prometheus configuration or service discovery option available that will 
dynamically pick up the metrics from the Flink job and task managers running in 
cluster ?  
I believe for a batch job I can configure flink config to use Prometheus 
gateway configuration but I think this is not recommended for a streaming job.
Please let me know if you have any advice.
Thanks
Mans