Hi,

Sorry, I missed that bit of code – the SchemaConverter does exactly the same thing, so the two schemas are indeed created in the same way.
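[Editor's note: if the two schemas really are identical, a schema mismatch does not explain the failure. The ClassCastException at the bottom of the trace below suggests a different cause: the ReflectData-derived schema carries the Java class in its full name, so at read time parquet-avro resolves that name to pva.beam.Main$TP, instantiates it, and then fails because TP does not implement IndexedRecord. A possible workaround, sketched here and untested against this exact pipeline, is to define the schema by hand with a record name that does not resolve to any class, and use that single schema for both writing and reading, so the reader falls back to plain GenericRecords:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    // Sketch: a hand-built schema whose full name does not point at a Java
    // class, so parquet-avro cannot try to instantiate one on the read side
    // and returns plain GenericRecords instead.
    Schema tpSchema = SchemaBuilder.record("TP")
            .namespace("example")   // illustrative namespace, not a real package containing a TP class
            .fields()
            .requiredString("str")
            .endRecord();

Both testWriteParquet and testReadParquet would then use this tpSchema instead of deriving one from TP.class.]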
From: Lukasz Cwik <[email protected]>
Sent: Friday, August 24, 2018 01:15
To: [email protected]
Subject: Re: Beam and ParquetIO

I believe you need to be consistent in how you are creating your schema, since

    Schema tpSchema = ReflectData.get().getSchema(TP.class);

is not the same as

    Schema tpSchema = new SchemaConverter().convert(TP.class);

Use one or the other, but not both.
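[Editor's note: a minimal way to enforce that consistency, sketched against the code below, is to derive the schema exactly once and reference the shared constant from both pipelines:

    // Sketch: one shared schema instance for both pipelines, so the write
    // and read sides cannot drift apart.
    private static final Schema TP_SCHEMA = ReflectData.get().getSchema(TP.class);

with testWriteParquet and testReadParquet both using TP_SCHEMA instead of building their own.]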
On Thu, Aug 23, 2018 at 8:41 AM Kelsey RIDER <[email protected]> wrote:

Hello,

I'm trying to use ParquetIO to both read and write records. However, I'm unable to read back the records that were previously written. I wrote a simple test:

    private static class TP implements Serializable {
        public String str;
    }

    private void testWriteParquet(String[] args) {
        final PVAOptions options = getOptions(args);
        final Pipeline pipeline = Pipeline.create(options);

        Schema tpSchema = ReflectData.get().getSchema(TP.class);
        GenericData.Record record = new GenericData.Record(tpSchema);
        record.put("str", "All your data are belong to us");

        pipeline.getCoderRegistry().registerCoderForClass(GenericData.Record.class, AvroCoder.of(tpSchema));
        pipeline.apply(Create.of((GenericRecord) record))
                .apply(FileIO.<GenericRecord>write()
                        .via(ParquetIO.sink(tpSchema))
                        .to(options.getDestinationDirectory() + "parquettest"));
        pipeline.run();
    }

    private void testReadParquet(String[] args) {
        final PVAOptions options = getOptions(args);
        final Pipeline pipeline = Pipeline.create(options);

        Schema tpSchema = new SchemaConverter().convert(TP.class);
        pipeline.apply(ParquetIO.read(tpSchema).from(options.getDestinationDirectory() + "parquettest/*"))
                .apply(ParDo.of(new DoFn<GenericRecord, Void>() {
                    @ProcessElement
                    public void pe(ProcessContext c, @Element GenericRecord record) {
                        System.out.println("----------------------------------- found record str = " + record.get("str"));
                    }
                }));
        pipeline.run();
    }

The result I get is:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
    at java.lang.Thread.run (Thread.java:748)
Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:349)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:319)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
    at pva.beam.Main.testReadParquet (Main.java:67)
    at pva.beam.Main.main (Main.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
    at java.lang.Thread.run (Thread.java:748)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (InternalParquetRecordReader.java:251)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:136)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (ParquetIO.java:226)
Caused by: java.lang.ClassCastException: pva.beam.Main$TP cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.avro.generic.GenericData.setField (GenericData.java:690)
    at org.apache.parquet.avro.AvroRecordConverter.set (AvroRecordConverter.java:393)
    at org.apache.parquet.avro.AvroRecordConverter$2.add (AvroRecordConverter.java:136)
    at org.apache.parquet.avro.AvroConverters$BinaryConverter.addBinary (AvroConverters.java:62)
    at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue (ColumnReaderImpl.java:317)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter (ColumnReaderImpl.java:367)
    at org.apache.parquet.io.RecordReaderImplementation.read (RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (InternalParquetRecordReader.java:226)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:136)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (ParquetIO.java:226)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn$DoFnInvoker.invokeProcessElement (Unknown Source)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:185)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement (SimpleDoFnRunner.java:149)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows (SimplePushbackSideInputDoFnRunner.java:78)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement (ParDoEvaluator.java:189)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement (DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:161)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:125)
    at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
    at java.util.concurrent.FutureTask.run (FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
    at java.lang.Thread.run (Thread.java:748)

How can I read back my records with Beam?
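[Editor's note: one way to see what schema actually landed in the written file, and therefore what the reader will try to reconstruct, is to dump the footer metadata. A sketch, assuming a reasonably recent parquet-mr and an illustrative file path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    try (ParquetFileReader reader = ParquetFileReader.open(
            HadoopInputFile.fromPath(new Path("/tmp/parquettest/output.parquet"), new Configuration()))) {
        // The Parquet schema of the file...
        System.out.println(reader.getFooter().getFileMetaData().getSchema());
        // ...and the Avro schema parquet-avro stored alongside it
        // (key "parquet.avro.schema"; older releases used "avro.schema").
        System.out.println(reader.getFooter().getFileMetaData()
                .getKeyValueMetaData().get("parquet.avro.schema"));
    }

If the stored Avro schema's record name points back at the Java class TP, that matches the ClassCastException above.]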
