Is there a JIRA for this ? if this solves an issue to multiple users
maybe is worth of integrating the patch.
Would you be up to do this Augustin?

On Fri, Jun 14, 2019 at 10:35 AM Augustin Lafanechere
<[email protected]> wrote:
>
> Hello Nicolas,
> I also encountered the same problem.
> RabbitMQIo indeed acknowledges messages on finalizeCheckpoint calls but this 
> was not clear to me on when this method is called because no message were ack 
> on pipeline runtime.
> I finally decided to implement a patch of the RabbitMqIO to set auto ack of 
> received messages, this is fine for my current use case but is not the safest 
> way of consuming messages.
>
> If someone has a cleaner solution I’ll be happy to hear it.
>
> Augustin
>
>
>
>
> > Le 13 juin 2019 à 15:47, Nicolas Delsaux <[email protected]> a écrit :
> >
> > I'm having big troubles reading data from RabbitMQ.
> >
> > To understand my troubles, i've simplified my previous code to the extreme :
> >
> >
> >         Pipeline pipeline = Pipeline.create(options);
> >
> >         PCollection<Object> wat = (PCollection<Object>) 
> > pipeline.apply("read_from_rabbit",
> >                 RabbitMqIO.read()
> >                     .withUri(options.getRabbitMQUri())
> >                     .withQueue(options.getRabbitMQQueue())
> >                     )
> >                 .apply("why not", RabbitMqIO.<RabbitMqMessage>write()
> >                         .withQueue("written_in_rabbit")
> >                         .withQueueDeclare(true)
> >                         .withUri(options.getRabbitMQUri())
> >                         )
> >
> >
> > So if I put a simple message in my input queue, it should be "moved" 
> > (quotes are here since new message is not the original one, but has same 
> > content) into my "written_in_rabbit" message.
> >
> > Unfortunatly, for reasons I don't understand, the original message stays in 
> > input queue.
> >
> > It seems to be due to the fact that 
> > RabbitMQCheckpointMark#finalizeCheckpoint() method is never called. So 
> > where is the finalizeCheckpoint method called ?
> >
> > And how can I understand why this method is never called in my case ?
> >
> > Thanks
> >
> >
>

Reply via email to