Hello,

I'm trying to use ParquetIO to write records and then read them back.
However, the read side fails on records that were just written.
I wrote a simple test (a write method and a read method):

private static class TP implements Serializable {
    public String str;
}

private void testWriteParquet(String[] args) {
    final PVAOptions options = getOptions(args);

    final Pipeline pipeline = Pipeline.create(options);

    Schema tpSchema = ReflectData.get().getSchema(TP.class);
    GenericData.Record record = new GenericData.Record(tpSchema);
    record.put("str", "All your data are belong to us");

    pipeline.getCoderRegistry().registerCoderForClass(
            GenericData.Record.class, AvroCoder.of(tpSchema));

    pipeline.apply(Create.of((GenericRecord) record))
            .apply(FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(tpSchema))
                    .to(options.getDestinationDirectory() + "parquettest"));

    pipeline.run();
}

private void testReadParquet(String[] args) {
    final PVAOptions options = getOptions(args);

    final Pipeline pipeline = Pipeline.create(options);

    Schema tpSchema = new SchemaConverter().convert(TP.class);

    pipeline.apply(ParquetIO.read(tpSchema)
                    .from(options.getDestinationDirectory() + "parquettest/*"))
            .apply(ParDo.of(new DoFn<GenericRecord, Void>() {
                @ProcessElement
                public void pe(ProcessContext c, @Element GenericRecord record) {
                    System.out.println(
                            "----------------------------------- found record str = " + record.get("str"));
                }
            }));

    pipeline.run();
}
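
Before running these, I'd expect the write-side schema (from ReflectData) and the read-side schema (from SchemaConverter) to line up. A quick way to compare them is the sketch below; it is just a debugging aid, not part of the tests, and it assumes SchemaConverter.convert(...) returns an org.apache.avro.Schema, as the read test implies:

// Debugging sketch (not part of the tests above): print both schemas and
// check whether the write side and the read side actually agree.
Schema writeSchema = ReflectData.get().getSchema(TP.class);
Schema readSchema = new SchemaConverter().convert(TP.class);
System.out.println(writeSchema.toString(true));
System.out.println(readSchema.toString(true));
System.out.println("schemas equal: " + writeSchema.equals(readSchema));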


The result I get is:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
    at java.lang.Thread.run (Thread.java:748)
Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:349)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:319)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
    at pva.beam.Main.testReadParquet (Main.java:67)
    at pva.beam.Main.main (Main.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
    at java.lang.Thread.run (Thread.java:748)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (InternalParquetRecordReader.java:251)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:136)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (ParquetIO.java:226)
Caused by: java.lang.ClassCastException: pva.beam.Main$TP cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.avro.generic.GenericData.setField (GenericData.java:690)
    at org.apache.parquet.avro.AvroRecordConverter.set (AvroRecordConverter.java:393)
    at org.apache.parquet.avro.AvroRecordConverter$2.add (AvroRecordConverter.java:136)
    at org.apache.parquet.avro.AvroConverters$BinaryConverter.addBinary (AvroConverters.java:62)
    at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue (ColumnReaderImpl.java:317)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter (ColumnReaderImpl.java:367)
    at org.apache.parquet.io.RecordReaderImplementation.read (RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (InternalParquetRecordReader.java:226)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:136)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (ParquetIO.java:226)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn$DoFnInvoker.invokeProcessElement (Unknown Source)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:185)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement (SimpleDoFnRunner.java:149)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows (SimplePushbackSideInputDoFnRunner.java:78)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement (ParDoEvaluator.java:189)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement (DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:161)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:125)
    at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
    at java.util.concurrent.FutureTask.run (FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
    at java.lang.Thread.run (Thread.java:748)
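
The ClassCastException seems to come from parquet-avro's record converter on the read path. To rule out a problem with the file itself, I suppose the output could also be read back directly with parquet-avro, outside Beam, along the lines of the sketch below (not something I've verified; it assumes the parquet-avro and hadoop-common dependencies are on the classpath, and the file name is hypothetical):

// Sanity-check sketch: read one written Parquet file directly with parquet-avro.
// Needs: org.apache.parquet.avro.AvroParquetReader, org.apache.parquet.hadoop.ParquetReader,
//        org.apache.hadoop.fs.Path, org.apache.avro.generic.GenericRecord
private static void dumpParquetDirectly(String file) throws java.io.IOException {
    try (ParquetReader<GenericRecord> reader =
            AvroParquetReader.<GenericRecord>builder(new Path(file)).build()) {
        GenericRecord r;
        while ((r = reader.read()) != null) {
            System.out.println("direct read, str = " + r.get("str"));
        }
    }
}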

How can I read back my records with Beam?
