In my opinion, for such crucial behavior I would expect the pipeline to
fail with a clear message stating the reason, in the same way as when you
implement a new Coder and forget to override the verifyDeterministic method
(I don't recall its exact name).
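
As a rough sketch of what I mean, the check could look something like the
code below. Everything in it is made up for illustration: CheckpointGuard,
its parameters, and the "interval <= 0 means disabled" convention are
assumptions, not existing Beam or Flink runner code.

```java
/** Hypothetical guard; not an existing Beam or Flink runner class. */
final class CheckpointGuard {

    /**
     * Fails fast when a source that acks only on checkpoint finalization
     * is run without checkpointing.
     *
     * @param sourceNeedsFinalization  true if the source acknowledges messages
     *                                 only in finalizeCheckpoint
     * @param checkpointIntervalMillis runner checkpoint interval; a value <= 0
     *                                 is assumed to mean "disabled"
     * @param userOverride             explicit opt-out for users who know what
     *                                 they are doing
     */
    static void validate(boolean sourceNeedsFinalization,
                         long checkpointIntervalMillis,
                         boolean userOverride) {
        boolean checkpointingDisabled = checkpointIntervalMillis <= 0;
        if (sourceNeedsFinalization && checkpointingDisabled && !userOverride) {
            throw new IllegalStateException(
                "The unbounded source finalizes checkpoint marks, but checkpointing "
                + "is disabled, so messages would never be acknowledged. "
                + "Enable checkpointing or set the override explicitly.");
        }
    }

    public static void main(String[] args) {
        // Checkpointing enabled: the check passes silently.
        validate(true, 10_000L, false);

        // Checkpointing disabled and no override: the check rejects the pipeline.
        try {
            validate(true, -1L, false);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

The override flag is there so that a user who deliberately runs without
checkpointing can still do so.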

Just my 2 cents.

Maximilian Michels <[email protected]> wrote on Fri, 14 June 2019, 16:48:

> This has come up before: https://issues.apache.org/jira/browse/BEAM-4520
>
> The issue is that checkpoints won't be acknowledged if checkpointing is
> disabled in Flink. We throw a WARN when unbounded sources are used without
> checkpointing. Not all unbounded sources actually need to finalize
> checkpoint marks.
>
> Seeing that this is still an issue, we might want to at least periodically
> acknowledge checkpoint marks when checkpointing is disabled. The
> alternative would be to throw an exception, perhaps with the option to
> override this in case the user knows what they are doing.
>
> Thanks,
> Max
>
> On 14.06.19 10:52, Ismaël Mejía wrote:
> > Is there a JIRA for this? If this solves an issue for multiple users,
> > it may be worth integrating the patch.
> > Would you be up for doing this, Augustin?
> >
> > On Fri, Jun 14, 2019 at 10:35 AM Augustin Lafanechere
> > <[email protected]> wrote:
> > >
> > > Hello Nicolas,
> > > I also encountered the same problem.
> > > RabbitMqIO does acknowledge messages in finalizeCheckpoint calls, but it
> > > was not clear to me when this method is called, because no messages were
> > > acked while the pipeline was running.
> > > I finally decided to patch RabbitMqIO to auto-ack received messages. This
> > > is fine for my current use case, but it is not the safest way of consuming
> > > messages.
> > >
> > > If someone has a cleaner solution I’ll be happy to hear it.
> > >
> > > Augustin
> > >
> > >
> > >
> > >
> > >> On 13 June 2019 at 15:47, Nicolas Delsaux <[email protected]> wrote:
> > >>
> > >> I'm having a lot of trouble reading data from RabbitMQ.
> > >>
> > >> To narrow down the problem, I've simplified my previous code to the extreme:
> > >>
> > >>
> > >>         Pipeline pipeline = Pipeline.create(options);
> > >>
> > >>         PCollection<Object> wat = (PCollection<Object>) pipeline
> > >>                 // Read messages from the input queue.
> > >>                 .apply("read_from_rabbit",
> > >>                     RabbitMqIO.read()
> > >>                         .withUri(options.getRabbitMQUri())
> > >>                         .withQueue(options.getRabbitMQQueue()))
> > >>                 // Write each message to the "written_in_rabbit" queue.
> > >>                 .apply("why not",
> > >>                     RabbitMqIO.<RabbitMqMessage>write()
> > >>                         .withQueue("written_in_rabbit")
> > >>                         .withQueueDeclare(true)
> > >>                         .withUri(options.getRabbitMQUri()));
> > >>
> > >>
> > >> So if I put a simple message in my input queue, it should be "moved"
> > >> (in quotes, since the new message is not the original one but has the same
> > >> content) into my "written_in_rabbit" queue.
> > >>
> > >> Unfortunately, for reasons I don't understand, the original message
> > >> stays in the input queue.
> > >>
> > >> It seems to be because the
> > >> RabbitMQCheckpointMark#finalizeCheckpoint() method is never called. So
> > >> where is the finalizeCheckpoint method called?
> > >>
> > >> And how can I find out why this method is never called in my case?
> > >>
> > >> Thanks
> > >>
> > >>
> > >
>
>
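
For anyone who finds this thread later, here is a toy model of the behavior
discussed above: messages are acknowledged only when the runner finalizes a
checkpoint mark, so with checkpointing disabled they stay in the queue. The
classes ToyQueue, ToyCheckpointMark and ToyRunner are invented for this
sketch and are not Beam, Flink or RabbitMQ APIs; the real runner logic is
more involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy stand-in for a broker queue; tracks delivered but unacknowledged tags. */
final class ToyQueue {
    private final Set<Long> unacked = new LinkedHashSet<>();
    private long nextTag = 1;

    /** Delivers one message and returns its delivery tag; it stays unacked until ack(). */
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    void ack(long tag) {
        unacked.remove(tag);
    }

    int unackedCount() {
        return unacked.size();
    }
}

/** Toy checkpoint mark: remembers delivery tags and acks them only on finalization. */
final class ToyCheckpointMark {
    private final ToyQueue queue;
    private final List<Long> pendingTags = new ArrayList<>();

    ToyCheckpointMark(ToyQueue queue) {
        this.queue = queue;
    }

    void record(long tag) {
        pendingTags.add(tag);
    }

    /** The only place where acks happen, mirroring what the thread describes. */
    void finalizeCheckpoint() {
        for (long tag : pendingTags) {
            queue.ack(tag);
        }
        pendingTags.clear();
    }
}

/** Toy runner: finalizes the checkpoint mark only when checkpointing is enabled. */
final class ToyRunner {
    static int unackedAfterRun(int messages, boolean checkpointingEnabled) {
        ToyQueue queue = new ToyQueue();
        ToyCheckpointMark mark = new ToyCheckpointMark(queue);
        for (int i = 0; i < messages; i++) {
            mark.record(queue.deliver());
        }
        if (checkpointingEnabled) {
            mark.finalizeCheckpoint();
        }
        return queue.unackedCount();
    }

    public static void main(String[] args) {
        System.out.println("checkpointing off: " + unackedAfterRun(3, false) + " unacked");
        System.out.println("checkpointing on:  " + unackedAfterRun(3, true) + " unacked");
    }
}
```

With the Flink runner, checkpointing is normally switched on through the
checkpointingInterval pipeline option; whether that is enough for a given
setup is something to verify against the runner documentation.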
