I see. I don't think this is currently supported by AvroIO, so your best
bet is probably to do a GenericRecord-to-SpecificRecord conversion in a
ParDo that follows the read transform.
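A minimal sketch of that conversion, assuming an Avro-generated SpecificRecord class MyRecord and a hypothetical readerSchema variable (both illustrative, not confirmed by the thread); it also assumes the reader schema is resolvable against MyRecord's schema (e.g. a projection with defaults for any missing fields):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

// Read with the custom reader schema, then re-interpret each generic
// datum as the compiled SpecificRecord class in a following transform.
PCollection<MyRecord> records =
    pipeline
        .apply(AvroIO.readGenericRecords(readerSchema)
            .from("/path/to/input-*.avro"))
        .apply(MapElements.via(new SimpleFunction<GenericRecord, MyRecord>() {
          @Override
          public MyRecord apply(GenericRecord input) {
            // SpecificData.deepCopy walks the datum and instantiates the
            // generated class where one is available on the classpath.
            return (MyRecord) SpecificData.get()
                .deepCopy(MyRecord.getClassSchema(), input);
          }
        }));
```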
On Thu, Jun 13, 2019 at 4:32 PM Neville Li wrote:
That gives me a GenericRecord which is not type safe.
In my case I have the compiled SpecificRecord class i.e. MyRecord
available, but would like to pass in a schema other than
MyRecord.getClassSchema() to say populate a subset of the fields.
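Such a reader schema would typically be a projection of the writer schema; a sketch with an illustrative field name (not taken from the thread):

```java
import org.apache.avro.Schema;

// Hypothetical projection schema: same record name as MyRecord, but
// declaring only the subset of fields we want populated on read.
Schema readerSchema = new Schema.Parser().parse(
    "{\"type\": \"record\", \"name\": \"MyRecord\","
    + " \"fields\": ["
    + "   {\"name\": \"id\", \"type\": \"long\"}"
    + " ]}");
```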
On Thu, Jun 13, 2019 at 6:18 PM Chamikara Jayalath wrote:
The error seems to be originating from
https://github.com/apache/beam/blob/master/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java#L166
Can you check what is the endpoint for the Data Channel and why was it not
set to begin with?
A few more questions:
* Does AvroIO.readGenericRecords() work?
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L333
Thanks,
Cham
On Thu, Jun 13, 2019 at 1:46 PM Neville Li wrote:
Hi,
Is it just me or is there no way for AvroIO to read SpecificRecords with a
custom reader schema?
AvroIO.read(Class<T> recordClass) will use the schema of T, and there's no
way to override it.
Cheers,
Neville
I'm not sure the log excerpt you attached contains the root cause of your
issue. Unfortunately, Beam's portable runners print many irrelevant error
messages like these, even when the SDK harness disconnects normally. So
the real issue may be buried elsewhere in the log.
Kyle
Hi Nicolas,
which runner do you use? Have you configured checkpoints (if it is one
that needs checkpoints to be configured, e.g. Flink)?
Jan
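If the Flink runner is in use, enabling checkpointing might look like the following sketch (the interval value is arbitrary, chosen for illustration):

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
// Unbounded sources such as RabbitMqIO acknowledge messages when a
// checkpoint is finalized, so without an interval configured the source
// may never commit what it has read.
options.setCheckpointingInterval(10_000L); // milliseconds
Pipeline pipeline = Pipeline.create(options);
```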
On 6/13/19 3:47 PM, Nicolas Delsaux wrote:
I'm having big trouble reading data from RabbitMQ.
To understand the problem, I've simplified my previous code to the extreme:
Pipeline pipeline = Pipeline.create(options);
PCollection<RabbitMqMessage> wat = (PCollection<RabbitMqMessage>)
pipeline.apply("read_from_rabbit",
RabbitMqIO.read()
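For reference, a completed version of that read might look like the following (the original snippet is truncated; the URI and queue name here are placeholders, not from the thread):

```java
import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
import org.apache.beam.sdk.values.PCollection;

// RabbitMqIO.read() emits RabbitMqMessage elements from the given queue.
PCollection<RabbitMqMessage> messages =
    pipeline.apply("read_from_rabbit",
        RabbitMqIO.read()
            .withUri("amqp://guest:guest@localhost:5672/")
            .withQueue("my_queue"));
```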
Hi,
I'm puzzled by the way data is committed in pipelines.
I'm reading data from RabbitMQ and trying to write it to files using
TextIO, with the following pipeline:
pipeline.apply("read_from_rabbit",
RabbitMqIO.read()
.withUri(options.getRabbitMQ
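Beyond the truncated snippet above: TextIO over an unbounded source like RabbitMQ generally requires windowing and windowed writes before any files are finalized. A sketch under that assumption, with an illustrative window size and output path, and assuming a PCollection<RabbitMqMessage> named messages:

```java
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

messages
    // Files are finalized per window, so an unbounded stream must be
    // windowed before TextIO.write() can emit anything.
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(MapElements.into(TypeDescriptors.strings())
        .via(m -> new String(m.getBody(), StandardCharsets.UTF_8)))
    .apply(TextIO.write()
        .to("/tmp/rabbit-output")
        .withWindowedWrites()
        .withNumShards(1));
```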