Apoorv Gupta created BEAM-8913: ---------------------------------- Summary: ParquetIO cannot read files written by itself using reflection Key: BEAM-8913 URL: https://issues.apache.org/jira/browse/BEAM-8913 Project: Beam Issue Type: Bug Components: io-java-parquet Affects Versions: 2.16.0, 2.15.0 Environment: Java 8, JUnit 4 Reporter: Apoorv Gupta
Apache Beam is unable to read Parquet files when they are written using a Schema generated by reflection. However, it is able to read Parquet files when they are written using a hardcoded Schema. The following test passes right now. However, it fails when 'SCHEMA' is replaced with 'SCHEMA_FAILS' in this test. package com.example; import java.io.Serializable; import java.lang.reflect.Field; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.ReflectData; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.parquet.ParquetIO; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.values.PCollection; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public final class ReflectionTest { @Rule public transient TestPipeline pipeline = TestPipeline.create(); @Rule public transient TemporaryFolder temporaryFolder = new TemporaryFolder(); private static final Schema SCHEMA_FAILS = ReflectData.get().getSchema(Transaction.class); private static final Schema SCHEMA = new Schema.Parser().parse(Transaction.SCHEMA); /** * This test creates GenericRecord objects, writes them to Parquet files and reads them back. * * <p>However, it is able to read Parquet files only when they are written using a hardcoded * Schema (see ReflectionTest.SCHEMA defined above). 
* * <p>It is unable to read Parquet files when they are written using a Schema generated by * reflection (see ReflectionTest.SCHEMA_FAILS defined above). */ @Test public void genericRecordToTableRow_convertsGenericRecordToTableRow() { PCollection<GenericRecord> pgr = pipeline .apply(GenerateSequence.from(0).to(2)) .apply("translateToGeneric", ParDo.of(new LongToGenericRecord())) .setCoder(AvroCoder.of(SCHEMA)); PCollection<GenericRecord> writeThenRead = pgr.apply( FileIO.<GenericRecord>write() .via(ParquetIO.sink(SCHEMA)) .to(temporaryFolder.getRoot().getAbsolutePath())) .getPerDestinationOutputFilenames() .apply(Values.create()) .apply(FileIO.matchAll()) .apply(FileIO.readMatches()) .apply(ParquetIO.readFiles(SCHEMA)) .apply(Filter.by(x -> false)); PAssert.that(writeThenRead).empty(); pipeline.run().waitUntilFinish(); } static class LongToGenericRecord extends DoFn<Long, GenericRecord> { @ProcessElement public void processElement(ProcessContext context) { Transaction tr = new Transaction(context.element()); GenericRecord result = new GenericData.Record(SCHEMA); for (Schema.Field r : SCHEMA.getFields()) { String name = r.name(); try { Field f = Transaction.class.getDeclaredField(name); f.setAccessible(true); result.put(name, f.get(tr)); } catch (NoSuchFieldException nsfe) { throw new RuntimeException("no such field: " + name, nsfe); } catch (IllegalAccessException iae) { throw new RuntimeException("no access to field: " + name, iae); } } context.output(result); } } /** represents a row in our generated data */ public static final class Transaction implements Serializable { double amountBase = 0.0; public Transaction(double amt) { amountBase = amt; } double getAmountBase() { return amountBase; } public static final String SCHEMA = "{\n" + " \"namespace\": \"sample\",\n" + " \"type\": \"record\",\n" + " \"name\": \"Transaction\",\n" + " \"fields\": [\n" + " {\"name\": \"amountBase\", \"type\": \"double\"}\n" + " ]\n" + "}"; public boolean equals(Object t) { if (t 
instanceof Transaction) { return ((Transaction) t).getAmountBase() == amountBase; } else { return false; } } public int hashCode() { return (int) amountBase; } } } -- This message was sent by Atlassian Jira (v8.3.4#803005)