This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new d79cd82  [BEAM-7929] Support column projection for Parquet Tables
     new 2447679  Merge pull request #14117: [BEAM-7929] Support column projection for Parquet Tables
d79cd82 is described below

commit d79cd82943c90dad518b705b7e81bcd2d2fc0f21
Author: Ismaël Mejía <ieme...@gmail.com>
AuthorDate: Mon Mar 1 10:05:32 2021 +0100

    [BEAM-7929] Support column projection for Parquet Tables
---
 sdks/java/extensions/sql/build.gradle              |   1 +
 .../sql/meta/provider/parquet/ParquetTable.java    | 132 +++++++++++++++++++++
 .../provider/parquet/ParquetTableProvider.java     |  22 ++--
 .../provider/parquet/ParquetTableProviderTest.java |  35 +++++-
 .../sdk/io/parquet/ParquetSchemaIOProvider.java    | 127 --------------------
 5 files changed, 172 insertions(+), 145 deletions(-)
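For context: this change teaches the SQL Parquet table to push column
projection down into ParquetIO, so a query that selects a subset of columns
materializes only those columns from the Parquet files. A minimal sketch of
the user-visible behavior, assuming the in-memory SQL environment the
provider tests use (table name, schema, and location are illustrative):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
    import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    Pipeline pipeline = Pipeline.create();
    BeamSqlEnv env = BeamSqlEnv.inMemory(new ParquetTableProvider());
    env.executeDdl(
        "CREATE EXTERNAL TABLE PersonInfo (name VARCHAR, age BIGINT, country VARCHAR) "
            + "TYPE 'parquet' LOCATION '/home/admin/orders/'");
    // With this commit, the scan below reads only the 'age' and 'country'
    // columns instead of whole records.
    PCollection<Row> projected =
        BeamSqlRelUtils.toPCollection(
            pipeline, env.parseQuery("SELECT age, country FROM PersonInfo"));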
diff --git a/sdks/java/extensions/sql/build.gradle b/sdks/java/extensions/sql/build.gradle
index 6de73f2..6758e4b 100644
--- a/sdks/java/extensions/sql/build.gradle
+++ b/sdks/java/extensions/sql/build.gradle
@@ -79,6 +79,7 @@ dependencies {
   provided project(":sdks:java:io:kafka")
   provided project(":sdks:java:io:google-cloud-platform")
   compile project(":sdks:java:io:mongodb")
+  compile library.java.avro
   provided project(":sdks:java:io:parquet")
   provided library.java.jackson_dataformat_xml
   provided library.java.hadoop_client
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
new file mode 100644
index 0000000..b2282ff
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
+import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@SuppressWarnings({"nullness"})
+class ParquetTable extends SchemaBaseBeamTable implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetTable.class);
+
+  private final Table table;
+
+  ParquetTable(Table table) {
+    super(table.getSchema());
+    this.table = table;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(
+      PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    if (!fieldNames.isEmpty()) {
+      Schema projectionSchema = projectSchema(schema, fieldNames);
+      LOG.info("Projecting fields schema : " + projectionSchema.toString());
+      read = read.withProjection(projectionSchema, projectionSchema);
+    }
+    return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
+  }
+
+  /** Returns a copy of the {@link Schema} with only the fieldNames fields. */
+  private static Schema projectSchema(Schema schema, List<String> fieldNames) {
+    List<Field> selectedFields = new ArrayList<>();
+    for (String fieldName : fieldNames) {
+      selectedFields.add(deepCopyField(schema.getField(fieldName)));
+    }
+    return Schema.createRecord(
+        schema.getName() + "_projected",
+        schema.getDoc(),
+        schema.getNamespace(),
+        schema.isError(),
+        selectedFields);
+  }
+
+  private static Field deepCopyField(Field field) {
+    Schema.Field newField =
+        new Schema.Field(
+            field.name(), field.schema(), field.doc(), field.defaultVal(), field.order());
+    for (Map.Entry<String, Object> kv : field.getObjectProps().entrySet()) {
+      newField.addProp(kv.getKey(), kv.getValue());
+    }
+    if (field.aliases() != null) {
+      for (String alias : field.aliases()) {
+        newField.addAlias(alias);
+      }
+    }
+    return newField;
+  }
+
+  @Override
+  public POutput buildIOWriter(PCollection<Row> input) {
+    final org.apache.avro.Schema schema = AvroUtils.toAvroSchema(input.getSchema());
+    return input
+        .apply("ToGenericRecords", Convert.to(GenericRecord.class))
+        .apply(
+            "ParquetIOWrite",
+            FileIO.<GenericRecord>write().via(ParquetIO.sink(schema)).to(table.getLocation()));
+  }
+
+  @Override
+  public IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  @Override
+  public BeamTableStatistics getTableStatistics(PipelineOptions options) {
+    return BeamTableStatistics.BOUNDED_UNKNOWN;
+  }
+
+  @Override
+  public ProjectSupport supportsProjects() {
+    return ProjectSupport.WITH_FIELD_REORDERING;
+  }
+}
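Two details in ParquetTable worth spelling out. ParquetIO.Read#withProjection
takes the projection schema twice: once as the set of columns to read and once
as the schema used to encode the resulting records. And the deep copy in
deepCopyField is needed because an Avro Schema.Field remembers its position
once it has been added to a record schema, so the fields of the table schema
cannot be reused directly in the projected schema. A minimal standalone
sketch of the same projection outside Beam SQL (schema, record names, and
path are illustrative):

    import java.util.Arrays;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.values.PCollection;

    // Avro schema of the complete records on disk.
    Schema fullSchema =
        SchemaBuilder.record("PersonInfo").fields()
            .requiredString("name")
            .requiredLong("age")
            .requiredString("country")
            .endRecord();
    // Projection schema built from fresh Field instances, as deepCopyField does.
    Schema projection =
        Schema.createRecord(
            "PersonInfo_projected", null, null, false,
            Arrays.asList(
                new Schema.Field("age", Schema.create(Schema.Type.LONG), null, (Object) null),
                new Schema.Field("country", Schema.create(Schema.Type.STRING), null, (Object) null)));

    Pipeline pipeline = Pipeline.create();
    // The projection schema is passed as both column filter and encoder schema,
    // mirroring buildIOReader above.
    PCollection<GenericRecord> records =
        pipeline.apply(
            "ReadProjected",
            ParquetIO.read(fullSchema)
                .withProjection(projection, projection)
                .from("/home/admin/orders/*"));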
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
index b8a55f5..f24e226 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
@@ -18,18 +18,15 @@
 package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
 
 import com.google.auto.service.AutoService;
-import org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.io.parquet.ParquetIO;
-import org.apache.beam.sdk.io.parquet.ParquetSchemaIOProvider;
-import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
 
 /**
  * {@link TableProvider} for {@link ParquetIO} for consumption by Beam SQL.
  *
- * <p>Passes the {@link ParquetSchemaIOProvider} to the generalized table provider wrapper, {@link
- * SchemaIOTableProviderWrapper}, for Parquet specific behavior.
- *
  * <p>A sample of parquet table is:
  *
  * <pre>{@code
@@ -39,19 +36,18 @@ import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
  *   favorite_numbers ARRAY<INTEGER>
  * )
  * TYPE 'parquet'
- * LOCATION '/home/admin/users.parquet'
+ * LOCATION '/home/admin/orders/'
  * }</pre>
  */
 @AutoService(TableProvider.class)
-public class ParquetTableProvider extends SchemaIOTableProviderWrapper {
+public class ParquetTableProvider extends InMemoryMetaTableProvider {
   @Override
-  public SchemaIOProvider getSchemaIOProvider() {
-    return new ParquetSchemaIOProvider();
+  public String getTableType() {
+    return "parquet";
   }
 
-  // TODO[BEAM-10516]: remove this override after TableProvider problem is fixed
   @Override
-  public String getTableType() {
-    return "parquet";
+  public BeamSqlTable buildBeamSqlTable(Table table) {
+    return new ParquetTable(table);
   }
 }
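One consequence the javadoc edit above reflects: ParquetTable reads from
getLocation() + "/*" and writes through FileIO.write().to(getLocation()), so
LOCATION now names a directory of (possibly sharded) Parquet files rather
than a single file. A sketch of the write path under that convention,
following the test's INSERT (env as in the earlier sketch, values
illustrative):

    Pipeline writePipeline = Pipeline.create();
    // INSERT produces sharded Parquet files under the LOCATION directory.
    BeamSqlRelUtils.toPCollection(
        writePipeline,
        env.parseQuery(
            "INSERT INTO PersonInfo VALUES ('Alan', 22, 'England'), ('John', 42, 'USA')"));
    writePipeline.run().waitUntilFinish();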
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
index 49d7b6e..71680f7 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
@@ -44,7 +44,13 @@ public class ParquetTableProviderTest {
 
   private static final String FIELD_NAMES = "(name VARCHAR, age BIGINT, country VARCHAR)";
 
-  private static final Schema OUTPUT_ROW_SCHEMA =
+  private static final Schema TABLE_SCHEMA =
+      Schema.builder()
+          .addStringField("name")
+          .addInt64Field("age")
+          .addStringField("country")
+          .build();
+  private static final Schema PROJECTED_SCHEMA =
       Schema.builder().addInt64Field("age").addStringField("country").build();
 
   @Test
@@ -61,15 +67,34 @@ public class ParquetTableProviderTest {
         writePipeline,
         env.parseQuery(
             "INSERT INTO PersonInfo VALUES ('Alan', 22, 'England'), ('John', 42, 'USA')"));
-
     writePipeline.run().waitUntilFinish();
 
     PCollection<Row> rows =
+        BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT * FROM PersonInfo"));
+    PAssert.that(rows)
+        .containsInAnyOrder(
+            Row.withSchema(TABLE_SCHEMA).addValues("Alan", 22L, "England").build(),
+            Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, "USA").build());
+
+    PCollection<Row> filtered =
         BeamSqlRelUtils.toPCollection(
-            readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo WHERE age > 25"));
+            readPipeline, env.parseQuery("SELECT * FROM PersonInfo WHERE age > 25"));
+    PAssert.that(filtered)
+        .containsInAnyOrder(Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, "USA").build());
 
-    PAssert.that(rows)
-        .containsInAnyOrder(Row.withSchema(OUTPUT_ROW_SCHEMA).addValues(42L, "USA").build());
+    PCollection<Row> projected =
+        BeamSqlRelUtils.toPCollection(
+            readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo"));
+    PAssert.that(projected)
+        .containsInAnyOrder(
+            Row.withSchema(PROJECTED_SCHEMA).addValues(22L, "England").build(),
+            Row.withSchema(PROJECTED_SCHEMA).addValues(42L, "USA").build());
+
+    PCollection<Row> filteredAndProjected =
+        BeamSqlRelUtils.toPCollection(
+            readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo WHERE age > 25"));
+    PAssert.that(filteredAndProjected)
+        .containsInAnyOrder(Row.withSchema(PROJECTED_SCHEMA).addValues(42L, "USA").build());
 
     PipelineResult.State state = readPipeline.run().waitUntilFinish();
     assertEquals(State.DONE, state);
diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaIOProvider.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaIOProvider.java
deleted file mode 100644
index 71ef5e2..0000000
--- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaIOProvider.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.parquet;
-
-import com.google.auto.service.AutoService;
-import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.io.FileIO;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
-import org.apache.beam.sdk.schemas.transforms.Convert;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.Row;
-
-/**
- * An implementation of {@link SchemaIOProvider} for reading and writing parquet files with {@link
- * ParquetIO}.
- */
-@Internal
-@AutoService(SchemaIOProvider.class)
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public class ParquetSchemaIOProvider implements SchemaIOProvider {
-  /** Returns an id that uniquely represents this IO. */
-  @Override
-  public String identifier() {
-    return "parquet";
-  }
-
-  /**
-   * Returns the expected schema of the configuration object. Note this is distinct from the schema
-   * of the data source itself. No configuration expected for parquet.
-   */
-  @Override
-  public Schema configurationSchema() {
-    return Schema.builder().build();
-  }
-
-  /**
-   * Produce a SchemaIO given a String representing the data's location, the schema of the data
-   * that resides there, and some IO-specific configuration object.
-   */
-  @Override
-  public ParquetSchemaIO from(String location, Row configuration, Schema dataSchema) {
-    return new ParquetSchemaIO(location, dataSchema);
-  }
-
-  @Override
-  public boolean requiresDataSchema() {
-    return true;
-  }
-
-  @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
-  }
-
-  /** An abstraction to create schema aware IOs. */
-  private static class ParquetSchemaIO implements SchemaIO, Serializable {
-    protected final Schema dataSchema;
-    protected final String location;
-
-    private ParquetSchemaIO(String location, Schema dataSchema) {
-      this.dataSchema = dataSchema;
-      this.location = location;
-    }
-
-    @Override
-    public Schema schema() {
-      return dataSchema;
-    }
-
-    @Override
-    public PTransform<PBegin, PCollection<Row>> buildReader() {
-      return new PTransform<PBegin, PCollection<Row>>() {
-        @Override
-        public PCollection<Row> expand(PBegin begin) {
-          org.apache.avro.Schema schema = AvroUtils.toAvroSchema(dataSchema);
-          return begin
-              .apply(
-                  "ParquetIORead",
-                  ParquetIO.read(schema).withBeamSchemas(true).from(location + "/*"))
-              .apply("ToRows", Convert.toRows());
-        }
-      };
-    }
-
-    @Override
-    public PTransform<PCollection<Row>, POutput> buildWriter() {
-      return new PTransform<PCollection<Row>, POutput>() {
-        @Override
-        public PDone expand(PCollection<Row> input) {
-          final org.apache.avro.Schema schema = AvroUtils.toAvroSchema(input.getSchema());
-          input
-              .apply("ToGenericRecords", Convert.to(GenericRecord.class))
-              .apply(
-                  "ParquetIOWrite",
-                  FileIO.<GenericRecord>write().via(ParquetIO.sink(schema)).to(location));
-          return PDone.in(input.getPipeline());
-        }
-      };
-    }
-  }
-}