My test is using the direct runner with the streaming option set to true.
As it is the runner that should be used for development (as far as I understand), I suppose it should exhibit most of the "correct" behaviours, including a checkpointing algorithm that is understandable, no?

On 13/06/2019 at 15:53, Jan Lukavský wrote:
Hi Nicolas,

what runner do you use? Have you configured checkpoints (if it is one that needs checkpoints to be configured, e.g. Flink)?

Jan

On 6/13/19 3:47 PM, Nicolas Delsaux wrote:

I'm having big trouble reading data from RabbitMQ. To understand the problem, I've simplified my previous code to the extreme:

    Pipeline pipeline = Pipeline.create(options);

    PCollection<Object> wat = (PCollection<Object>) pipeline
        .apply("read_from_rabbit",
            RabbitMqIO.read()
                .withUri(options.getRabbitMQUri())
                .withQueue(options.getRabbitMQQueue()))
        .apply("why not",
            RabbitMqIO.<RabbitMqMessage>write()
                .withQueue("written_in_rabbit")
                .withQueueDeclare(true)
                .withUri(options.getRabbitMQUri()));

So if I put a simple message in my input queue, it should be "moved" (in quotes because the new message is not the original one, but has the same content) into my "written_in_rabbit" queue.

Unfortunately, for reasons I don't understand, the original message stays in the input queue. It seems to be because the RabbitMQCheckpointMark#finalizeCheckpoint() method is never called.

So where is the finalizeCheckpoint method called? And how can I find out why this method is never called in my case?

Thanks
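To make the symptom above concrete, here is a minimal sketch of the checkpoint contract in plain Java, without Beam or RabbitMQ on the classpath. It assumes, as the message suggests, that the reader only acknowledges messages inside finalizeCheckpoint(), and that the runner is expected to call that method once a checkpoint has been committed. All class names (FakeQueue, FakeReader, FakeCheckpointMark, CheckpointSketch) are hypothetical stand-ins and not the actual RabbitMqIO or runner implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a broker queue: a delivered message stays
// "unacked" (still held by the broker) until ack() is called for it.
class FakeQueue {
    private final Deque<String> ready = new ArrayDeque<>();
    private final List<String> unacked = new ArrayList<>();

    void publish(String body) {
        ready.add(body);
    }

    // Hands out one message without acknowledging it; null if none is ready.
    String deliver() {
        String body = ready.poll();
        if (body != null) {
            unacked.add(body);
        }
        return body;
    }

    void ack(String body) {
        unacked.remove(body);
    }

    // Messages the broker still holds: ready plus delivered-but-unacked.
    int remaining() {
        return ready.size() + unacked.size();
    }
}

// Hypothetical analogue of a checkpoint mark: it remembers what was read
// since the previous checkpoint and acks it only when finalized.
class FakeCheckpointMark {
    private final FakeQueue queue;
    private final List<String> pending;

    FakeCheckpointMark(FakeQueue queue, List<String> pending) {
        this.queue = queue;
        this.pending = pending;
    }

    // In this sketch, the "runner" must call this after committing the checkpoint.
    void finalizeCheckpoint() {
        for (String body : pending) {
            queue.ack(body);
        }
        pending.clear();
    }
}

// Hypothetical analogue of an unbounded reader.
class FakeReader {
    private final FakeQueue queue;
    private List<String> readSinceCheckpoint = new ArrayList<>();

    FakeReader(FakeQueue queue) {
        this.queue = queue;
    }

    boolean advance() {
        String body = queue.deliver();
        if (body == null) {
            return false;
        }
        readSinceCheckpoint.add(body);
        return true;
    }

    FakeCheckpointMark getCheckpointMark() {
        List<String> snapshot = readSinceCheckpoint;
        readSinceCheckpoint = new ArrayList<>();
        return new FakeCheckpointMark(queue, snapshot);
    }
}

class CheckpointSketch {
    public static void main(String[] args) {
        FakeQueue queue = new FakeQueue();
        queue.publish("hello");

        FakeReader reader = new FakeReader(queue);
        reader.advance();
        FakeCheckpointMark mark = reader.getCheckpointMark();

        // Read but never finalized: the broker still holds the message.
        System.out.println("before finalize: " + queue.remaining()); // prints 1

        mark.finalizeCheckpoint();
        System.out.println("after finalize: " + queue.remaining()); // prints 0
    }
}
```

If nothing ever calls finalizeCheckpoint() in this sketch, the message is read and can be forwarded downstream, yet the queue still reports it as held, which matches the behaviour described in the original question.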
