Is there a JIRA for this ? if this solves an issue to multiple users maybe is worth of integrating the patch. Would you be up to do this Augustin?
On Fri, Jun 14, 2019 at 10:35 AM Augustin Lafanechere <[email protected]> wrote: > > Hello Nicolas, > I also encountered the same problem. > RabbitMQIo indeed acknowledges messages on finalizeCheckpoint calls but this > was not clear to me on when this method is called because no message were ack > on pipeline runtime. > I finally decided to implement a patch of the RabbitMqIO to set auto ack of > received messages, this is fine for my current use case but is not the safest > way of consuming messages. > > If someone has a cleaner solution I’ll be happy to hear it. > > Augustin > > > > > > Le 13 juin 2019 à 15:47, Nicolas Delsaux <[email protected]> a écrit : > > > > I'm having big troubles reading data from RabbitMQ. > > > > To understand my troubles, i've simplified my previous code to the extreme : > > > > > > Pipeline pipeline = Pipeline.create(options); > > > > PCollection<Object> wat = (PCollection<Object>) > > pipeline.apply("read_from_rabbit", > > RabbitMqIO.read() > > .withUri(options.getRabbitMQUri()) > > .withQueue(options.getRabbitMQQueue()) > > ) > > .apply("why not", RabbitMqIO.<RabbitMqMessage>write() > > .withQueue("written_in_rabbit") > > .withQueueDeclare(true) > > .withUri(options.getRabbitMQUri()) > > ) > > > > > > So if I put a simple message in my input queue, it should be "moved" > > (quotes are here since new message is not the original one, but has same > > content) into my "written_in_rabbit" message. > > > > Unfortunatly, for reasons I don't understand, the original message stays in > > input queue. > > > > It seems to be due to the fact that > > RabbitMQCheckpointMark#finalizeCheckpoint() method is never called. So > > where is the finalizeCheckpoint method called ? > > > > And how can I understand why this method is never called in my case ? > > > > Thanks > > > > >
