I believe you need to be consistent in how you are creating your schema
since

Schema tpSchema = ReflectData.get().getSchema(TP.class)   !=   new
SchemaConverter().convert(TP.class);


Either use Schema tpSchema = ReflectData.get().getSchema(TP.class)  OR
 new SchemaConverter().convert(TP.class);



On Thu, Aug 23, 2018 at 8:41 AM Kelsey RIDER <[email protected]>
wrote:

> Hello,
>
>
>
> I’m trying to use ParquetIO to both read and write records.
>
> However, I’m unable to read back the records that were previously written.
>
> I wrote a simple test:
>
>
>
> private static class TP implements Serializable {
>
>                public String str;
>
> }
>
>
>
> private void testWriteParquet(String[] args) {
>
>                final PVAOptions options = getOptions(args);
>
>
>
>                final Pipeline pipeline = Pipeline.create(options);
>
>
>
>                Schema tpSchema = ReflectData.get().getSchema(TP.class);
>
>                GenericData.Record record = new
> GenericData.Record(tpSchema);
>
>                record.put("str", "All your data are belong to us");
>
>
>
>
> pipeline.getCoderRegistry().registerCoderForClass(GenericData.Record.class,
> AvroCoder.of(tpSchema));
>
>
>
>                pipeline.apply(Create.of((GenericRecord)record))
>
>
> .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(tpSchema)).to(options.getDestinationDirectory()
> + "parquettest"));
>
>
>
>                pipeline.run();
>
> }
>
>
>
> private void testReadParquet(String[] args) {
>
>                final PVAOptions options = getOptions(args);
>
>
>
>                final Pipeline pipeline = Pipeline.create(options);
>
>
>
>                Schema tpSchema = new SchemaConverter().convert(TP.class);
>
>
>
>
> pipeline.apply(ParquetIO.read(tpSchema).from(options.getDestinationDirectory()
> + "parquettest/*"))
>
>                               .apply(ParDo.of(new DoFn<GenericRecord,
> Void>() {
>
>                                             @ProcessElement
>
>                                             public void pe(ProcessContext
> c, @Element GenericRecord record) {
>
>
> System.out.println("----------------------------------- found record str =
> " + record.get("str"));
>
>                                             }
>
>                               }));
>
>
>
>                pipeline.run();
>
> }
>
>
>
>
>
> The result I get is:
>
>
>
> java.lang.reflect.InvocationTargetException
>
>     *at* sun.reflect.NativeMethodAccessorImpl.invoke0 (*Native Method*)
>
>     *at* sun.reflect.NativeMethodAccessorImpl.invoke (
> *NativeMethodAccessorImpl.java:62*)
>
>     *at* sun.reflect.DelegatingMethodAccessorImpl.invoke (
> *DelegatingMethodAccessorImpl.java:43*)
>
>     *at* java.lang.reflect.Method.invoke (*Method.java:498*)
>
>     *at* org.codehaus.mojo.exec.ExecJavaMojo$1.run (
> *ExecJavaMojo.java:294*)
>
>     *at* java.lang.Thread.run (*Thread.java:748*)
>
> *Caused by*: org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> org.apache.parquet.io.ParquetDecodingException:
> Can not read value at 1 in block 0 in file
> org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
>
>     *at*
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish
> (*DirectRunner.java:349*)
>
>     *at*
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish
> (*DirectRunner.java:319*)
>
>     *at* org.apache.beam.runners.direct.DirectRunner.run (
> *DirectRunner.java:210*)
>
>     *at* org.apache.beam.runners.direct.DirectRunner.run (
> *DirectRunner.java:66*)
>
>     *at* org.apache.beam.sdk.Pipeline.run (*Pipeline.java:311*)
>
>     *at* org.apache.beam.sdk.Pipeline.run (*Pipeline.java:297*)
>
>     *at* pva.beam.Main.testReadParquet (*Main.java:67*)
>
>     *at* pva.beam.Main.main (*Main.java:75*)
>
>     *at* sun.reflect.NativeMethodAccessorImpl.invoke0 (*Native Method*)
>
>     *at* sun.reflect.NativeMethodAccessorImpl.invoke (
> *NativeMethodAccessorImpl.java:62*)
>
>     *at* sun.reflect.DelegatingMethodAccessorImpl.invoke (
> *DelegatingMethodAccessorImpl.java:43*)
>
>     *at* java.lang.reflect.Method.invoke (*Method.java:498*)
>
>     *at* org.codehaus.mojo.exec.ExecJavaMojo$1.run (
> *ExecJavaMojo.java:294*)
>
>     *at* java.lang.Thread.run (*Thread.java:748*)
>
> *Caused by*: org.apache.parquet.io.ParquetDecodingException: Can not read
> value at 1 in block 0 in file
> org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
>
>     *at*
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (
> *InternalParquetRecordReader.java:251*)
>
>     *at* org.apache.parquet.hadoop.ParquetReader.read (
> *ParquetReader.java:132*)
>
>     *at* org.apache.parquet.hadoop.ParquetReader.read (
> *ParquetReader.java:136*)
>
>     *at*
> org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (
> *ParquetIO.java:226*)
>
> *Caused by*: java.lang.ClassCastException: pva.beam.Main$TP cannot be
> cast to org.apache.avro.generic.IndexedRecord
>
>     *at* org.apache.avro.generic.GenericData.setField (
> *GenericData.java:690*)
>
>     *at* org.apache.parquet.avro.AvroRecordConverter.set (
> *AvroRecordConverter.java:393*)
>
>     *at* org.apache.parquet.avro.AvroRecordConverter$2.add (
> *AvroRecordConverter.java:136*)
>
>     *at* org.apache.parquet.avro.AvroConverters$BinaryConverter.addBinary
> (*AvroConverters.java:62*)
>
>     *at* org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue (
> *ColumnReaderImpl.java:317*)
>
>     *at*
> org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter
> (*ColumnReaderImpl.java:367*)
>
>     *at* org.apache.parquet.io.RecordReaderImplementation.read (
> *RecordReaderImplementation.java:406*)
>
>     *at*
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (
> *InternalParquetRecordReader.java:226*)
>
>     *at* org.apache.parquet.hadoop.ParquetReader.read (
> *ParquetReader.java:132*)
>
>     *at* org.apache.parquet.hadoop.ParquetReader.read (
> *ParquetReader.java:136*)
>
>     *at*
> org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (
> *ParquetIO.java:226*)
>
>     *at*
> org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn$DoFnInvoker.invokeProcessElement
> (*Unknown Source*)
>
>     *at*
> org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement
> (*SimpleDoFnRunner.java:185*)
>
>     *at*
> org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement
> (*SimpleDoFnRunner.java:149*)
>
>     *at*
> org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows
> (*SimplePushbackSideInputDoFnRunner.java:78*)
>
>     *at* org.apache.beam.runners.direct.ParDoEvaluator.processElement (
> *ParDoEvaluator.java:189*)
>
>     *at*
> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement
> (*DoFnLifecycleManagerRemovingTransformEvaluator.java:55*)
>
>     *at*
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements (
> *DirectTransformExecutor.java:161*)
>
>    *at* org.apache.beam.runners.direct.DirectTransformExecutor.run (
> *DirectTransformExecutor.java:125*)
>
>     *at* java.util.concurrent.Executors$RunnableAdapter.call (
> *Executors.java:511*)
>
>     *at* java.util.concurrent.FutureTask.run (*FutureTask.java:266*)
>
>     *at* java.util.concurrent.ThreadPoolExecutor.runWorker (
> *ThreadPoolExecutor.java:1149*)
>
>     *at* java.util.concurrent.ThreadPoolExecutor$Worker.run (
> *ThreadPoolExecutor.java:624*)
>
>     *at* java.lang.Thread.run (*Thread.java:748*)
>
>
>
> How can I read back my records with Beam?
> Suite à l’évolution des dispositifs de réglementation du travail, si vous
> recevez ce mail avant 7h00, en soirée, durant le week-end ou vos congés
> merci, sauf cas d’urgence exceptionnelle, de ne pas le traiter ni d’y
> répondre immédiatement.
>

Reply via email to