Just a heads-up that the underlying issue has been fixed for Beam 2.10.0.

I shared this information with Encho and Valeri earlier, when the corresponding ticket was resolved (Dec 30): https://jira.apache.org/jira/browse/BEAM-5386

Thanks,
Max

On 19.09.18 14:42, Valeri Tsolov wrote:
Hey Max,
I think it is possible, but I'm not sure when we will be able to plan such an activity. I will get back to you ASAP.

Thanks,
Valeri

On Mon, 17.09.2018 at 19:27, Maximilian Michels <[email protected]> wrote:

    I wonder if you could do some profiling on the TaskManagers and see
    where they spend most of their time? That would be very helpful. If it
    is indeed `finalizeCheckpoint`, then we could introduce asynchronous
    acknowledgement. If it is in `snapshotState`, then we know that the
    bottleneck is there.

    Do you think profiling on the TaskManagers would be feasible?

    Another question: Did you activate asynchronous snapshots?

    Thanks,
    Max

    On 17.09.18 17:15, Encho Mishinev wrote:
     > Hi Max,
     >
     > I agree that the problem might not be in the acknowledgement itself. A
     > very long checkpoint could go past the subscription acknowledgement
     > deadline (10min is the maximum allowed) and hence the message might be
     > resent yielding the behaviour we see.
     >
     > In any case, the extreme slowdown of checkpoints still remains
     > unexplained. This occurs even if the job simply reads from Pubsub and
     > does nothing else.
     >
     > We do use FsStateBackend with HDFS. The whole setup is deployed in
     > Kubernetes. Any ideas as to why this might be happening would be of
     > great help.
     >
     > Thanks,
     > Encho
     >
     > On Mon, Sep 17, 2018 at 4:15 PM Maximilian Michels <[email protected]> wrote:
     >
     >     Hi Encho,
     >
     >     Thanks for providing more insight into this. I've re-examined the
     >     checkpointing code and couldn't find anything suspicious there.
     >
     >       > The first job I stopped right when it processed more messages
     >       > than I had loaded. The subscription afterwards had 52 000
     >       > unacknowledged messages.
     >
     >     That does sound suspicious with a parallelism of 52, but your other
     >     experiments don't confirm that there is something wrong with the
     >     acknowledgment. Rather, it seems the checkpointing itself is taking
     >     longer and longer. This could also be caused by long acknowledgments,
     >     since this stalls in-progress checkpoints.
     >
     >     Please check the Web UI for statistics about the checkpoints:
     >
     >     https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/checkpoint_monitoring.html
     >
     >     You're going through a lot of messages in between the checkpoints.
     >     Which state backend do you use? Please try re-running your job with
     >     the file system state backend (FsStateBackend) or the RocksDB state
     >     backend (RocksDBStateBackend). For the RocksDB state backend you will
     >     have to add the RocksDB dependency. The file system backend should
     >     work out of the box, just specify a path and set
     >     FlinkPipelineOptions#setStateBackend(..). See:
     >
     >     https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html
     >
     >     Next, I could supply you with a custom Beam version which logs more
     >     debug information.
     >
     >     Best,
     >     Max
     >
     >     On 13.09.18 16:40, Encho Mishinev wrote:
     >      > Hello Max,
     >      >
     >      > I am currently performing more tests on it and will follow-up with
     >      > anything I find.
     >      >
     >      > Currently I have the following observations:
     >      >
     >      > Whenever there are few (relative to the parallelism) messages left
     >      > in a pubsub topic the checkpointing length becomes very long. I have
     >      > tried this with different parallelism. My usual setup for testing is
     >      > 13 task managers with 4 task slots each, 52 parallelism for the job
     >      > and checkpointing every 60s. I've done three runs on a subscription
     >      > filled with about 122,000,000 messages. The job works fast, going
     >      > through about 1,500,000 messages/minute until it reaches about
     >      > 120,000,000 or so, when it progressively slows down. Checkpointing
     >      > length increases from an average of 50-60s to 2:30min-3min. When
     >      > about a few hundred thousand messages are left the job mostly does
     >      > long checkpoints and no work. Messages still pass through, but it
     >      > seems to take forever.
     >      >
     >      > The first job I stopped right when it processed more messages than I
     >      > had loaded. The subscription afterwards had 52 000 unacknowledged
     >      > messages.
     >      >
     >      > Another job with the same approach had 87 000 unacknowledged
     >      > messages.
     >      >
     >      > A third job I left over 30 minutes after it had processed more
     >      > messages than I had loaded. It worked very slowly with long
     >      > checkpoints and processed a few hundred thousand messages in total
     >      > over the 30 minute period. That subscription then had only 235
     >      > unacknowledged messages left.
     >      >
     >      > I have set a large acknowledgement deadline for the subscriptions so
     >      > that the checkpointing time is less than the deadline (otherwise the
     >      > messages are naturally resent and can't be acknowledged), so that
     >      > unfortunately is not the problem.
     >      >
     >      > I then tried running the whole thing with parallelism of 1 and about
     >      > 100 000 messages. The job started fast once again, doing a few
     >      > thousand a second and doing all checkpoints in under 3s. Upon
     >      > reaching about 90 000 it again started to slow down. This time it
     >      > slowly reached its goal and there were actually no unacknowledged
     >      > messages, but the last 10 000 messages were processed dreadfully
     >      > slowly and one checkpoint during that period took 45s (compared to
     >      > tens of checkpoints under 3s before that).
     >      >
     >      > I am not sure how to check how many messages get acknowledged per
     >      > checkpoint.
     >      > I'm open to trying new runs and sharing the results - let me know if
     >      > you want me to try and run the job with some specific parameters.
     >      >
     >      > Thanks for the help,
     >      > Encho
     >      >
     >      > On Thu, Sep 13, 2018 at 5:20 PM Maximilian Michels <[email protected]> wrote:
     >      >
     >      >     That is indeed strange. Would you be able to provide some
     >      >     debugging information, e.g. how many messages get acked for
     >      >     each checkpoint?
     >      >
     >      >     What is the parallelism of your job?
     >      >
     >      >     Thanks,
     >      >     Max
     >      >
     >      >     On 12.09.18 12:57, Encho Mishinev wrote:
     >      >      > Hello Max,
     >      >      >
     >      >      > Thanks for the answer. My guess was that they are
     >      >      > acknowledged at completion of Flink's checkpoints, but I
     >      >      > wanted to make sure since that doesn't explain my problem.
     >      >      >
     >      >      > Whenever a subscription is nearly empty the job gets slower
     >      >      > overall and Flink's checkpoints start taking much more time
     >      >      > (thrice or more) even though their state is much smaller,
     >      >      > and of course, there always seem to be messages cycling over
     >      >      > and over again.
     >      >      >
     >      >      > If you have any clue at all why this might be, let me know.
     >      >      >
     >      >      > Thanks for the help,
     >      >      > Encho
     >      >      >
     >      >      > On Tue, Sep 11, 2018 at 1:45 PM Maximilian Michels <[email protected]> wrote:
     >      >      >
     >      >      >     Hey Encho,
     >      >      >
     >      >      >     The Flink Runner acknowledges messages through PubSubIO's
     >      >      >     `CheckpointMark#finalizeCheckpoint()` method.
     >      >      >
     >      >      >     The Flink Runner wraps the PubSubIO source via the
     >      >      >     UnboundedSourceWrapper. When Flink takes a checkpoint of the
     >      >      >     running Beam streaming job, the wrapper will retrieve the
     >      >      >     CheckpointMarks from the PubSubIO source.
     >      >      >
     >      >      >     When the Checkpoint is completed, there is a callback which
     >      >      >     informs the wrapper (`notifyCheckpointComplete()`) and calls
     >      >      >     `finalizeCheckpoint()` on all the generated CheckpointMarks.
     >      >      >
     >      >      >     Hope that helps debugging your problem. I don't have an
     >      >      >     explanation why this doesn't work for the last records in
     >      >      >     your PubSub queue. It shouldn't make a difference for how the
     >      >      >     Flink Runner does checkpointing.
     >      >      >
     >      >      >     Best,
     >      >      >     Max
     >      >      >
     >      >      >     On 10.09.18 18:17, Encho Mishinev wrote:
     >      >      >      > Hello,
     >      >      >      >
     >      >      >      > I am using the Flink runner with Apache Beam 2.6.0. I was
     >      >      >      > wondering if there is information on when exactly the
     >      >      >      > runner acknowledges a pubsub message when reading from
     >      >      >      > PubsubIO?
     >      >      >      >
     >      >      >      > My problem is that whenever there are a few messages left
     >      >      >      > in a subscription my streaming job never really seems to
     >      >      >      > acknowledge them all. For example, if a subscription has
     >      >      >      > 100,000,000 messages in total, the job will go through
     >      >      >      > about 99,990,000 and then keep reading the last few
     >      >      >      > thousand and seemingly never acknowledge them.
     >      >      >      >
     >      >      >      > Some clarity on when the acknowledgement happens in the
     >      >      >      > pipeline might help me debug this problem.
     >      >      >      >
     >      >      >      > Thanks!
     >      >      >
     >      >
     >
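
The acknowledgement flow described in the oldest message of the thread (marks retrieved when a checkpoint is taken, `finalizeCheckpoint()` invoked from `notifyCheckpointComplete()`) can be sketched as a toy model. This is not Beam or Flink code: `ToySourceWrapper`, `ToyCheckpointMark` and all of their methods are hypothetical stand-ins, and the model is simplified to finalize only the mark belonging to the completed checkpoint. It only illustrates why a message stays unacknowledged until the checkpoint that covers it has completed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of "acknowledge on checkpoint completion". Not Beam or Flink code. */
final class CheckpointAckSketch {

    /** Hypothetical stand-in for a checkpoint mark: ids read since the last snapshot. */
    static final class ToyCheckpointMark {
        final List<String> messageIds;

        ToyCheckpointMark(List<String> messageIds) {
            this.messageIds = messageIds;
        }
    }

    /** Hypothetical stand-in for the runner-side wrapper around the source. */
    static final class ToySourceWrapper {
        private final List<String> readSinceLastSnapshot = new ArrayList<>();
        private final Map<Long, ToyCheckpointMark> pendingMarks = new HashMap<>();
        private final List<String> acknowledged = new ArrayList<>();

        /** A message is read and processed, but not acknowledged yet. */
        void read(String messageId) {
            readSinceLastSnapshot.add(messageId);
        }

        /** Checkpoint taken: remember the mark, acknowledge nothing. */
        void snapshotState(long checkpointId) {
            pendingMarks.put(
                checkpointId, new ToyCheckpointMark(new ArrayList<>(readSinceLastSnapshot)));
            readSinceLastSnapshot.clear();
        }

        /** Checkpoint completed: only now are the covered messages acknowledged. */
        void notifyCheckpointComplete(long checkpointId) {
            ToyCheckpointMark mark = pendingMarks.remove(checkpointId);
            if (mark != null) {
                acknowledged.addAll(mark.messageIds); // models finalizeCheckpoint()
            }
        }

        List<String> acknowledged() {
            return acknowledged;
        }

        int pendingCheckpoints() {
            return pendingMarks.size();
        }
    }

    public static void main(String[] args) {
        ToySourceWrapper wrapper = new ToySourceWrapper();
        wrapper.read("m1");
        wrapper.read("m2");
        wrapper.snapshotState(1L);
        wrapper.read("m3"); // arrives after the snapshot, belongs to the next checkpoint

        System.out.println("acked before completion: " + wrapper.acknowledged());
        wrapper.notifyCheckpointComplete(1L);
        System.out.println("acked after completion:  " + wrapper.acknowledged());
    }
}
```

In this model `m3` remains unacknowledged until a later checkpoint is both taken and completed, so the longer a checkpoint takes, the longer its messages stay outstanding on the subscription.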
