I believe you need to be consistent in how you are creating your schema: Schema tpSchema = ReflectData.get().getSchema(TP.class) and new SchemaConverter().convert(TP.class) do not produce the same schema. Either use ReflectData.get().getSchema(TP.class) or new SchemaConverter().convert(TP.class), but use the same one in both the write and read pipelines.
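For example, a minimal, untested sketch that reuses pipeline, record, and options from your snippets and standardizes on the ReflectData schema (or standardize on new SchemaConverter().convert(TP.class) instead; the point is that both sides must agree):

    // Derive the schema the same way in BOTH pipelines.
    Schema tpSchema = ReflectData.get().getSchema(TP.class);

    // Write side, as in your testWriteParquet():
    pipeline.apply(Create.of((GenericRecord) record))
            .apply(FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(tpSchema))
                    .to(options.getDestinationDirectory() + "parquettest"));

    // Read side: same schema derivation, replacing the
    // new SchemaConverter().convert(TP.class) call.
    pipeline.apply(ParquetIO.read(tpSchema)
            .from(options.getDestinationDirectory() + "parquettest/*"));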
On Thu, Aug 23, 2018 at 8:41 AM Kelsey RIDER <[email protected]> wrote:

> Hello,
>
> I'm trying to use ParquetIO to both read and write records. However, I'm
> unable to read back the records that were previously written. I wrote a
> simple test:
>
>     private static class TP implements Serializable {
>         public String str;
>     }
>
>     private void testWriteParquet(String[] args) {
>         final PVAOptions options = getOptions(args);
>
>         final Pipeline pipeline = Pipeline.create(options);
>
>         Schema tpSchema = ReflectData.get().getSchema(TP.class);
>         GenericData.Record record = new GenericData.Record(tpSchema);
>         record.put("str", "All your data are belong to us");
>
>         pipeline.getCoderRegistry().registerCoderForClass(
>                 GenericData.Record.class, AvroCoder.of(tpSchema));
>
>         pipeline.apply(Create.of((GenericRecord) record))
>                 .apply(FileIO.<GenericRecord>write()
>                         .via(ParquetIO.sink(tpSchema))
>                         .to(options.getDestinationDirectory() + "parquettest"));
>
>         pipeline.run();
>     }
>
>     private void testReadParquet(String[] args) {
>         final PVAOptions options = getOptions(args);
>
>         final Pipeline pipeline = Pipeline.create(options);
>
>         Schema tpSchema = new SchemaConverter().convert(TP.class);
>
>         pipeline.apply(ParquetIO.read(tpSchema)
>                         .from(options.getDestinationDirectory() + "parquettest/*"))
>                 .apply(ParDo.of(new DoFn<GenericRecord, Void>() {
>                     @ProcessElement
>                     public void pe(ProcessContext c, @Element GenericRecord record) {
>                         System.out.println("----------------------------------- found record str = " + record.get("str"));
>                     }
>                 }));
>
>         pipeline.run();
>     }
>
> The result I get is:
>
>     java.lang.reflect.InvocationTargetException
>         at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke (Method.java:498)
>         at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>         at java.lang.Thread.run (Thread.java:748)
>     Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>             org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file
>             org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
>         at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:349)
>         at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:319)
>         at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:210)
>         at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:66)
>         at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
>         at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
>         at pva.beam.Main.testReadParquet (Main.java:67)
>         at pva.beam.Main.main (Main.java:75)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke (Method.java:498)
>         at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
>         at java.lang.Thread.run (Thread.java:748)
>     Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file
>             org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (InternalParquetRecordReader.java:251)
>         at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:132)
>         at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:136)
>         at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (ParquetIO.java:226)
>     Caused by: java.lang.ClassCastException: pva.beam.Main$TP cannot be cast to org.apache.avro.generic.IndexedRecord
>         at org.apache.avro.generic.GenericData.setField (GenericData.java:690)
>         at org.apache.parquet.avro.AvroRecordConverter.set (AvroRecordConverter.java:393)
>         at org.apache.parquet.avro.AvroRecordConverter$2.add (AvroRecordConverter.java:136)
>         at org.apache.parquet.avro.AvroConverters$BinaryConverter.addBinary (AvroConverters.java:62)
>         at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue (ColumnReaderImpl.java:317)
>         at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter (ColumnReaderImpl.java:367)
>         at org.apache.parquet.io.RecordReaderImplementation.read (RecordReaderImplementation.java:406)
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (InternalParquetRecordReader.java:226)
>         at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:132)
>         at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:136)
>         at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (ParquetIO.java:226)
>         at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn$DoFnInvoker.invokeProcessElement (Unknown Source)
>         at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:185)
>         at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement (SimpleDoFnRunner.java:149)
>         at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows (SimplePushbackSideInputDoFnRunner.java:78)
>         at org.apache.beam.runners.direct.ParDoEvaluator.processElement (ParDoEvaluator.java:189)
>         at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement (DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
>         at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:161)
>         at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:125)
>         at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
>         at java.util.concurrent.FutureTask.run (FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run (Thread.java:748)
>
> How can I read back my records with Beam?
