Hello Nicolas,

I ran into the same problem. RabbitMqIO does acknowledge messages in finalizeCheckpoint, but it was not clear to me when that method is called, because no messages were acked while the pipeline was running. In the end I patched RabbitMqIO to auto-ack received messages. This is fine for my current use case, but it is not the safest way of consuming messages.
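To make the trade-off concrete, here is a rough sketch of why auto ack is less safe. It is a toy in-memory model, not the RabbitMQ client or the Beam API: the class ToyQueue and the method heldAfterCrash are hypothetical names I made up for illustration. The only broker behaviour it assumes is that an auto-acked message is forgotten as soon as it is delivered, while a manually acked message is requeued if the consumer goes away before acking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of broker-side acknowledgement; not RabbitMQ or Beam code. */
public class AckModes {

    /** Hypothetical stand-in for a broker queue that tracks unacked deliveries. */
    static class ToyQueue {
        private final Deque<String> ready = new ArrayDeque<>();
        private final List<String> unacked = new ArrayList<>();

        void publish(String msg) {
            ready.add(msg);
        }

        /** Delivers the next message; with autoAck the broker forgets it at once. */
        String deliver(boolean autoAck) {
            String msg = ready.poll();
            if (msg != null && !autoAck) {
                unacked.add(msg);
            }
            return msg;
        }

        /** Acks everything delivered so far, as a checkpoint finalization would. */
        void ackAll() {
            unacked.clear();
        }

        /** Consumer went away: unacked messages become ready again. */
        void consumerCrashed() {
            ready.addAll(unacked);
            unacked.clear();
        }

        /** Messages the broker still holds (ready plus unacked). */
        int size() {
            return ready.size() + unacked.size();
        }
    }

    /** Reads two messages, then crashes before any checkpoint is finalized. */
    static int heldAfterCrash(boolean autoAck) {
        ToyQueue queue = new ToyQueue();
        queue.publish("a");
        queue.publish("b");
        while (queue.deliver(autoAck) != null) {
            // Messages are only received here; nothing is acked explicitly.
        }
        queue.consumerCrashed();
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("auto ack:   " + heldAfterCrash(true));
        System.out.println("manual ack: " + heldAfterCrash(false));
    }
}
```

In this model the auto-ack run leaves nothing on the broker after the crash, so both messages are lost if they had not been fully processed. The manual-ack run still holds both, which is also why messages stay in the input queue for as long as the ack step never runs.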
If someone has a cleaner solution, I'll be happy to hear it.

Augustin

> On 13 June 2019 at 15:47, Nicolas Delsaux <[email protected]> wrote:
>
> I'm having big troubles reading data from RabbitMQ.
>
> To understand my troubles, I've simplified my previous code to the extreme:
>
>     Pipeline pipeline = Pipeline.create(options);
>
>     PCollection<Object> wat = (PCollection<Object>)
>         pipeline.apply("read_from_rabbit",
>                 RabbitMqIO.read()
>                     .withUri(options.getRabbitMQUri())
>                     .withQueue(options.getRabbitMQQueue())
>             )
>             .apply("why not", RabbitMqIO.<RabbitMqMessage>write()
>                     .withQueue("written_in_rabbit")
>                     .withQueueDeclare(true)
>                     .withUri(options.getRabbitMQUri())
>             )
>
> So if I put a simple message in my input queue, it should be "moved"
> (in quotes because the new message is not the original one, but has the
> same content) into my "written_in_rabbit" queue.
>
> Unfortunately, for reasons I don't understand, the original message stays
> in the input queue.
>
> It seems to be due to the fact that the
> RabbitMQCheckpointMark#finalizeCheckpoint() method is never called. So
> where is the finalizeCheckpoint method called?
>
> And how can I understand why this method is never called in my case?
>
> Thanks
