I discovered that it works if I build the schema manually and change the namespace. In my example, the automatically generated schema produces the namespace “pva.beam.Main$” and name “TP”.
Apparently, when reading, if Avro can find a class matching the schema’s namespace + name, it will instantiate that class instead of using GenericRecord. Or is there a constraint on the sorts of objects that can be stored: do they have to implement GenericRecord? (If so, is this documented somewhere?)
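
For the record, here is roughly what the workaround looks like (a sketch, not a definitive fix: the replacement namespace is arbitrary, and copying fields this way assumes an Avro version where Schema.Field exposes defaultVal()):

import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

Schema reflected = ReflectData.get().getSchema(TP.class);

// Copy the reflected fields (a Schema.Field instance can only belong to one schema).
List<Schema.Field> fields = new ArrayList<>();
for (Schema.Field f : reflected.getFields()) {
    fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()));
}

// Rebuild the record under a namespace that does not resolve to a class on the
// classpath, so the Avro reader falls back to GenericRecord instead of trying
// to instantiate pva.beam.Main$TP.
Schema tpSchema = Schema.createRecord(
    reflected.getName(), reflected.getDoc(), "some.unresolvable.ns", false, fields);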


From: Lukasz Cwik <[email protected]>
Sent: Friday, August 24, 2018 22:04
To: [email protected]
Subject: Re: Beam and ParquetIO

Does it work if you define the schema manually instead of using ReflectData?

All the Parquet tests[1] seem to explicitly build the schema and this could be 
a current limitation of the ParquetIO implementation. I would need someone with 
more Parquet knowledge related to the implementation though to speak up.

1: 
https://github.com/apache/beam/blob/88b3556ad99e16b3a63064d7243d1675df501ef5/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java#L97
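
For example, a hand-built schema for the TP class from your snippet might look something like this (an untested sketch using Avro's SchemaBuilder; the namespace is up to you):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// One required string field "str", matching the shape of the reflected schema.
Schema tpSchema = SchemaBuilder.record("TP")
    .namespace("pva.beam")
    .fields()
    .requiredString("str")
    .endRecord();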

On Thu, Aug 23, 2018 at 11:57 PM Kelsey RIDER <[email protected]> wrote:
Hi,

Sorry, missed that bit of code – the SchemaConverter does exactly the same 
thing, so they are indeed both created in the same way.

From: Lukasz Cwik <[email protected]>
Sent: Friday, August 24, 2018 01:15
To: [email protected]
Subject: Re: Beam and ParquetIO

I believe you need to be consistent in how you are creating your schema, since

    Schema tpSchema = ReflectData.get().getSchema(TP.class)

is not the same as

    Schema tpSchema = new SchemaConverter().convert(TP.class);

Either use ReflectData.get().getSchema(TP.class) or new SchemaConverter().convert(TP.class), in both the write and the read pipeline, as in the sketch below.
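
For example (a sketch; `records` and `outputDir` stand in for your PCollection and destination):

// One schema definition, shared by both pipelines:
Schema tpSchema = ReflectData.get().getSchema(TP.class);

// Write side:
records.apply(FileIO.<GenericRecord>write()
    .via(ParquetIO.sink(tpSchema))
    .to(outputDir));

// Read side, using the same schema construction:
pipeline.apply(ParquetIO.read(tpSchema).from(outputDir + "/*"));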


On Thu, Aug 23, 2018 at 8:41 AM Kelsey RIDER <[email protected]> wrote:
Hello,

I’m trying to use ParquetIO to both read and write records.
However, I’m unable to read back the records that were previously written.
I wrote a simple test:

private static class TP implements Serializable {
    public String str;
}

private void testWriteParquet(String[] args) {
    final PVAOptions options = getOptions(args);

    final Pipeline pipeline = Pipeline.create(options);

    Schema tpSchema = ReflectData.get().getSchema(TP.class);
    GenericData.Record record = new GenericData.Record(tpSchema);
    record.put("str", "All your data are belong to us");

    pipeline.getCoderRegistry().registerCoderForClass(GenericData.Record.class,
        AvroCoder.of(tpSchema));

    pipeline.apply(Create.of((GenericRecord) record))
        .apply(FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(tpSchema))
            .to(options.getDestinationDirectory() + "parquettest"));

    pipeline.run();
}

private void testReadParquet(String[] args) {
    final PVAOptions options = getOptions(args);

    final Pipeline pipeline = Pipeline.create(options);

    Schema tpSchema = new SchemaConverter().convert(TP.class);

    pipeline.apply(ParquetIO.read(tpSchema)
            .from(options.getDestinationDirectory() + "parquettest/*"))
        .apply(ParDo.of(new DoFn<GenericRecord, Void>() {
            @ProcessElement
            public void pe(ProcessContext c, @Element GenericRecord record) {
                System.out.println("----------------------------------- found record str = "
                    + record.get("str"));
            }
        }));

    pipeline.run();
}


The result I get is:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
    at java.lang.Thread.run (Thread.java:748)
Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:349)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:319)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
    at pva.beam.Main.testReadParquet (Main.java:67)
    at pva.beam.Main.main (Main.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
    at java.lang.Thread.run (Thread.java:748)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (InternalParquetRecordReader.java:251)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:136)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (ParquetIO.java:226)
Caused by: java.lang.ClassCastException: pva.beam.Main$TP cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.avro.generic.GenericData.setField (GenericData.java:690)
    at org.apache.parquet.avro.AvroRecordConverter.set (AvroRecordConverter.java:393)
    at org.apache.parquet.avro.AvroRecordConverter$2.add (AvroRecordConverter.java:136)
    at org.apache.parquet.avro.AvroConverters$BinaryConverter.addBinary (AvroConverters.java:62)
    at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue (ColumnReaderImpl.java:317)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter (ColumnReaderImpl.java:367)
    at org.apache.parquet.io.RecordReaderImplementation.read (RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (InternalParquetRecordReader.java:226)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:136)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (ParquetIO.java:226)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn$DoFnInvoker.invokeProcessElement (Unknown Source)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:185)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement (SimpleDoFnRunner.java:149)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows (SimplePushbackSideInputDoFnRunner.java:78)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement (ParDoEvaluator.java:189)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement (DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:161)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:125)
    at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
    at java.util.concurrent.FutureTask.run (FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
    at java.lang.Thread.run (Thread.java:748)

How can I read back my records with Beam?
