Apoorv Gupta created BEAM-8913:
----------------------------------

             Summary: ParquetIO cannot read files written by itself using 
reflection
                 Key: BEAM-8913
                 URL: https://issues.apache.org/jira/browse/BEAM-8913
             Project: Beam
          Issue Type: Bug
          Components: io-java-parquet
    Affects Versions: 2.16.0, 2.15.0
         Environment: Java 8, JUnit 4
            Reporter: Apoorv Gupta


Apache Beam is unable to read Parquet files when they are written using a 
Schema generated by reflection. However, it is able to read Parquet files when 
they are written using a hardcoded Schema.
 
The following test passes right now. However, it fails when 'SCHEMA' is 
replaced with 'SCHEMA_FAILS' in this test.
 
package com.example;

import java.io.Serializable;
import java.lang.reflect.Field;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public final class ReflectionTest {
 @Rule public transient TestPipeline pipeline = TestPipeline.create();

@Rule public transient TemporaryFolder temporaryFolder = new TemporaryFolder();

private static final Schema SCHEMA_FAILS = 
ReflectData.get().getSchema(Transaction.class);
 private static final Schema SCHEMA = new 
Schema.Parser().parse(Transaction.SCHEMA);

/**
 * This test creates GenericRecord objects, writes them to Parquet files and 
reads them back.
 *
 * <p>However, it is able to read Parquet files only when they are written 
using a hardcoded
 * Schema (see ReflectionTest.SCHEMA defined above).
 *
 * <p>It is unable to read Parquet files when they are written using a Schema 
generated by
 * reflection (see ReflectionTest.SCHEMA_FAILS defined above).
 */
 @Test
 public void genericRecordToTableRow_convertsGenericRecordToTableRow() {
 PCollection<GenericRecord> pgr =
 pipeline
 .apply(GenerateSequence.from(0).to(2))
 .apply("translateToGeneric", ParDo.of(new LongToGenericRecord()))
 .setCoder(AvroCoder.of(SCHEMA));

PCollection<GenericRecord> writeThenRead =
 pgr.apply(
 FileIO.<GenericRecord>write()
 .via(ParquetIO.sink(SCHEMA))
 .to(temporaryFolder.getRoot().getAbsolutePath()))
 .getPerDestinationOutputFilenames()
 .apply(Values.create())
 .apply(FileIO.matchAll())
 .apply(FileIO.readMatches())
 .apply(ParquetIO.readFiles(SCHEMA))
 .apply(Filter.by(x -> false));

PAssert.that(writeThenRead).empty();
 pipeline.run().waitUntilFinish();
 }

static class LongToGenericRecord extends DoFn<Long, GenericRecord> {
 @ProcessElement
 public void processElement(ProcessContext context) {
 Transaction tr = new Transaction(context.element());
 GenericRecord result = new GenericData.Record(SCHEMA);
 for (Schema.Field r : SCHEMA.getFields()) {
 String name = r.name();
 try {
 Field f = Transaction.class.getDeclaredField(name);
 f.setAccessible(true);
 result.put(name, f.get(tr));
 } catch (NoSuchFieldException nsfe) {
 throw new RuntimeException("no such field: " + name, nsfe);
 } catch (IllegalAccessException iae) {
 throw new RuntimeException("no access to field: " + name, iae);
 }
 }
 context.output(result);
 }
 }

/** represents a row in our generated data */
 public static final class Transaction implements Serializable {

double amountBase = 0.0;

public Transaction(double amt) {
 amountBase = amt;
 }

double getAmountBase() {
 return amountBase;
 }

public static final String SCHEMA =
 "{\n"
 + " \"namespace\": \"sample\",\n"
 + " \"type\": \"record\",\n"
 + " \"name\": \"Transaction\",\n"
 + " \"fields\": [\n"
 + " \{\"name\": \"amountBase\", \"type\": \"double\"}\n"
 + " ]\n"
 + "}";

public boolean equals(Object t) {
 if (t instanceof Transaction) {
 return ((Transaction) t).getAmountBase() == amountBase;
 } else {
 return false;
 }
 }

public int hashCode() {
 return (int) amountBase;
 }
 }
}
 
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to