I wonder if you could do some profiling on the TaskManagers and see where they spend most of their time? That would be very helpful. If it is indeed `finalizeCheckpoint`, then we could introduce asynchronous acknowledgement. If it is in `snapshotState`, then we know that the bottleneck is there.
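
To make the asynchronous acknowledgement idea more concrete, here is a minimal sketch in plain Java. It is not the Flink Runner's code: `Mark` is a hypothetical stand-in for Beam's `CheckpointMark`, and `AsyncAcker` is an invented name. It assumes the acknowledgements can safely run on a background thread, so that the checkpoint-complete callback returns immediately instead of waiting for every `finalizeCheckpoint()` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a Beam CheckpointMark; not the real interface. */
interface Mark {
  void finalizeCheckpoint() throws Exception;
}

/** Hypothetical helper that finalizes marks off the checkpoint-notification thread. */
class AsyncAcker implements AutoCloseable {
  // A single thread keeps acknowledgements in checkpoint order.
  private final ExecutorService ackPool = Executors.newSingleThreadExecutor();

  /** Returns immediately; the marks are finalized in order on the background thread. */
  CompletableFuture<Void> notifyCheckpointComplete(List<Mark> marks) {
    List<Mark> snapshot = new ArrayList<>(marks); // defensive copy of the caller's list
    return CompletableFuture.runAsync(() -> {
      for (Mark mark : snapshot) {
        try {
          mark.finalizeCheckpoint();
        } catch (Exception e) {
          // Surfaces through the returned future instead of failing the caller.
          throw new RuntimeException("acknowledgement failed", e);
        }
      }
    }, ackPool);
  }

  /** Stops accepting new work and waits briefly for pending acknowledgements. */
  @Override
  public void close() {
    ackPool.shutdown();
    try {
      ackPool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

The caller can ignore the returned future or attach error handling to it. Whether a failed acknowledgement should fail the job is a design decision that this sketch leaves open.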

Do you think profiling on the TaskManagers would be feasible?

Another question: Did you activate asynchronous snapshots?

Thanks,
Max

On 17.09.18 17:15, Encho Mishinev wrote:
Hi Max,

I agree that the problem might not be in the acknowledgement itself. A very long checkpoint could go past the subscription acknowledgement deadline (10min is the maximum allowed) and hence the message might be resent yielding the behaviour we see.
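
As a rough illustration of that reasoning, here is a simplified sketch in plain Java. `AckDeadlineCheck` and its method names are hypothetical and not part of Beam, Flink, or the Pub/Sub client. It assumes a message is acknowledged only once the next checkpoint has been triggered and completed, and it ignores any deadline extension the client might perform.

```java
import java.time.Duration;

/** Hypothetical helper for illustration; not part of Beam, Flink, or Pub/Sub. */
final class AckDeadlineCheck {
  private AckDeadlineCheck() {}

  /**
   * Simplified worst case: a message pulled now is acknowledged only after the
   * next checkpoint has been triggered and has completed.
   */
  static Duration worstCaseAckDelay(Duration timeUntilNextTrigger, Duration checkpointDuration) {
    return timeUntilNextTrigger.plus(checkpointDuration);
  }

  /** True if the worst-case delay exceeds the subscription's acknowledgement deadline. */
  static boolean mayBeRedelivered(
      Duration timeUntilNextTrigger, Duration checkpointDuration, Duration ackDeadline) {
    return worstCaseAckDelay(timeUntilNextTrigger, checkpointDuration).compareTo(ackDeadline) > 0;
  }
}
```

For example, `AckDeadlineCheck.mayBeRedelivered(Duration.ofSeconds(60), Duration.ofMinutes(3), Duration.ofMinutes(10))` returns `false` under this model, since 4 minutes is below the 10 minute deadline.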

In any case, the extreme slowdown of checkpoints remains unexplained. This occurs even if the job simply reads from Pubsub and does nothing else.

We do use FsStateBackend with HDFS. The whole setup is deployed in Kubernetes. Any ideas as to why this might be happening would be of great help.

Thanks,
Encho

On Mon, Sep 17, 2018 at 4:15 PM Maximilian Michels wrote:

    Hi Encho,

    Thanks for providing more insight into this. I've re-examined the
    checkpointing code and couldn't find anything suspicious there.

      > The first job I stopped right when it processed more messages than I
      > had loaded. The subscription afterwards had 52 000 unacknowledged
      > messages.

    That does sound suspicious with a parallelism of 52, but your other
    experiments don't confirm that there is something wrong with the
    acknowledgment. Rather, it seems the checkpointing itself is taking
    longer and longer. This could also be caused by long acknowledgments,
    since this stalls in-progress checkpoints.

    Please check the Web UI for statistics about the checkpoints:
    https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/checkpoint_monitoring.html

    You're going through a lot of messages in between the checkpoints.
    Which state backend do you use? Please try re-running your job with
    the file system state backend (FsStateBackend) or the RocksDB state
    backend (RocksDBStateBackend). For the RocksDB state backend you will
    have to add the RocksDB dependency. The file system backend should
    work out of the box, just specify a path and set
    FlinkPipelineOptions#setStateBackend(..). See:
    https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html

    Next, I could supply you with a custom Beam version which logs more
    debug information.

    Best,
    Max

    On 13.09.18 16:40, Encho Mishinev wrote:
     > Hello Max,
     >
     > I am currently performing more tests on it and will follow-up with
     > anything I find.
     >
     > Currently I have the following observations:
     >
     > Whenever there are few (relative to the parallelism) messages left in
     > a pubsub topic the checkpointing length becomes very long. I have
     > tried this with different parallelism. My usual setup for testing is
     > 13 task managers with 4 task slots each, 52 parallelism for the job
     > and checkpointing every 60s. I've done three runs on a subscription
     > filled with about 122,000,000 messages. The job works fast, going
     > through about 1,500,000 messages/minute until it reaches about
     > 120,000,000 or so, when it progressively slows down. Checkpointing
     > length increases from an average of 50-60s to 2:30min-3min. When
     > about a few hundred thousand messages are left the job mostly does
     > long checkpoints and no work. Messages pass through, but seemingly
     > take forever.
     >
     > The first job I stopped right when it processed more messages than I
     > had loaded. The subscription afterwards had 52 000 unacknowledged
     > messages.
     >
     > Another job with the same approach had 87 000 unacknowledged messages.
     >
     > A third job I left over 30 minutes after it had processed more
     > messages than I had loaded. It worked very slowly with long
     > checkpoints and processed a few hundred thousand messages in total
     > over the 30 minute period. That subscription then had only 235
     > unacknowledged messages left.
     >
     > I have set a large acknowledgement deadline for the subscriptions so
     > that the checkpointing time is less than the deadline (otherwise the
     > messages are naturally resent and can't be acknowledged), so that
     > unfortunately is not the problem.
     >
     > I then tried running the whole thing with parallelism of 1 and about
     > 100 000 messages. The job started fast once again, doing a few
     > thousand a second and doing all checkpoints in under 3s. Upon
     > reaching about 90 000 it again started to slow down. This time it
     > slowly reached its goal and there were actually no unacknowledged
     > messages, but the last 10 000 messages were processed dreadfully
     > slowly and one checkpoint during that period took 45s (compared to
     > tens of checkpoints under 3s before that).
     >
     > I am not sure how to check how many messages get acknowledged per
     > checkpoint.
     > I'm open to trying new runs and sharing the results - let me know if
     > you want me to try and run the job with some specific parameters.
     >
     > Thanks for the help,
     > Encho
     >
     > On Thu, Sep 13, 2018 at 5:20 PM Maximilian Michels wrote:
     >
     >     That is indeed strange. Would you be able to provide some
     >     debugging information, e.g. how many messages get acked for
     >     each checkpoint?
     >
     >     What is the parallelism of your job?
     >
     >     Thanks,
     >     Max
     >
     >     On 12.09.18 12:57, Encho Mishinev wrote:
     >      > Hello Max,
     >      >
     >      > Thanks for the answer. My guess was that they are acknowledged
     >      > at completion of Flink's checkpoints, but wanted to make sure
     >      > since that doesn't explain my problem.
     >      >
     >      > Whenever a subscription is nearly empty the job gets slower
     >      > overall and Flink's checkpoints start taking much more time
     >      > (thrice or more) even though their state is much smaller, and
     >      > of course, there always seem to be messages cycling over and
     >      > over again.
     >      >
     >      > If you have any clue at all why this might be, let me know.
     >      >
     >      > Thanks for the help,
     >      > Encho
     >      >
     >      > On Tue, Sep 11, 2018 at 1:45 PM Maximilian Michels wrote:
     >      >
     >      >     Hey Encho,
     >      >
     >      >     The Flink Runner acknowledges messages through PubSubIO's
     >      >     `CheckpointMark#finalizeCheckpoint()` method.
     >      >
     >      >     The Flink Runner wraps the PubSubIO source via the
     >      >     UnboundedSourceWrapper. When Flink takes a checkpoint of
     >      >     the running Beam streaming job, the wrapper will retrieve
     >      >     the CheckpointMarks from the PubSubIO source.
     >      >
     >      >     When the Checkpoint is completed, there is a callback which
     >      >     informs the wrapper (`notifyCheckpointComplete()`) and calls
     >      >     `finalizeCheckpoint()` on all the generated CheckpointMarks.
     >      >
     >      >     Hope that helps debugging your problem. I don't have an
     >      >     explanation why this doesn't work for the last records in
     >      >     your PubSub queue. It shouldn't make a difference for how
     >      >     the Flink Runner does checkpointing.
     >      >
     >      >     Best,
     >      >     Max
     >      >
     >      >     On 10.09.18 18:17, Encho Mishinev wrote:
     >      >      > Hello,
     >      >      >
     >      >      > I am using Flink runner with Apache Beam 2.6.0. I was
     >      >      > wondering if there is information on when exactly the
     >      >      > runner acknowledges a pubsub message when reading from
     >      >      > PubsubIO?
     >      >      >
     >      >      > My problem is that whenever there are a few messages
     >      >      > left in a subscription my streaming job never really
     >      >      > seems to acknowledge them all. For example, if a
     >      >      > subscription has 100,000,000 messages in total, the job
     >      >      > will go through about 99,990,000 and then keep reading
     >      >      > the last few thousand and seemingly never acknowledge
     >      >      > them.
     >      >      >
     >      >      > Some clarity on when the acknowledgement happens in the
     >      >      > pipeline might help me debug this problem.
     >      >      >
     >      >      > Thanks!
     >      >
     >
