I'm going to start with number two, because it's got an easy answer: when performing an unbounded read, the DirectRunner finalizes a checkpoint once it has completed a subsequent read from the same split in which at least one element was read. Checkpoints taken during a bounded read from an unbounded source will never be finalized by any runner. See https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L221
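To make that ordering concrete, here is a simplified model in plain Java of the rule described above. It is not the DirectRunner's code and has no Beam dependency: `FinalizationModel`, `Mark`, and `read` are hypothetical names, and the model keeps only the order of reads, new checkpoints, and finalize calls for a single split.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of the DirectRunner rule described above (NOT its actual code):
 * the checkpoint from a split's previous read is finalized only when a later read
 * of that split produces at least one element.
 */
final class FinalizationModel {
  /** Stand-in for a CheckpointMark that just records whether it was finalized. */
  static final class Mark {
    final int readNumber;
    boolean finalized;
    Mark(int readNumber) { this.readNumber = readNumber; }
  }

  private Mark previous; // checkpoint carried along with the split between reads
  final List<Mark> issued = new ArrayList<>();

  /** One read of the split; elementsRead is how many records the reader produced. */
  void read(int readNumber, int elementsRead) {
    if (elementsRead == 0) {
      return; // empty read: no new checkpoint, and the previous one stays pending
    }
    Mark current = new Mark(readNumber); // where a real runner calls getCheckpointMark()
    issued.add(current);
    if (previous != null) {
      previous.finalized = true; // where a real runner calls finalizeCheckpoint()
    }
    previous = current;
  }
}
```

Under this rule, the checkpoint from the latest non-empty read always stays pending until another non-empty read of the same split completes, so the messages it holds are not acked before then.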
For number one: having a checkpoint contain unacked messages is reasonable (in fact, required). Acking those messages when the checkpoint is finalized is also reasonable. However, a checkpoint must not track any records produced after the call to checkpoint() that created it; if it does, finalizing it will ack messages that have not been committed. Creating a new checkpoint whenever a reader is started or a checkpoint is taken, and storing the pending state in it, is a suitable way to ensure this.

You will likely need the reader to maintain its own view of pending unacked messages, which finalizeCheckpoint can update (in a thread-safe manner, guarding against the reader no longer being present). You may be able to track these at the per-checkpoint level, where a finalized checkpoint is removed from the collection of checkpoints that hold back the watermark.

On Wed, May 10, 2017 at 8:51 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> Hi Beamers,
>
> I'm working on some fixes in the JmsIO and MqttIO.
>
> Those two IOs behave in a similar way on the reading side:
>
> - they consume messages from a JMS or MQTT broker
> - the "pending" messages are stored in the checkpoint mark. When a new
> message is added to the checkpoint, we compare the timestamp of the message
> with the oldest pending message timestamp. It advances the watermark, so
> the watermark is basically the timestamp of the oldest pending message.
> - when the runner calls finalize on the checkpoint, we ack the
> messages.
>
> Testing this with the direct runner, it seems finalize is never called on
> checkpoints. Basically, it means that the messages are not fully
> consumed from the broker (as the ack is not sent).
>
> I tried with a fair volume of messages (1000000), and the checkpoint is
> still not finalized.
>
> Basically, I have two questions:
> 1. What do you think about this approach: storing pending messages and
> advancing the watermark this way?
> 2. Any idea when the direct runner will call finalize on the
> checkpoint?
>
> Thanks!
> Regards
> JB
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
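Coming back to number one, here is a minimal plain-Java sketch of the checkpoint layout suggested above, assuming a broker client that acks by message id. It does not depend on Beam: `Message`, `PendingCheckpoint`, `SketchReader`, `onRead`, `onFinalized`, and the `acker` callback are hypothetical names, while `getCheckpointMark()` and `finalizeCheckpoint()` only play the roles of `UnboundedReader#getCheckpointMark()` and `CheckpointMark#finalizeCheckpoint()`. A real checkpoint mark is serialized, so a reader reference inside it would not survive that; this is why the sketch treats the reader as possibly absent.

```java
import java.lang.ref.WeakReference;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical broker message: an id to ack plus its event timestamp. */
final class Message {
  final String id;
  final Instant timestamp;
  Message(String id, Instant timestamp) { this.id = id; this.timestamp = timestamp; }
}

/** Created once per getCheckpointMark() call; owns only the messages read before that call. */
final class PendingCheckpoint {
  private final List<Message> messages;
  private final Consumer<String> acker; // hypothetical callback that sends the broker ack
  // A real CheckpointMark is serialized, so this link would be lost and may be absent.
  private final WeakReference<SketchReader> readerRef;

  PendingCheckpoint(List<Message> messages, Consumer<String> acker, SketchReader reader) {
    this.messages = Collections.unmodifiableList(new ArrayList<>(messages));
    this.acker = acker;
    this.readerRef = new WeakReference<>(reader);
  }

  List<Message> messages() { return messages; }

  /** Plays the role of CheckpointMark#finalizeCheckpoint: ack, then release the watermark hold. */
  void finalizeCheckpoint() {
    for (Message m : messages) {
      acker.accept(m.id);
    }
    SketchReader reader = readerRef.get();
    if (reader != null) { // guard: the reader may already have been closed or collected
      reader.onFinalized(this);
    }
  }
}

/** Hypothetical reader that keeps its own view of unacked messages, grouped per checkpoint. */
final class SketchReader {
  private final Consumer<String> acker;
  private final List<Message> sinceLastCheckpoint = new ArrayList<>();
  private final Set<PendingCheckpoint> outstanding = new LinkedHashSet<>(); // not yet finalized
  private Instant lastSeen = Instant.EPOCH;
  private Instant lastWatermark = Instant.EPOCH;

  SketchReader(Consumer<String> acker) { this.acker = acker; }

  synchronized void onRead(Message m) {
    sinceLastCheckpoint.add(m);
    if (m.timestamp.isAfter(lastSeen)) lastSeen = m.timestamp;
  }

  /** Hands the current batch to a fresh checkpoint; later reads go into a new batch. */
  synchronized PendingCheckpoint getCheckpointMark() {
    PendingCheckpoint cp = new PendingCheckpoint(sinceLastCheckpoint, acker, this);
    sinceLastCheckpoint.clear();
    outstanding.add(cp);
    return cp;
  }

  /** Called from finalizeCheckpoint, possibly on another thread. */
  synchronized void onFinalized(PendingCheckpoint cp) {
    outstanding.remove(cp);
  }

  /** Oldest unacked timestamp (or newest seen if none), never moving backwards. */
  synchronized Instant getWatermark() {
    List<Message> pending = new ArrayList<>(sinceLastCheckpoint);
    for (PendingCheckpoint cp : outstanding) {
      pending.addAll(cp.messages());
    }
    Instant oldest = null;
    for (Message m : pending) {
      if (oldest == null || m.timestamp.isBefore(oldest)) oldest = m.timestamp;
    }
    Instant candidate = (oldest != null) ? oldest : lastSeen;
    if (candidate.isAfter(lastWatermark)) {
      lastWatermark = candidate;
    }
    return lastWatermark;
  }
}
```

Grouping pending messages per checkpoint means finalizing one checkpoint releases only its own messages from the watermark calculation; messages read after it (in the current batch or in later checkpoints) keep holding the watermark back. Clamping the watermark so it never decreases is a choice made for this sketch, not something the thread prescribes.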