Just a heads-up that the underlying issue has been fixed for Beam 2.10.0.
I've shared this information earlier with Encho and Valeri when the
corresponding ticket was resolved (Dec 30):
https://jira.apache.org/jira/browse/BEAM-5386
Thanks,
Max
On 19.09.18 14:42, Valeri Tsolov wrote:
Hey Max,
I think it is possible, but I am not sure when we will be able to plan such activity.
Will get back to you ASAP.
Thanks,
Valeri
On Mon, 17.09.2018 at 19:27, Maximilian Michels <[email protected]> wrote:
I wonder if you could do some profiling on the TaskManagers and see
where they spend most of their time? That would be very helpful. If it
is indeed `finalizeCheckpoint`, then we could introduce asynchronous
acknowledgement. If it is in `snapshotState`, then we know that the
bottleneck is there.
Do you think profiling on the TaskManagers would be feasible?
Another question: Did you activate asynchronous snapshots?
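
For reference, with the plain Flink APIs the asynchronous flag is the second
constructor argument of the FsStateBackend; a minimal sketch, with a
placeholder checkpoint path:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class AsyncSnapshotSetup {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 'true' switches heap-based keyed state to asynchronous snapshots.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(60_000); // 60s interval, matching your setup
        // ... build and execute the job ...
      }
    }

With the Beam Flink runner the backend is passed through the pipeline options
rather than the environment (see the state backend discussion further down).
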
Thanks,
Max
On 17.09.18 17:15, Encho Mishinev wrote:
> Hi Max,
>
> I agree that the problem might not be in the acknowledgement itself. A
> very long checkpoint could go past the subscription acknowledgement
> deadline (10 min is the maximum allowed) and hence the messages might be
> resent, yielding the behaviour we see.
>
> In any case, the extreme slowdown of checkpoints still remains
> unexplained. This occurs even if the job simply reads from Pubsub and
> does nothing else.
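>
> The repro is essentially just a read with no further transforms, along these
> lines (a rough sketch; the project and subscription names are placeholders):
>
>     import org.apache.beam.runners.flink.FlinkPipelineOptions;
>     import org.apache.beam.runners.flink.FlinkRunner;
>     import org.apache.beam.sdk.Pipeline;
>     import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
>     import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
>     public class ReadOnlyRepro {
>       public static void main(String[] args) {
>         FlinkPipelineOptions options =
>             PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
>         options.setRunner(FlinkRunner.class);
>         options.setStreaming(true);
>         Pipeline p = Pipeline.create(options);
>         // Only the read; any slowdown has to come from the source or checkpointing.
>         p.apply("ReadPubsub", PubsubIO.readStrings()
>             .fromSubscription("projects/my-project/subscriptions/my-subscription"));
>         p.run();
>       }
>     }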
>
> We do use FsStateBackend with HDFS. The whole setup is deployed in
> Kubernetes. Any ideas of why this might be happening would be of great
> help.
>
> Thanks,
> Encho
>
> On Mon, Sep 17, 2018 at 4:15 PM Maximilian Michels <[email protected]> wrote:
>
> Hi Encho,
>
> Thanks for providing more insight into this. I've re-examined the
> checkpointing code and couldn't find anything suspicious there.
>
> > The first job I stopped right when it processed more messages than I
> > had loaded. The subscription afterwards had 52 000 unacknowledged
> > messages.
>
> That does sound suspicious with a parallelism of 52, but your other
> experiments don't confirm that there is something wrong with the
> acknowledgment. Rather, it seems the checkpointing itself is taking
> longer and longer. This could also be caused by long acknowledgments,
> since they stall in-progress checkpoints.
>
> Please check the Web UI for statistics about the checkpoints:
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/checkpoint_monitoring.html
>
> You're going through a lot of messages in between the checkpoints. Which
> state backend do you use? Please try re-running your job with the file
> system state backend (FsStateBackend) or the RocksDB state backend
> (RocksDBStateBackend). For the RocksDB state backend you will have to
> add the RocksDB dependency. The file system backend should work out of
> the box, just specify a path and set
> FlinkPipelineOptions#setStateBackend(..). See:
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html
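>
> Roughly along these lines (an untested sketch; the exact setter on
> FlinkPipelineOptions has varied between Beam releases, and the HDFS path
> is a placeholder):
>
>     import org.apache.beam.runners.flink.FlinkPipelineOptions;
>     import org.apache.beam.runners.flink.FlinkRunner;
>     import org.apache.beam.sdk.Pipeline;
>     import org.apache.beam.sdk.options.PipelineOptionsFactory;
>     import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>
>     public class StateBackendSetup {
>       public static void main(String[] args) {
>         FlinkPipelineOptions options =
>             PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
>         options.setRunner(FlinkRunner.class);
>         // Assumes the variant of setStateBackend that takes a Flink backend instance;
>         // the second constructor argument enables asynchronous snapshots.
>         options.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints", true));
>         Pipeline p = Pipeline.create(options);
>         // ... apply transforms and run the pipeline ...
>       }
>     }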
>
> Next, I could supply you with a custom Beam version which logs more
> debug information.
>
> Best,
> Max
>
> On 13.09.18 16:40, Encho Mishinev wrote:
> > Hello Max,
> >
> > I am currently performing more tests on it and will follow up with
> > anything I find.
> >
> > Currently I have the following observations:
> >
> > Whenever there are few (relative to the parallelism) messages left in a
> > pubsub topic the checkpointing length becomes very long. I have tried
> > this with different parallelism. My usual set for testing is 13 task
> > managers with 4 task slots each, 52 parallelism for the job and
> > checkpointing every 60s. I've done three runs on a subscription filled
> > with about 122,000,000 messages. The job works fast going through about
> > 1,500,000 messages/minute until it reaches about 120,000,000 or so, when
> > it progressively slows down. Checkpointing length increases from an
> > average of 50-60s to 2:30min-3min. When about a few hundred thousand
> > messages are left the job mostly does long checkpoints and no work.
> > Messages pass through but seemingly forever.
> >
> > The first job I stopped right when it processed more messages than I had
> > loaded. The subscription afterwards had 52 000 unacknowledged messages.
> >
> > Another job with the same approach had 87 000 unacknowledged messages.
> >
> > A third job I left over 30 minutes after it had processed more messages
> > than I had loaded. It worked very slowly with long checkpoints and
> > processed a few hundred thousand messages in total over the 30 minute
> > period. That subscription then had only 235 unacknowledged messages left.
> >
> > I have put a large acknowledgement deadline on the subscriptions so that
> > the checkpointing time is less than the deadline (otherwise the messages
> > are naturally resent and can't be acknowledged), but that unfortunately is
> > not the problem.
> >
> > I then tried running the whole thing with parallelism of 1 and about
> > 100 000 messages. The job started fast once again, doing a few thousand a
> > second and doing all checkpoints in under 3s. Upon reaching about 90 000
> > it again started to slow down. This time it slowly reached its goal and
> > there were actually no unacknowledged messages, but the last 10 000
> > messages were processed dreadfully slowly and one checkpoint during that
> > period took 45s (compared to tens of checkpoints under 3s before that).
> >
> > I am not sure how to check how many messages get acknowledged per
> > checkpoint.
> > I'm open to trying new runs and sharing the results - let me know if you
> > want me to try and run the job with some specific parameters.
> >
> > Thanks for the help,
> > Encho
> >
> > On Thu, Sep 13, 2018 at 5:20 PM Maximilian Michels <[email protected]> wrote:
> >
> > That is indeed strange. Would you be able to provide some debugging
> > information, e.g. how many messages get acked for each checkpoint?
> >
> > What is the parallelism of your job?
> >
> > Thanks,
> > Max
> >
> > On 12.09.18 12:57, Encho Mishinev wrote:
> > > Hello Max,
> > >
> > > Thanks for the answer. My guess was that they are acknowledged at
> > > completion of Flink's checkpoints, but wanted to make sure since that
> > > doesn't explain my problem.
> > >
> > > Whenever a subscription is nearly empty the job gets slower overall and
> > > the Flink checkpoints start taking much more time (thrice or more)
> > > even though their state is much smaller, and of course, there always
> > > seem to be messages cycling over and over again.
> > >
> > > If you have any clue at all why this might be, let me know.
> > >
> > > Thanks for the help,
> > > Encho
> > >
> > > On Tue, Sep 11, 2018 at 1:45 PM Maximilian Michels <[email protected]> wrote:
> > >
> > > Hey Encho,
> > >
> > > The Flink Runner acknowledges messages through PubSubIO's
> > > `CheckpointMark#finalizeCheckpoint()` method.
> > >
> > > The Flink Runner wraps the PubSubIO source via the
> > > UnboundedSourceWrapper. When Flink takes a checkpoint of the running
> > > Beam streaming job, the wrapper will retrieve the CheckpointMarks from
> > > the PubSubIO source.
> > >
> > > When the Checkpoint is completed, there is a callback which informs the
> > > wrapper (`notifyCheckpointComplete()`) and calls `finalizeCheckpoint()`
> > > on all the generated CheckpointMarks.
> > >
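> > > In code terms, the mechanism is roughly the following (a simplified
> > > illustration, not the actual Beam source):
> > >
> > >     import java.util.ArrayList;
> > >     import java.util.LinkedHashMap;
> > >     import java.util.List;
> > >     import java.util.Map;
> > >     import org.apache.beam.sdk.io.UnboundedSource;
> > >
> > >     class CheckpointMarkTracker {
> > >       private final Map<Long, List<UnboundedSource.CheckpointMark>> pending =
> > >           new LinkedHashMap<>();
> > >
> > >       // Called while Flink snapshots the source operator.
> > >       void snapshotState(long checkpointId, UnboundedSource.CheckpointMark mark) {
> > >         pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(mark);
> > >       }
> > >
> > >       // Called once Flink reports the checkpoint as complete;
> > >       // this is where PubsubIO acknowledges its messages.
> > >       void notifyCheckpointComplete(long checkpointId) throws Exception {
> > >         List<UnboundedSource.CheckpointMark> marks = pending.remove(checkpointId);
> > >         if (marks == null) {
> > >           return;
> > >         }
> > >         for (UnboundedSource.CheckpointMark mark : marks) {
> > >           mark.finalizeCheckpoint();
> > >         }
> > >       }
> > >     }
> > >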
> > > Hope that helps debugging your problem. I don't have an explanation why
> > > this doesn't work for the last records in your PubSub queue. It
> > > shouldn't make a difference for how the Flink Runner does checkpointing.
> > >
> > > Best,
> > > Max
> > >
> > > On 10.09.18 18:17, Encho Mishinev wrote:
> > > > Hello,
> > > >
> > > > I am using Flink runner with Apache Beam 2.6.0. I was wondering
> > > > if there is information on when exactly the runner acknowledges a
> > > > pubsub message when reading from PubsubIO?
> > > >
> > > > My problem is that whenever there are a few messages left in a
> > > > subscription my streaming job never really seems to acknowledge them
> > > > all. For example, if a subscription has 100,000,000 messages in total,
> > > > the job will go through about 99,990,000 and then keep reading the last
> > > > few thousand and seemingly never acknowledge them.
> > > >
> > > > Some clarity on when the acknowledgement happens in the pipeline might
> > > > help me debug this problem.
> > > >
> > > > Thanks!
> > >
> >
>
--
Valeri Tsolov
Software engineer