Hi guys,
I have another question related to the KPL problem: what are the consequences
of overwhelming the KPL (Kinesis Producer Library) internal queue?
From my experiments with 1.4.2 (which does not yet have the backpressure
support from the open PR mentioned below), when the Flink cluster processes
records faster than the sink Kinesis stream can accept them, i.e., the
throughput-exceeded exception starts to be thrown, we quite often get the
following exception (pasted at the end) very soon afterwards, and all the
subtasks switch to cancelling status and are restarted.
From the exception trace, I can see that YARN was shut down and all task
managers were terminated. I suspect a memory issue. Whenever the
throughput-exceeded exception is thrown, it implies that the internal unbounded
queue in the KPL may be growing rapidly. We set recordTtl = 60s, and we still
see the record expiration exception along with the throughput-exceeded
exception. This leads me to wonder whether the internal unbounded queue grows
too large, exhausts all the memory on the node, and eventually crashes YARN and
the job manager.
Well, this is just my hypothesis. I wonder if someone has already encountered
or investigated similar issues and could shed some light on it.
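A back-of-envelope sketch of this hypothesis, in Java. Everything here is an
assumption for illustration: the class and method names are hypothetical, the
rates and record size are made-up inputs rather than measurements, and the
bound only holds if expired records are actually freed once recordTtl elapses.
The idea is that any record still sitting in the queue must have arrived within
the last recordTtl seconds, so the backlog per producer is at most
(incoming rate) x (recordTtl) x (average record size).

```java
// Hypothetical helper; not part of Flink or the KPL.
class BacklogEstimate {
    // Upper bound on bytes held by unexpired records: every record still
    // queued must have arrived within the last ttlSeconds.
    static long maxBacklogBytes(long inRecordsPerSec, long ttlSeconds, long avgRecordBytes) {
        return inRecordsPerSec * ttlSeconds * avgRecordBytes;
    }

    // Net growth rate of the queue while the sink is throttled,
    // before any record has had time to expire.
    static long growthBytesPerSec(long inRecordsPerSec, long outRecordsPerSec, long avgRecordBytes) {
        return Math.max(0, inRecordsPerSec - outRecordsPerSec) * avgRecordBytes;
    }

    public static void main(String[] args) {
        // Made-up inputs: 50k records/s in, 10k records/s accepted, 1 KiB records.
        long in = 50_000, out = 10_000, ttl = 60, size = 1_024;
        System.out.println(growthBytesPerSec(in, out, size)); // 40960000 bytes/s
        System.out.println(maxBacklogBytes(in, ttl, size));   // 3072000000 bytes
    }
}
```

With these made-up numbers the queue could hold up to roughly 3 GB per producer
even with a 60s TTL, which would be enough to push a container past its memory
limit.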
java.lang.Exception: TaskManager was lost/killed: container_1529095945616_0009_01_000004 @ ip-172-31-64-249.ec2.internal (dataPort=44591)
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:426)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
From: "Liu, Gavin (CAI - Atlanta)" <[email protected]>
Date: Wednesday, June 20, 2018 at 12:11 PM
To: "Tzu-Li (Gordon) Tai" <[email protected]>, "[email protected]"
<[email protected]>
Subject: Re: Backpressure from producer with flink connector kinesis 1.4.2
Thanks, Gordon. That was quick, and it is very helpful to me.
I tried some other alternatives to resolve this and had finally started
thinking about rewriting the FlinkKinesisProducer class for our needs. Glad
that I asked before I started.
Really appreciate the quick response.
From: "Tzu-Li (Gordon) Tai" <[email protected]>
Date: Wednesday, June 20, 2018 at 12:05 PM
To: "Liu, Gavin (CAI - Atlanta)" <[email protected]>,
"[email protected]" <[email protected]>
Subject: Re: Backpressure from producer with flink connector kinesis 1.4.2
Hi Gavin,
The problem is that the Kinesis producer currently does not propagate
backpressure properly.
Records are added to the queue of the internally used KPL client without any
queue size limit.
This is considered a bug, and there is already a pull request for it [1], which
we should probably push towards being merged soon.
What the pull request essentially does is add an upper bound on the number of
pending records in the KPL producer queue.
Once the upper bound is hit, input to the Kinesis producer sink is blocked,
which propagates backpressure further upstream.
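A minimal sketch of that idea in plain Java, for illustration only. This is not
the code from the pull request and not Flink or KPL API; the class and method
names are hypothetical. It uses a semaphore to cap the number of pending
records: the sending thread blocks once the cap is reached, and that stall is
what upstream operators would see as backpressure.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: none of these names come from Flink or the KPL.
class BoundedPendingRecords {
    private final Semaphore permits;
    private final int limit;

    BoundedPendingRecords(int limit) {
        this.limit = limit;
        this.permits = new Semaphore(limit);
    }

    // Called by the sink for every incoming record. Blocks while `limit`
    // records are already pending, which stalls the calling thread.
    void beforeSend() throws InterruptedException {
        permits.acquire();
    }

    // Timed variant, used here only to show when the limit has been hit.
    boolean tryBeforeSend(long timeoutMillis) throws InterruptedException {
        return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Called when a pending record completes (success or failure).
    void onComplete() {
        permits.release();
    }

    // Number of records currently counted as pending.
    int pending() {
        return limit - permits.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedPendingRecords q = new BoundedPendingRecords(2);
        q.beforeSend();
        q.beforeSend();
        System.out.println(q.tryBeforeSend(10)); // limit reached: a sender would block
        q.onComplete();
        System.out.println(q.tryBeforeSend(10)); // a slot was freed: the sender proceeds
    }
}
```

In a real sink the completion callback would call onComplete(), so the sender
is released only as fast as the downstream stream accepts records.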
Cheers,
Gordon
[1] https://github.com/apache/flink/pull/6021
On 20 June 2018 at 6:00:30 PM, Liu, Gavin (CAI - Atlanta)
([email protected]) wrote:
Hi guys,
I am new to the Flink framework. We are building an application that uses
Kinesis streams as both the Flink source and the sink.
The Flink version we are using is 1.4.2, which is also the version of
flink-connector-kinesis. We built the flink-connector-kinesis jar explicitly
with KPL version 0.12.6 because of existing problems with the default 0.12.5.
I got a rough idea of how backpressure works in Flink from reading
http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%[email protected]%3E
From my experiments with Flink and flink-connector-kinesis, backpressure
only happens within the Flink processing operations, i.e., not in the Flink
producer to Kinesis.
More specifically, when the throughput from the KPL exceeds the Kinesis
throughput limits, Flink does not slow down at all, i.e., it does not put
pressure on the processing chain back up to the Flink consumer.
Correct me if I have misunderstood this. It looks like the Flink producer (in
flink-connector-kinesis) is a standalone component: once a record is collected
and sent to the producer, Flink core has finished all of its processing and no
longer cares about the fate of the record; it is the responsibility of the
connector to finish the job.
I am expecting backpressure to apply across the whole pipeline, from the source
Kinesis stream to the sink Kinesis stream: whenever the sink Kinesis stream
cannot handle the volume, it applies backpressure. Could someone explain a bit
more why the Flink connector is designed this way? Also, correct me if I stated
anything wrong.
Gavin Liu