Hello,
I'm trying to use ParquetIO to both read and write records.
However, I'm unable to read back the records that were previously written.
I wrote a simple test:
private static class TP implements Serializable {
    public String str;
}
private void testWriteParquet(String[] args) {
    final PVAOptions options = getOptions(args);
    final Pipeline pipeline = Pipeline.create(options);

    Schema tpSchema = ReflectData.get().getSchema(TP.class);
    GenericData.Record record = new GenericData.Record(tpSchema);
    record.put("str", "All your data are belong to us");

    pipeline.getCoderRegistry().registerCoderForClass(GenericData.Record.class,
        AvroCoder.of(tpSchema));

    pipeline.apply(Create.of((GenericRecord) record))
        .apply(FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(tpSchema))
            .to(options.getDestinationDirectory() + "parquettest"));

    pipeline.run();
}
private void testReadParquet(String[] args) {
    final PVAOptions options = getOptions(args);
    final Pipeline pipeline = Pipeline.create(options);

    Schema tpSchema = new SchemaConverter().convert(TP.class);

    pipeline.apply(ParquetIO.read(tpSchema)
            .from(options.getDestinationDirectory() + "parquettest/*"))
        .apply(ParDo.of(new DoFn<GenericRecord, Void>() {
            @ProcessElement
            public void pe(ProcessContext c, @Element GenericRecord record) {
                System.out.println("----------------------------------- found record str = "
                    + record.get("str"));
            }
        }));

    pipeline.run();
}
The result I get is:
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
    at java.lang.Thread.run (Thread.java:748)
Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:349)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:319)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
    at pva.beam.Main.testReadParquet (Main.java:67)
    at pva.beam.Main.main (Main.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
    at java.lang.Thread.run (Thread.java:748)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (InternalParquetRecordReader.java:251)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:136)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (ParquetIO.java:226)
Caused by: java.lang.ClassCastException: pva.beam.Main$TP cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.avro.generic.GenericData.setField (GenericData.java:690)
    at org.apache.parquet.avro.AvroRecordConverter.set (AvroRecordConverter.java:393)
    at org.apache.parquet.avro.AvroRecordConverter$2.add (AvroRecordConverter.java:136)
    at org.apache.parquet.avro.AvroConverters$BinaryConverter.addBinary (AvroConverters.java:62)
    at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue (ColumnReaderImpl.java:317)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter (ColumnReaderImpl.java:367)
    at org.apache.parquet.io.RecordReaderImplementation.read (RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (InternalParquetRecordReader.java:226)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:136)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (ParquetIO.java:226)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn$DoFnInvoker.invokeProcessElement (Unknown Source)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:185)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement (SimpleDoFnRunner.java:149)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows (SimplePushbackSideInputDoFnRunner.java:78)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement (ParDoEvaluator.java:189)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement (DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:161)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:125)
    at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
    at java.util.concurrent.FutureTask.run (FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
    at java.lang.Thread.run (Thread.java:748)
How can I read back my records with Beam?
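For what it's worth, the innermost "Caused by" looks like the key: GenericData.setField casts the record being rebuilt to org.apache.avro.generic.IndexedRecord, but on the read side it is handed an instance of my TP class, which doesn't implement that interface. A JDK-only sketch of that failing cast (IndexedRecordLike is a hypothetical stand-in for Avro's IndexedRecord; this is only my reading of the trace, not Beam's actual code):

```java
public class CastDemo {
    // Hypothetical stand-in for org.apache.avro.generic.IndexedRecord.
    interface IndexedRecordLike {
        void put(int i, Object v);
    }

    // Mirrors my TP: a plain Serializable bean that does NOT implement the record interface.
    static class TP implements java.io.Serializable {
        public String str;
    }

    public static void main(String[] args) {
        Object record = new TP();
        try {
            // Roughly what setField attempts: treat the instance as an indexed record.
            ((IndexedRecordLike) record).put(0, "All your data are belong to us");
            System.out.println("cast succeeded");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: TP is not an IndexedRecordLike");
        }
    }
}
```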