For the UnboundedSource interface, this depends on the Runner. Generally, close() should be called when no more data will be read from the Reader. The FlinkRunner calls `close()` on the Readers when it closes the operator (see UnboundedSourceWrapper).

The best documentation we have for this are the JavaDoc comments.

Thanks,
Max

On 06.10.18 15:30, flyisland wrote:
Got it, thanks, will double check the PubsubUnboundedSource.

btw, I'd like to learn more about the lifecycle of IO connector(for example, when will the runner call the reader's close() method?), could you recommend some documents, thanks!

On Fri, Oct 5, 2018 at 11:01 PM Maximilian Michels <m...@apache.org <mailto:m...@apache.org>> wrote:

    Restoring from a checkpoint is something different. You asked about
    acking pending CheckpointMarks.

    If you look in the PubsubUnboundedSource, it doesn't close the client
    connection if the there are still unacked checkpoints. a) Closing the
    reader and b) closing the client connection, these are two different
    actions which do not have to depend on each other.

    On 05.10.18 16:06, flyisland wrote:
     > I've checked the PubsubUnboundedSource, it just throws an
    exception if
     > it's a "restored checkpoint".
     >
     > Actually, for most MQ system, it's no way to ack a message if the
     > Reader(connection) is closed. So it makes no sense to call the
     > finalizeCheckpoint() method after closed the Reader.
     >
     > On Fri, Oct 5, 2018 at 9:01 PM Maximilian Michels <m...@apache.org
    <mailto:m...@apache.org>
     > <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
     >
     >     Hi,
     >
     >     Not sure whether I'm a guru but I'll try to answer your
    question ;)
     >
     >      > Is there any way to ask the runner to call
    finalizeCheckpoint()
     >     method before it closed the Reader?
     >     Not that I'm aware of.
     >
     >     The point the comment is trying to make is that
    CheckpointMark should
     >     not depend on the life cycle of the Reader. So you should
    find a way to
     >     encode all necessary information in your CheckpointMark to
    acknowledge
     >     even after `close()` has been called on the Reader.
     >
     >     Perhaps you want to check out out PubsubUnboundedSource for
    an example
     >     of how to do that?
     >
     >     Cheers,
     >     Max
     >
     >     On 05.10.18 14:47, flyisland wrote:
     >      > Hi Gurus,
     >      >
     >      > I'm building a new IO connector now, and I try to ack
    messages in
     >     the
     >      > "UnboundedSource.CheckpointMark.finalizeCheckpoint()"
    method as
     >     MqttIO
     >      > and JmsIO did, but I found in the javadoc it said
     >      >
     >      >  >  It is NOT safe to assume the
    UnboundedSource.UnboundedReader
     >     from
     >      > which this checkpoint was created still exists at the time
    this
     >     method
     >      > is called.
     >      >
     >      > I do encounter this situation in my testing with the Direct
     >     Runner, the
     >      > "msg.ack()" method failed when the finalizeCheckpoint()
    method is
     >     called
     >      > since the related reader has already been closed!
     >      >
     >      > Is there any way to ask the runner to call
    finalizeCheckpoint()
     >     method
     >      > before it closed the Reader?
     >

Reply via email to