In my opinion, for such crucial behavior I would expect the pipeline to fail with a clear message stating the reason, in the same way it does when you implement a new Coder and forget to override the verifyDeterministic method.
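A minimal sketch of the fail-fast check I have in mind. It also covers the "throw an exception, with an option to override" alternative Max mentions below. It is written in plain Java so it runs without Beam. `SourceInfo`, `CheckpointValidation`, `validateCheckpointing`, `requiresFinalization` and the `allowUnfinalizedCheckpoints` override flag are hypothetical names, not Beam or Flink API. The sketch also assumes, for illustration, that a non-positive checkpointing interval means checkpointing is disabled.

```java
import java.util.List;

/** Hypothetical description of an unbounded source used in a pipeline. */
final class SourceInfo {
    final String name;
    // True if the source only acknowledges input in finalizeCheckpoint().
    final boolean requiresFinalization;

    SourceInfo(String name, boolean requiresFinalization) {
        this.name = name;
        this.requiresFinalization = requiresFinalization;
    }
}

/** Hypothetical pipeline-construction check, not part of any runner. */
final class CheckpointValidation {
    private CheckpointValidation() {}

    /**
     * Throws if a source that needs checkpoint finalization is used while
     * checkpointing is disabled (interval <= 0), unless the user opts out.
     */
    static void validateCheckpointing(
            List<SourceInfo> sources,
            long checkpointingIntervalMillis,
            boolean allowUnfinalizedCheckpoints) {
        if (checkpointingIntervalMillis > 0 || allowUnfinalizedCheckpoints) {
            return; // checkpointing is on, or the user knows what they are doing
        }
        for (SourceInfo source : sources) {
            if (source.requiresFinalization) {
                throw new IllegalStateException(
                    "Source '" + source.name + "' acknowledges input only when checkpoints"
                        + " are finalized, but checkpointing is disabled, so its input would"
                        + " never be acknowledged. Enable checkpointing or explicitly allow"
                        + " unfinalized checkpoints.");
            }
        }
    }
}
```

Sources that never need finalization pass silently, which keeps the check from breaking pipelines that are fine today. Max points out that not all unbounded sources need to finalize checkpoint marks.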
Just my 2 cents.

On Fri, 14 June 2019, 16:48, Maximilian Michels <[email protected]> wrote:

> This has come up before: https://issues.apache.org/jira/browse/BEAM-4520
>
> The issue is that checkpoints won't be acknowledged if checkpointing is disabled in Flink. We log a WARN when unbounded sources are used without checkpointing. Not all unbounded sources actually need to finalize checkpoint marks.
>
> Seeing that this is still an issue, we might want to at least periodically acknowledge checkpoint marks when checkpointing is disabled. The alternative would be to throw an exception, perhaps with an option to override this in case the user knows what he/she is doing.
>
> Thanks,
> Max
>
> On 14.06.19 10:52, Ismaël Mejía wrote:
> > Is there a JIRA for this? If this solves an issue for multiple users, maybe it is worth integrating the patch.
> > Would you be up for doing this, Augustin?
> >
> > On Fri, Jun 14, 2019 at 10:35 AM Augustin Lafanechere <[email protected]> wrote:
> > >
> > > Hello Nicolas,
> > > I also encountered the same problem.
> > > RabbitMqIO indeed acknowledges messages on finalizeCheckpoint calls, but it was not clear to me when this method is called, because no messages were acked while the pipeline was running.
> > > I finally decided to patch RabbitMqIO to auto-ack received messages. This is fine for my current use case, but it is not the safest way of consuming messages.
> > >
> > > If someone has a cleaner solution, I'll be happy to hear it.
> > >
> > > Augustin
> > >
> > >> On 13 June 2019 at 15:47, Nicolas Delsaux <[email protected]> wrote:
> > >>
> > >> I'm having big trouble reading data from RabbitMQ.
> > >>
> > >> To understand my trouble, I've simplified my previous code to the extreme:
> > >>
> > >> // options: my custom PipelineOptions exposing getRabbitMQUri() and getRabbitMQQueue()
> > >> Pipeline pipeline = Pipeline.create(options);
> > >>
> > >> PCollection<?> wat = pipeline
> > >>     .apply("read_from_rabbit",
> > >>         RabbitMqIO.read()
> > >>             .withUri(options.getRabbitMQUri())
> > >>             .withQueue(options.getRabbitMQQueue()))
> > >>     .apply("why not",
> > >>         RabbitMqIO.write()
> > >>             .withQueue("written_in_rabbit")
> > >>             .withQueueDeclare(true)
> > >>             .withUri(options.getRabbitMQUri()));
> > >>
> > >> pipeline.run().waitUntilFinish();
> > >>
> > >> So if I put a simple message in my input queue, it should be "moved" (the quotes are there because the new message is not the original one, but has the same content) into my "written_in_rabbit" queue.
> > >>
> > >> Unfortunately, for reasons I don't understand, the original message stays in the input queue.
> > >>
> > >> It seems to be due to the fact that the RabbitMQCheckpointMark#finalizeCheckpoint() method is never called. So where is the finalizeCheckpoint method called?
> > >>
> > >> And how can I understand why this method is never called in my case?
> > >>
> > >> Thanks
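To make the mechanism discussed in this thread concrete, here is a plain-Java simulation with no Beam or RabbitMQ dependency. It models a source whose checkpoint mark acknowledges delivery tags only in finalizeCheckpoint(). `FakeQueue`, `PendingAcksMark` and `SimulatedRunner` are hypothetical stand-ins, not the real RabbitMqIO classes or the Flink runner. The sketch assumes only what Max and Augustin describe: the runner calls finalizeCheckpoint() after a completed checkpoint, so with checkpointing disabled nothing is ever acknowledged.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical broker queue: a message stays here until it is acknowledged. */
final class FakeQueue {
    private final List<Long> unacked = new ArrayList<>();
    private long nextTag = 1;

    /** Hands a message to the consumer and keeps it unacknowledged. */
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    void ack(long tag) {
        unacked.remove(Long.valueOf(tag)); // remove by value, not by index
    }

    int unackedCount() {
        return unacked.size();
    }
}

/** Hypothetical checkpoint mark that remembers delivery tags and acks them on finalize. */
final class PendingAcksMark {
    private final FakeQueue queue;
    private final List<Long> deliveryTags = new ArrayList<>();

    PendingAcksMark(FakeQueue queue) {
        this.queue = queue;
    }

    void record(long tag) {
        deliveryTags.add(tag);
    }

    /** Called by the runner only once the checkpoint covering these reads has completed. */
    void finalizeCheckpoint() {
        for (long tag : deliveryTags) {
            queue.ack(tag);
        }
        deliveryTags.clear();
    }
}

/** Hypothetical runner loop: reads messages and finalizes the mark only if checkpointing is on. */
final class SimulatedRunner {
    static void run(FakeQueue queue, int messages, boolean checkpointingEnabled) {
        PendingAcksMark mark = new PendingAcksMark(queue);
        for (int i = 0; i < messages; i++) {
            mark.record(queue.deliver());
        }
        if (checkpointingEnabled) {
            // A completed checkpoint is what triggers finalization.
            mark.finalizeCheckpoint();
        }
        // Without checkpointing, nothing ever calls finalizeCheckpoint(),
        // so every delivered message stays unacknowledged in the queue.
    }
}
```

With `checkpointingEnabled` set to false, all delivered messages remain unacknowledged, which matches Nicolas's observation. On the real Flink runner, checkpointing is turned on through the `checkpointingInterval` pipeline option (in milliseconds). Augustin's auto-ack patch corresponds to acking inside `deliver()`. That empties the queue immediately, but a message is lost if the pipeline fails after the ack and before the write.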
