Restoring from a checkpoint is something different. You asked about acking pending CheckpointMarks.

If you look in the PubsubUnboundedSource, it doesn't close the client connection if the there are still unacked checkpoints. a) Closing the reader and b) closing the client connection, these are two different actions which do not have to depend on each other.

On 05.10.18 16:06, flyisland wrote:
I've checked the PubsubUnboundedSource, it just throws an exception if it's a "restored checkpoint".

Actually, for most MQ system, it's no way to ack a message if the Reader(connection) is closed. So it makes no sense to call the finalizeCheckpoint() method after closed the Reader.

On Fri, Oct 5, 2018 at 9:01 PM Maximilian Michels <m...@apache.org <mailto:m...@apache.org>> wrote:

    Hi,

    Not sure whether I'm a guru but I'll try to answer your question ;)

     > Is there any way to ask the runner to call finalizeCheckpoint()
    method before it closed the Reader?
    Not that I'm aware of.

    The point the comment is trying to make is that CheckpointMark should
    not depend on the life cycle of the Reader. So you should find a way to
    encode all necessary information in your CheckpointMark to acknowledge
    even after `close()` has been called on the Reader.

    Perhaps you want to check out out PubsubUnboundedSource for an example
    of how to do that?

    Cheers,
    Max

    On 05.10.18 14:47, flyisland wrote:
     > Hi Gurus,
     >
     > I'm building a new IO connector now, and I try to ack messages in
    the
     > "UnboundedSource.CheckpointMark.finalizeCheckpoint()" method as
    MqttIO
     > and JmsIO did, but I found in the javadoc it said
     >
     >  >  It is NOT safe to assume the UnboundedSource.UnboundedReader
    from
     > which this checkpoint was created still exists at the time this
    method
     > is called.
     >
     > I do encounter this situation in my testing with the Direct
    Runner, the
     > "msg.ack()" method failed when the finalizeCheckpoint() method is
    called
     > since the related reader has already been closed!
     >
     > Is there any way to ask the runner to call finalizeCheckpoint()
    method
     > before it closed the Reader?

Reply via email to