My test is using the direct runner with the streaming option set to true.

Since it is the runner meant for development (as far as I understand),
I suppose it should expose most of the "correct" behaviours, including
a checkpointing algorithm that is understandable, no?


On 13/06/2019 at 15:53, Jan Lukavský wrote:
Hi Nicolas,

what runner do you use? Have you configured checkpoints (if it is one
that needs checkpoints to be configured - e.g. Flink)?

Jan

On 6/13/19 3:47 PM, Nicolas Delsaux wrote:
I'm having a lot of trouble reading data from RabbitMQ.

To understand the problem, I've simplified my previous code to the
extreme:


        Pipeline pipeline = Pipeline.create(options);

        // Read from the input queue and write every message to "written_in_rabbit".
        PCollection<Object> wat = (PCollection<Object>) pipeline
                .apply("read_from_rabbit", RabbitMqIO.read()
                        .withUri(options.getRabbitMQUri())
                        .withQueue(options.getRabbitMQQueue()))
                .apply("why not", RabbitMqIO.<RabbitMqMessage>write()
                        .withQueue("written_in_rabbit")
                        .withQueueDeclare(true)
                        .withUri(options.getRabbitMQUri()));


So if I put a simple message in my input queue, it should be "moved"
(in quotes, since the new message is not the original one but has the
same content) into my "written_in_rabbit" queue.

Unfortunately, for reasons I don't understand, the original message
stays in the input queue.

It seems to be because the
RabbitMQCheckpointMark#finalizeCheckpoint() method is never called.
So where is the finalizeCheckpoint method called?

And how can I find out why this method is never called in my case?
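
To make the symptom concrete, here is a toy model in plain Java. It is
not Beam code: ToyQueue, ToyCheckpointMark and readOnce are hypothetical
names made up for illustration. It assumes, as the behaviour described
above suggests, that the checkpoint mark only acknowledges messages to
the broker inside finalizeCheckpoint(), and that it is up to the runner
to call that method once the checkpoint has been committed.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class CheckpointSketch {

    /** Hypothetical stand-in for a broker queue: messages stay until they are acked. */
    static class ToyQueue {
        private final Set<Long> unacked = new LinkedHashSet<>();

        void publish(long deliveryTag) { unacked.add(deliveryTag); }

        void ack(long deliveryTag) { unacked.remove(deliveryTag); }

        List<Long> pending() { return new ArrayList<>(unacked); }

        int remaining() { return unacked.size(); }
    }

    /** Hypothetical checkpoint mark: remembers what was read, acks only when finalized. */
    static class ToyCheckpointMark {
        private final ToyQueue queue;
        private final List<Long> deliveryTags;

        ToyCheckpointMark(ToyQueue queue, List<Long> deliveryTags) {
            this.queue = queue;
            this.deliveryTags = deliveryTags;
        }

        /** Assumption: this is the only place where messages are acknowledged. */
        void finalizeCheckpoint() {
            for (long tag : deliveryTags) {
                queue.ack(tag);
            }
        }
    }

    /**
     * Hypothetical runner step: read everything pending, take a checkpoint,
     * and finalize it only if the runner chooses to.
     * Returns the number of messages still sitting in the queue afterwards.
     */
    static int readOnce(ToyQueue queue, boolean runnerFinalizes) {
        ToyCheckpointMark mark = new ToyCheckpointMark(queue, queue.pending());
        // A real runner would durably commit the downstream output here.
        if (runnerFinalizes) {
            mark.finalizeCheckpoint();
        }
        return queue.remaining();
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.publish(1L);
        // Without finalization the message is read but stays in the queue.
        System.out.println(readOnce(queue, false)); // prints 1
        // Once the runner finalizes the checkpoint, the message is acked.
        System.out.println(readOnce(queue, true));  // prints 0
    }
}
```

In this model the first call reproduces what I see: the message is
consumed by the pipeline, yet never leaves the input queue because
nothing triggers the acknowledgement.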

Thanks

