Does it work if you define the schema manually instead of using ReflectData?

All the Parquet tests [1] explicitly build the schema, so this could be a limitation of the current ParquetIO implementation. Someone with more knowledge of the Parquet side of the implementation would need to weigh in, though.

[1] https://github.com/apache/beam/blob/88b3556ad99e16b3a63064d7243d1675df501ef5/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java#L97
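To be concrete, "defining the schema manually" for the TP record used in the quoted code below could look like the following sketch with Avro's SchemaBuilder. This is untested against ParquetIO; the record name and namespace are illustrative, only the field layout has to match what was written:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    class TpSchemas {
      // A hand-built schema for a record with a single required String
      // field named "str", mirroring the TP class in the quoted code.
      // The record name and namespace are illustrative.
      static final Schema TP_SCHEMA =
          SchemaBuilder.record("TP").namespace("pva.beam")
              .fields()
              .requiredString("str")
              .endRecord();
    }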
On Thu, Aug 23, 2018 at 11:57 PM Kelsey RIDER <[email protected]> wrote:

> Hi,
>
> Sorry, missed that bit of code – the SchemaConverter does exactly the
> same thing, so they are indeed both created in the same way.
>
> From: Lukasz Cwik <[email protected]>
> Sent: Friday, August 24, 2018 01:15
> To: [email protected]
> Subject: Re: Beam and ParquetIO
>
> I believe you need to be consistent in how you are creating your schema,
> since
>
>     ReflectData.get().getSchema(TP.class) != new SchemaConverter().convert(TP.class)
>
> Either use ReflectData.get().getSchema(TP.class) or
> new SchemaConverter().convert(TP.class) in both places.
>
> On Thu, Aug 23, 2018 at 8:41 AM Kelsey RIDER <[email protected]> wrote:
>
> Hello,
>
> I’m trying to use ParquetIO to both read and write records.
> However, I’m unable to read back the records that were previously written.
> I wrote a simple test:
>
>     private static class TP implements Serializable {
>         public String str;
>     }
>
>     private void testWriteParquet(String[] args) {
>         final PVAOptions options = getOptions(args);
>         final Pipeline pipeline = Pipeline.create(options);
>
>         Schema tpSchema = ReflectData.get().getSchema(TP.class);
>         GenericData.Record record = new GenericData.Record(tpSchema);
>         record.put("str", "All your data are belong to us");
>
>         pipeline.getCoderRegistry().registerCoderForClass(
>                 GenericData.Record.class, AvroCoder.of(tpSchema));
>
>         pipeline.apply(Create.of((GenericRecord) record))
>                 .apply(FileIO.<GenericRecord>write()
>                         .via(ParquetIO.sink(tpSchema))
>                         .to(options.getDestinationDirectory() + "parquettest"));
>
>         pipeline.run();
>     }
>
>     private void testReadParquet(String[] args) {
>         final PVAOptions options = getOptions(args);
>         final Pipeline pipeline = Pipeline.create(options);
>
>         Schema tpSchema = new SchemaConverter().convert(TP.class);
>
>         pipeline.apply(ParquetIO.read(tpSchema)
>                         .from(options.getDestinationDirectory() + "parquettest/*"))
>                 .apply(ParDo.of(new DoFn<GenericRecord, Void>() {
>                     @ProcessElement
>                     public void pe(ProcessContext c, @Element GenericRecord record) {
>                         System.out.println("----------------------------------- found record str = " + record.get("str"));
>                     }
>                 }));
>
>         pipeline.run();
>     }
>
> The result I get is:
>
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:349)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:319)
>     at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:210)
>     at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:66)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
>     at pva.beam.Main.testReadParquet (Main.java:67)
>     at pva.beam.Main.main (Main.java:75)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (InternalParquetRecordReader.java:251)
>     at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:132)
>     at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:136)
>     at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (ParquetIO.java:226)
> Caused by: java.lang.ClassCastException: pva.beam.Main$TP cannot be cast to org.apache.avro.generic.IndexedRecord
>     at org.apache.avro.generic.GenericData.setField (GenericData.java:690)
>     at org.apache.parquet.avro.AvroRecordConverter.set (AvroRecordConverter.java:393)
>     at org.apache.parquet.avro.AvroRecordConverter$2.add (AvroRecordConverter.java:136)
>     at org.apache.parquet.avro.AvroConverters$BinaryConverter.addBinary (AvroConverters.java:62)
>     at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue (ColumnReaderImpl.java:317)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter (ColumnReaderImpl.java:367)
>     at org.apache.parquet.io.RecordReaderImplementation.read (RecordReaderImplementation.java:406)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (InternalParquetRecordReader.java:226)
>     at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:132)
>     at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:136)
>     at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (ParquetIO.java:226)
>     at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn$DoFnInvoker.invokeProcessElement (Unknown Source)
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:185)
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement (SimpleDoFnRunner.java:149)
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows (SimplePushbackSideInputDoFnRunner.java:78)
>     at org.apache.beam.runners.direct.ParDoEvaluator.processElement (ParDoEvaluator.java:189)
>     at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement (DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
>     at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:161)
>     at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:125)
>     at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
>     at java.util.concurrent.FutureTask.run (FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run (Thread.java:748)
>
> How can I read back my records with Beam?
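Reading the trace, the root cause (java.lang.ClassCastException: pva.beam.Main$TP cannot be cast to org.apache.avro.generic.IndexedRecord) suggests that the reflect-generated schema embedded in the file names the TP class, so on read the Parquet Avro converter instantiates TP instead of building a GenericRecord. A hand-built schema avoids that. A minimal, untested round-trip sketch using one shared, explicitly built schema; the class name, output path, and the blocking waitUntilFinish() calls are illustrative:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;

    public class ParquetRoundTrip {

      // One explicitly built schema shared by the writer and the reader,
      // so nothing in the file refers to a Java class.
      static final Schema TP_SCHEMA =
          SchemaBuilder.record("TP").fields().requiredString("str").endRecord();

      public static void main(String[] args) {
        String dir = "/tmp/parquettest"; // illustrative output path

        // Write a single GenericRecord as Parquet.
        Pipeline write = Pipeline.create();
        GenericRecord record = new GenericData.Record(TP_SCHEMA);
        record.put("str", "All your data are belong to us");
        write
            .apply(Create.<GenericRecord>of(record).withCoder(AvroCoder.of(TP_SCHEMA)))
            .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(TP_SCHEMA)).to(dir));
        write.run().waitUntilFinish();

        // Read it back with the same schema and print the field.
        Pipeline read = Pipeline.create();
        read
            .apply(ParquetIO.read(TP_SCHEMA).from(dir + "/*"))
            .apply(ParDo.of(new DoFn<GenericRecord, Void>() {
              @ProcessElement
              public void processElement(@Element GenericRecord r) {
                System.out.println("found record str = " + r.get("str"));
              }
            }));
        read.run().waitUntilFinish();
      }
    }

The point of the sketch is that ParquetIO.sink and ParquetIO.read see the exact same schema definition, and that schema never references a Java class, so the reader has no choice but to materialize plain GenericRecords.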
