How to deserialize Avro content from BigQuery using Apache Beam?

2024-02-09 Thread Nicolas Delsaux

Hi all,

This question is a copy of https://stackoverflow.com/q/77970866/15619; I'm asking
here as well to have the best chance of getting an answer (but don't worry,
I'll take care of keeping the two in sync).

I'm currently trying to read a data table from Google BigQuery using
Apache Beam/Google Dataflow. I have extracted the Avro schema from
BigQuery and generated the Java classes from that Avro schema. As I
don't want to read the whole table, I use a SQL query to get the last
record. And since the documentation states that BigQueryIO.readTableRows()
is slower than BigQueryIO.read(SerializableFunction), I tried to use
BigQueryIO.readWithDatumReader(...) the following way:

        return pipeline.apply("Read from BigQuery query", BigQueryIO
                .readWithDatumReader((AvroSource.DatumReaderFactory)
                        (writer, reader) -> new SpecificDatumReader<>(SupplyChain.getClassSchema()))
                .fromQuery(query)
                .withQueryLocation("EU")
                .usingStandardSql()
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ));

Unfortunately, it fails with the following exception:

java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "instance" is null
    at org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters(TypeDescriptors.java:354)
    at org.apache.beam.sdk.values.TypeDescriptors.outputOf(TypeDescriptors.java:411)
    at org.apache.beam.sdk.values.TypeDescriptors.outputOf(TypeDescriptors.java:420)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.inferCoder(BigQueryIO.java:1070)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.expand(BigQueryIO.java:1247)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.expand(BigQueryIO.java:914)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:559)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:508)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:194)
    at com.auchan.hermes.prd03.Run.source(Run.java:59)
    at com.auchan.hermes.prd03.ReadDataFromBigQueryTest.test_can_read_data_from_BigQuery(ReadDataFromBigQueryTest.java:71)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:323)

After reading the source code, I've noticed that BigQueryIO.read(SerializableFunction)
makes a call to .setParseFn(parseFn), which is missing from the
BigQueryIO.readWithDatumReader(...) method. Is there something else to
do? Am I missing something?
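
For reference, here is a minimal sketch of the read(SerializableFunction) alternative
I'm considering, with an explicit parse function and coder (SupplyChain and query are
from my code above; converting the GenericRecord to the generated class via
SpecificData.deepCopy is just one way to do it, and may well not be the recommended one):

        // Sketch only: BigQueryIO.read(SerializableFunction) sets a parse function
        // internally, so coder/type inference has something to work with. The coder is
        // also declared explicitly to sidestep the inference path that NPEs above.
        return pipeline.apply("Read from BigQuery query", BigQueryIO
                .read((SerializableFunction<SchemaAndRecord, SupplyChain>) input ->
                        // Rebuild the generated specific class from the generic record;
                        // SpecificData.deepCopy creates specific records when the class is on the classpath.
                        (SupplyChain) SpecificData.get()
                                .deepCopy(SupplyChain.getClassSchema(), input.getRecord()))
                .withCoder(AvroCoder.of(SupplyChain.class))
                .fromQuery(query)
                .withQueryLocation("EU")
                .usingStandardSql()
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ));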


Re: Why is my RabbitMq message never acknowledged ?

2019-06-17 Thread Nicolas Delsaux

My test is using the direct runner with the streaming option set to true.

As it is the runner that should be used for development (as far as I
understand), I suppose it should expose most of the "correct" behaviours,
including a checkpointing algorithm whose behaviour is understandable, no?
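
Concretely, the relevant options are set roughly like this (a minimal sketch;
only --runner and --streaming are standard Beam flags, the rest of my options
interface is omitted):

        import org.apache.beam.sdk.Pipeline;
        import org.apache.beam.sdk.options.PipelineOptions;
        import org.apache.beam.sdk.options.PipelineOptionsFactory;

        // Sketch: running the test pipeline on the direct runner in streaming mode.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(
                "--runner=DirectRunner",
                "--streaming=true")
            .withValidation()
            .create();
        Pipeline pipeline = Pipeline.create(options);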


On 13/06/2019 at 15:53, Jan Lukavský wrote:

Hi Nicolas,

what runner do you use? Have you configured checkpoints (if it is one
that needs checkpoints to be configured - e.g. Flink)?

Jan

On 6/13/19 3:47 PM, Nicolas Delsaux wrote:

I'm having big trouble reading data from RabbitMQ.

To understand the problem, I've simplified my previous code to the extreme:


        Pipeline pipeline = Pipeline.create(options);

        PCollection wat = (PCollection) pipeline.apply("read_from_rabbit",
                RabbitMqIO.read()
                    .withUri(options.getRabbitMQUri())
                    .withQueue(options.getRabbitMQQueue()))
                .apply("why not", RabbitMqIO.write()
                    .withQueue("written_in_rabbit")
                    .withQueueDeclare(true)
                    .withUri(options.getRabbitMQUri()));


So if I put a simple message in my input queue, it should be "moved"
(in quotes, since the new message is not the original one, but has the
same content) into my "written_in_rabbit" queue.

Unfortunately, for reasons I don't understand, the original message
stays in the input queue.

It seems to be due to the fact that the
RabbitMQCheckpointMark#finalizeCheckpoint() method is never called.
So where is the finalizeCheckpoint method supposed to be called?

And how can I understand why it is never called in my case?

Thanks




Why is my RabbitMq message never acknowledged ?

2019-06-13 Thread Nicolas Delsaux

I'm having big trouble reading data from RabbitMQ.

To understand the problem, I've simplified my previous code to the extreme:


        Pipeline pipeline = Pipeline.create(options);

        PCollection wat = (PCollection) pipeline.apply("read_from_rabbit",
                RabbitMqIO.read()
                    .withUri(options.getRabbitMQUri())
                    .withQueue(options.getRabbitMQQueue()))
                .apply("why not", RabbitMqIO.write()
                    .withQueue("written_in_rabbit")
                    .withQueueDeclare(true)
                    .withUri(options.getRabbitMQUri()));


So if I put a simple message in my input queue, it should be "moved"
(in quotes, since the new message is not the original one, but has the
same content) into my "written_in_rabbit" queue.


Unfortunately, for reasons I don't understand, the original message
stays in the input queue.


It seems to be due to the fact that the
RabbitMQCheckpointMark#finalizeCheckpoint() method is never called.
So where is the finalizeCheckpoint method supposed to be called?


And how can I understand why it is never called in my case?
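
For context, my mental model of the contract (a sketch of my own, not the real
RabbitMqIO code) is that finalizeCheckpoint() is where the AMQP acknowledgements
are supposed to happen, once the runner has durably committed the bundle that
produced the records:

        import java.io.IOException;
        import java.io.Serializable;
        import java.util.ArrayList;
        import java.util.List;

        import org.apache.beam.sdk.io.UnboundedSource;
        import com.rabbitmq.client.Channel;

        // Illustrative sketch only -- not the actual Beam/RabbitMqIO code. The runner
        // is supposed to call finalizeCheckpoint() once the bundle that produced these
        // records is committed, and that is the moment the deliveries get acknowledged.
        class SketchCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {

          private transient Channel channel;                          // AMQP channel the reader used
          private final List<Long> deliveryTags = new ArrayList<>();  // unacknowledged delivery tags

          @Override
          public void finalizeCheckpoint() throws IOException {
            for (Long tag : deliveryTags) {
              channel.basicAck(tag, false);                           // ack each delivery individually
            }
            deliveryTags.clear();
          }
        }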

Thanks




how is commit information propagated ?

2019-06-13 Thread Nicolas Delsaux

Hi,

I'm puzzled by the way data is committed in pipelines.

I'm reading data from RabbitMQ and trying to write it to files using
TextIO and the following pipeline:



        pipeline.apply("read_from_rabbit",
                RabbitMqIO.read()
                    .withUri(options.getRabbitMQUri())
                    .withQueue(options.getRabbitMQQueue()))
                .apply(Window.into(
FixedWindows.of(Duration.standardSeconds(1
                .apply("transform_into_kv",
                        MapElements.into(TypeDescriptors
                                .kvs(TypeDescriptors.strings(), 
TypeDescriptors.strings()))
                                .via(new 
RabbitMQMessageToKV(options.getIdPaths()))

                        )
                .apply("transform_into_strings",
MapElements.into(TypeDescriptors.strings())
                                .via(new RabbitToKafka.KVToString())
                        )
                .apply("log",
                        TextIO
                            .write()
                            .withWindowedWrites()
                            .withNumShards(1)
                            .to("messages")
                            .withSuffix(".txt")
                        );

When I run that pipeline, no data is written to messages*.txt, although
I can see that messages are processed in the transform_into_strings step.
Is it because there is some kind of coordinated commit? And if so, how
can I know when such a commit occurs or fails?
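
In case it matters, here is a variant of the windowing step I've been considering
(a sketch with an explicit trigger and allowed lateness; I don't actually know
whether the default trigger is what is holding the output back):

        // Sketch: making the window's firing behaviour explicit, in case the default
        // trigger is what keeps the windowed TextIO write from ever producing output.
        // The values (1-second windows, zero allowed lateness) mirror the pipeline above.
        .apply(Window.<RabbitMqMessage>into(FixedWindows.of(Duration.standardSeconds(1)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())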




send Avro messages to Kafka using Beam

2019-05-29 Thread Nicolas Delsaux

Hello all

I have a Beam job that I use to read messages from RabbitMQ and write them
to Kafka.

As of now, messages are read/written as JSON.

Obviously, that's not the most efficient storage format, so I would like to
transform the messages to Avro before writing them to Kafka. I have the URL
of a schema registry I can use to store/fetch my schema.

But I see nowhere in the Beam documentation how to transform my JSON into
Avro data (except by deserializing my JSON to a Java class that I would
later convert to Avro). Is that deserialization to a class the only way,
or is it possible to generate an Avro GenericRecord from my JSON
"directly"?

Once my Avro data is generated, how can I write it to my Kafka topic?


Thanks !



Re: a fix to send RabbitMq messages

2019-05-27 Thread Nicolas Delsaux

I've created https://issues.apache.org/jira/browse/BEAM-7433 to track
that bug.

Unfortunately, I don't have much detail to provide... sorry.


On 2019/05/24 16:37:36, Kenneth Knowles wrote:

Coders are all set up by the SDK before the pipeline is given to a runner,
so that sounds like a strange issue. Would you also file a Jira ticket
about your experience with the coder registry and the DataflowRunner?

On Fri, May 24, 2019 at 5:26 AM, Nicolas Delsaux wrote:

Thanks, PR is started (https://github.com/apache/beam/pull/8677), and
I've set both Alexey and you as potential reviewers.

On 24/05/2019 at 13:55, Jean-Baptiste Onofré wrote:

Hi,

You can create a PullRequest, I will do the review.

The coder is set on the RabbitMQIO PTransform, so it should work.

AFAIR, we have a Jira about that and I already started to check. It's not
yet completed.

Regards
JB

On 24/05/2019 11:01, Nicolas Delsaux wrote:

Hi all

I'm currently evaluating Apache Beam to transfer messages from RabbitMQ
to Kafka, with some transforms in between.

Doing so, I've discovered some differences between the direct runner's
behaviour and the Google Dataflow runner's.

But first, a small introduction to what I know.

From what I understand, elements transmitted between two different
transforms are serialized/deserialized.

This (de)serialization process is normally managed by a Coder, of which
the most used is obviously the SerializableCoder, which takes a
Serializable object and (de)serializes it using classical Java mechanisms.

On the direct runner, I had issues with RabbitMQ messages, as their
headers contain objects that are LongString, an interface implemented
solely in a private static class of the RabbitMQ client, and used for
large text messages.

So I wrote a RabbitMqMessageCoder and installed it in my pipeline (using
pipeline.getCoderRegistry().registerCoderForClass(RabbitMqMessage.class,
new MyCoder())).

And it worked! Well, not on the Dataflow runner.

Indeed, it seems the Dataflow runner doesn't use this coder registry
mechanism (for reasons I absolutely don't understand).

So my fix didn't work.

After various tries, I finally gave up and directly modified the
RabbitMqIO class (from Apache Beam) to handle my case.

This fix is available on my Beam fork on GitHub, and I would like to
have it integrated.

What is the procedure to do so?

Thanks!


Re: a fix to send RabbitMq messages

2019-05-24 Thread Nicolas Delsaux

Thanks, PR is started (https://github.com/apache/beam/pull/8677), and
I've set both Alexey and you as potential reviewers.

On 24/05/2019 at 13:55, Jean-Baptiste Onofré wrote:

Hi,

You can create a PullRequest, I will do the review.

The coder is set on the RabbitMQIO PTransform, so it should work.

AFAIR, we have a Jira about that and I already started to check. It's not
yet completed.

Regards
JB

On 24/05/2019 11:01, Nicolas Delsaux wrote:

Hi all

I'm currently evaluating Apache Beam to transfer messages from RabbitMQ
to Kafka, with some transforms in between.

Doing so, I've discovered some differences between the direct runner's
behaviour and the Google Dataflow runner's.

But first, a small introduction to what I know.

From what I understand, elements transmitted between two different
transforms are serialized/deserialized.

This (de)serialization process is normally managed by a Coder, of which
the most used is obviously the SerializableCoder, which takes a
Serializable object and (de)serializes it using classical Java mechanisms.

On the direct runner, I had issues with RabbitMQ messages, as their
headers contain objects that are LongString, an interface implemented
solely in a private static class of the RabbitMQ client, and used for
large text messages.

So I wrote a RabbitMqMessageCoder and installed it in my pipeline (using
pipeline.getCoderRegistry().registerCoderForClass(RabbitMqMessage.class,
new MyCoder())).

And it worked! Well, not on the Dataflow runner.


Indeed, it seems the Dataflow runner doesn't use this coder registry
mechanism (for reasons I absolutely don't understand).

So my fix didn't work.

After various tries, I finally gave up and directly modified the
RabbitMqIO class (from Apache Beam) to handle my case.

This fix is available on my Beam fork on GitHub, and I would like to
have it integrated.

What is the procedure to do so ?

Thanks !



a fix to send RabbitMq messages

2019-05-24 Thread Nicolas Delsaux

Hi all

I'm currently evaluating Apache Beam to transfer messages from RabbitMQ
to Kafka, with some transforms in between.

Doing so, I've discovered some differences between the direct runner's
behaviour and the Google Dataflow runner's.

But first, a small introduction to what I know.

From what I understand, elements transmitted between two different
transforms are serialized/deserialized.

This (de)serialization process is normally managed by a Coder, of which
the most used is obviously the SerializableCoder, which takes a
Serializable object and (de)serializes it using classical Java mechanisms.

On the direct runner, I had issues with RabbitMQ messages, as their
headers contain objects that are LongString, an interface implemented
solely in a private static class of the RabbitMQ client, and used for
large text messages.

So I wrote a RabbitMqMessageCoder and installed it in my pipeline (using
pipeline.getCoderRegistry().registerCoderForClass(RabbitMqMessage.class,
new MyCoder())).
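
Concretely, the wiring looks roughly like this (a sketch; RabbitMqMessageCoder
stands for my own coder class, and pipeline/options are the ones from my job;
setting the coder directly on the PCollection is the only alternative form I'm
aware of):

        // Sketch of the two ways I know to attach a custom coder. RabbitMqMessageCoder
        // is my own class, not part of Beam.

        // 1) Register the coder globally, so it is picked up whenever a
        //    PCollection<RabbitMqMessage> needs one:
        pipeline.getCoderRegistry()
            .registerCoderForClass(RabbitMqMessage.class, new RabbitMqMessageCoder());

        // 2) Or set it explicitly on the PCollection produced by the read:
        PCollection<RabbitMqMessage> messages =
            pipeline.apply("read_from_rabbit",
                    RabbitMqIO.read()
                        .withUri(options.getRabbitMQUri())
                        .withQueue(options.getRabbitMQQueue()));
        messages.setCoder(new RabbitMqMessageCoder());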

And it worked! Well, not on the Dataflow runner.


Indeed, it seems the Dataflow runner doesn't use this coder registry
mechanism (for reasons I absolutely don't understand).

So my fix didn't work.

After various tries, I finally gave up and directly modified the
RabbitMqIO class (from Apache Beam) to handle my case.

This fix is available on my Beam fork on GitHub, and I would like to
have it integrated.

What is the procedure to do so ?

Thanks !