[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers
TheNeuralBit commented on a change in pull request #12202: URL: https://github.com/apache/beam/pull/12202#discussion_r458202365 ## File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaCapableIOProvider.java ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.parquet; + +import com.google.auto.service.AutoService; +import java.io.Serializable; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.io.SchemaIO; +import org.apache.beam.sdk.schemas.io.SchemaIOProvider; +import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; + +/** + * An implementation of {@link SchemaIOProvider} for reading and writing parquet files with {@link + * ParquetIO}. 
+ */ +@Internal +@AutoService(SchemaIOProvider.class) +public class ParquetSchemaCapableIOProvider implements SchemaIOProvider { Review comment: could you remove the "Capable" from all of these class names now that we've switched to `SchemaIOProvider`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers
TheNeuralBit commented on a change in pull request #12202: URL: https://github.com/apache/beam/pull/12202#discussion_r45076 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSchemaCapableIOProvider.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import com.google.auto.service.AutoService; +import java.io.Serializable; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.io.SchemaIO; +import org.apache.beam.sdk.schemas.io.SchemaIOProvider; +import org.apache.beam.sdk.schemas.transforms.Convert; +import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; + +/** + * An implementation of {@link SchemaIOProvider} for reading and writing Avro files with {@link + * AvroIO}. 
+ */ +@Internal +@AutoService(SchemaIOProvider.class) +public class AvroSchemaCapableIOProvider implements SchemaIOProvider { + /** Returns an id that uniquely represents this IO. */ + @Override + public String identifier() { +return "avro"; + } + + /** + * Returns the expected schema of the configuration object. Note this is distinct from the schema + * of the data source itself. No configuration expected for Avro. + */ + @Override + public Schema configurationSchema() { +return Schema.builder().build(); + } + + /** + * Produce a SchemaIO given a String representing the data's location, the schema of the data that + * resides there, and some IO-specific configuration object. + */ + @Override + public AvroSchemaIO from(String location, Row configuration, Schema dataSchema) { +return new AvroSchemaIO(location, dataSchema); + } + + @Override + public boolean requiresDataSchema() { +return true; + } + + @Override + public PCollection.IsBounded isBounded() { +return PCollection.IsBounded.BOUNDED; + } + + /** An abstraction to create schema aware IOs. 
*/ + @Internal + private static class AvroSchemaIO implements SchemaIO, Serializable { +protected final Schema dataSchema; +protected final String location; + +private AvroSchemaIO(String location, Schema dataSchema) { + this.dataSchema = dataSchema; + this.location = location; +} + +@Override +public Schema schema() { + return dataSchema; +} + +@Override +public PTransform> buildReader() { + return new PTransform>() { +@Override +public PCollection expand(PBegin begin) { + return begin + .apply( + "AvroIORead", + AvroIO.readGenericRecords(AvroUtils.toAvroSchema(dataSchema, null, null)) Review comment: I think it is fine, when the name of the schema isn't specified we just call it "topLevelRecord": https://github.com/apache/beam/blob/f36250f2816da2a0d8350e92f0010615f8491ee0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L327 I don't think the name has any effect on the way we encode/decode the Avro records. (CC @iemejia in case I'm wrong) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers
TheNeuralBit commented on a change in pull request #12202: URL: https://github.com/apache/beam/pull/12202#discussion_r452438068 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java ## @@ -15,54 +15,63 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.meta.provider.parquet; +package org.apache.beam.sdk.extensions.sql.meta.provider; import java.io.Serializable; -import org.apache.avro.generic.GenericRecord; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; -import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; -import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; -import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.schemas.io.SchemaIO; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; +@Internal +@Experimental +/** + * A generalized {@link Table} for IOs to create IO readers and writers. + */ +public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable { + protected final SchemaIO schemaIO; -/** {@link ParquetTable} is a {@link BeamSqlTable}. 
*/ -public class ParquetTable extends SchemaBaseBeamTable implements Serializable { - private final String filePattern; + private SchemaIOTableWrapper(SchemaIO schemaIO) { +this.schemaIO = schemaIO; + } - public ParquetTable(Schema beamSchema, String filePattern) { -super(beamSchema); -this.filePattern = filePattern; + static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) { +return new SchemaIOTableWrapper(schemaIO); } @Override - public PCollection buildIOReader(PBegin begin) { -PTransform, PCollection> readConverter = -GenericRecordReadConverter.builder().beamSchema(schema).build(); + public PCollection.IsBounded isBounded() { +return PCollection.IsBounded.UNBOUNDED; + } -return begin -.apply("ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern)) -.apply("GenericRecordToRow", readConverter); + @Override + public Schema getSchema() { +return schemaIO.schema(); } @Override - public PDone buildIOWriter(PCollection input) { -throw new UnsupportedOperationException("Writing to a Parquet file is not supported"); + public PCollection buildIOReader(PBegin begin) { +PTransform> readerTransform = schemaIO.buildReader(); +return begin.apply(readerTransform); } @Override - public PCollection.IsBounded isBounded() { -return PCollection.IsBounded.BOUNDED; + public POutput buildIOWriter(PCollection input) { +PTransform, POutput> writerTransform = schemaIO.buildWriter(); +return input.apply(writerTransform); } @Override public BeamTableStatistics getTableStatistics(PipelineOptions options) { -return BeamTableStatistics.BOUNDED_UNKNOWN; +return BeamTableStatistics.UNBOUNDED_UNKNOWN; Review comment: Yeah we could just make it a property on SchemaCapableIOProvider for now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers
TheNeuralBit commented on a change in pull request #12202: URL: https://github.com/apache/beam/pull/12202#discussion_r452432390 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider; + +import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith; + +import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.schemas.io.InvalidConfigurationException; +import org.apache.beam.sdk.schemas.io.InvalidSchemaException; +import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider; +import org.apache.beam.sdk.schemas.io.SchemaIO; +import org.apache.beam.sdk.util.RowJson; +import org.apache.beam.sdk.values.Row; + +/** + * A general {@link TableProvider} for IOs for consumption by Beam SQL. + * + * Can create child classes for IOs to pass {@link #schemaCapableIOProvider} that is specific to + * the IO. + */ +@Internal +@Experimental +@AutoService(TableProvider.class) +public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider { Review comment: Ah whoops! Yeah I guess the best we can do is mark it `@Internal` then. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers
TheNeuralBit commented on a change in pull request #12202: URL: https://github.com/apache/beam/pull/12202#discussion_r452432390 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider; + +import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith; + +import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.schemas.io.InvalidConfigurationException; +import org.apache.beam.sdk.schemas.io.InvalidSchemaException; +import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider; +import org.apache.beam.sdk.schemas.io.SchemaIO; +import org.apache.beam.sdk.util.RowJson; +import org.apache.beam.sdk.values.Row; + +/** + * A general {@link TableProvider} for IOs for consumption by Beam SQL. + * + * Can create child classes for IOs to pass {@link #schemaCapableIOProvider} that is specific to + * the IO. + */ +@Internal +@Experimental +@AutoService(TableProvider.class) +public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider { Review comment: Ah whoops! Yeah I guess the best we can do is mark it @Internal then. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers
TheNeuralBit commented on a change in pull request #12202: URL: https://github.com/apache/beam/pull/12202#discussion_r452329517 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java ## @@ -15,54 +15,63 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.meta.provider.parquet; +package org.apache.beam.sdk.extensions.sql.meta.provider; import java.io.Serializable; -import org.apache.avro.generic.GenericRecord; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; -import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; -import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; -import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.schemas.io.SchemaIO; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; +@Internal +@Experimental +/** + * A generalized {@link Table} for IOs to create IO readers and writers. + */ +public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable { + protected final SchemaIO schemaIO; -/** {@link ParquetTable} is a {@link BeamSqlTable}. 
*/ -public class ParquetTable extends SchemaBaseBeamTable implements Serializable { - private final String filePattern; + private SchemaIOTableWrapper(SchemaIO schemaIO) { +this.schemaIO = schemaIO; + } - public ParquetTable(Schema beamSchema, String filePattern) { -super(beamSchema); -this.filePattern = filePattern; + static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) { +return new SchemaIOTableWrapper(schemaIO); } @Override - public PCollection buildIOReader(PBegin begin) { -PTransform, PCollection> readConverter = -GenericRecordReadConverter.builder().beamSchema(schema).build(); + public PCollection.IsBounded isBounded() { +return PCollection.IsBounded.UNBOUNDED; + } -return begin -.apply("ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern)) -.apply("GenericRecordToRow", readConverter); + @Override + public Schema getSchema() { +return schemaIO.schema(); } @Override - public PDone buildIOWriter(PCollection input) { -throw new UnsupportedOperationException("Writing to a Parquet file is not supported"); + public PCollection buildIOReader(PBegin begin) { +PTransform> readerTransform = schemaIO.buildReader(); +return begin.apply(readerTransform); } @Override - public PCollection.IsBounded isBounded() { -return PCollection.IsBounded.BOUNDED; + public POutput buildIOWriter(PCollection input) { +PTransform, POutput> writerTransform = schemaIO.buildWriter(); +return input.apply(writerTransform); } @Override public BeamTableStatistics getTableStatistics(PipelineOptions options) { -return BeamTableStatistics.BOUNDED_UNKNOWN; +return BeamTableStatistics.UNBOUNDED_UNKNOWN; Review comment: This is a little tricky though since the method is on BeamSqlTable, and not on TableProvider. One possible solution: make SchemaIOTableWrapper an inner (non-static) class of SchemaIOTableProviderWrapper. Then the method with the default logic can be on SchemaIOTableProviderWrapper, and its sub-classes can override it if they need to.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers
TheNeuralBit commented on a change in pull request #12202: URL: https://github.com/apache/beam/pull/12202#discussion_r452329517 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java ## @@ -15,54 +15,63 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.meta.provider.parquet; +package org.apache.beam.sdk.extensions.sql.meta.provider; import java.io.Serializable; -import org.apache.avro.generic.GenericRecord; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; -import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; -import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; -import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.schemas.io.SchemaIO; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; +@Internal +@Experimental +/** + * A generalized {@link Table} for IOs to create IO readers and writers. + */ +public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable { + protected final SchemaIO schemaIO; -/** {@link ParquetTable} is a {@link BeamSqlTable}. 
*/ -public class ParquetTable extends SchemaBaseBeamTable implements Serializable { - private final String filePattern; + private SchemaIOTableWrapper(SchemaIO schemaIO) { +this.schemaIO = schemaIO; + } - public ParquetTable(Schema beamSchema, String filePattern) { -super(beamSchema); -this.filePattern = filePattern; + static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) { +return new SchemaIOTableWrapper(schemaIO); } @Override - public PCollection buildIOReader(PBegin begin) { -PTransform, PCollection> readConverter = -GenericRecordReadConverter.builder().beamSchema(schema).build(); + public PCollection.IsBounded isBounded() { +return PCollection.IsBounded.UNBOUNDED; + } -return begin -.apply("ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern)) -.apply("GenericRecordToRow", readConverter); + @Override + public Schema getSchema() { +return schemaIO.schema(); } @Override - public PDone buildIOWriter(PCollection input) { -throw new UnsupportedOperationException("Writing to a Parquet file is not supported"); + public PCollection buildIOReader(PBegin begin) { +PTransform> readerTransform = schemaIO.buildReader(); +return begin.apply(readerTransform); } @Override - public PCollection.IsBounded isBounded() { -return PCollection.IsBounded.BOUNDED; + public POutput buildIOWriter(PCollection input) { +PTransform, POutput> writerTransform = schemaIO.buildWriter(); +return input.apply(writerTransform); } @Override public BeamTableStatistics getTableStatistics(PipelineOptions options) { -return BeamTableStatistics.BOUNDED_UNKNOWN; +return BeamTableStatistics.UNBOUNDED_UNKNOWN; Review comment: This is a little tricky though since the method is on BeamSqlTable, and not on TableProvider. One possible solution: make SchemaIOTableWrapper an inner (non-static) class of SchemaIOTableProviderWrapper. Then the method with the default logic can be on SchemaIOTableProviderWrapper, and the sub-classes will be able to override it. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers
TheNeuralBit commented on a change in pull request #12202: URL: https://github.com/apache/beam/pull/12202#discussion_r452326180 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java ## @@ -15,54 +15,63 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.meta.provider.parquet; +package org.apache.beam.sdk.extensions.sql.meta.provider; import java.io.Serializable; -import org.apache.avro.generic.GenericRecord; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; -import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; -import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; -import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.schemas.io.SchemaIO; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; +@Internal +@Experimental +/** + * A generalized {@link Table} for IOs to create IO readers and writers. + */ +public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable { + protected final SchemaIO schemaIO; -/** {@link ParquetTable} is a {@link BeamSqlTable}. 
*/ -public class ParquetTable extends SchemaBaseBeamTable implements Serializable { - private final String filePattern; + private SchemaIOTableWrapper(SchemaIO schemaIO) { +this.schemaIO = schemaIO; + } - public ParquetTable(Schema beamSchema, String filePattern) { -super(beamSchema); -this.filePattern = filePattern; + static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) { +return new SchemaIOTableWrapper(schemaIO); } @Override - public PCollection buildIOReader(PBegin begin) { -PTransform, PCollection> readConverter = -GenericRecordReadConverter.builder().beamSchema(schema).build(); + public PCollection.IsBounded isBounded() { +return PCollection.IsBounded.UNBOUNDED; + } -return begin -.apply("ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern)) -.apply("GenericRecordToRow", readConverter); + @Override + public Schema getSchema() { +return schemaIO.schema(); } @Override - public PDone buildIOWriter(PCollection input) { -throw new UnsupportedOperationException("Writing to a Parquet file is not supported"); + public PCollection buildIOReader(PBegin begin) { +PTransform> readerTransform = schemaIO.buildReader(); +return begin.apply(readerTransform); } @Override - public PCollection.IsBounded isBounded() { -return PCollection.IsBounded.BOUNDED; + public POutput buildIOWriter(PCollection input) { +PTransform, POutput> writerTransform = schemaIO.buildWriter(); +return input.apply(writerTransform); } @Override public BeamTableStatistics getTableStatistics(PipelineOptions options) { -return BeamTableStatistics.BOUNDED_UNKNOWN; +return BeamTableStatistics.UNBOUNDED_UNKNOWN; Review comment: Great question. We should definitely keep the `BeamTableStatistics` in the SQL extensions for now, since it only has meaning for SQL. 
The reason this exists is so we can do better when optimizing SQL queries - it's for "cost-based optimization" - here's a [design doc](https://docs.google.com/document/d/1DM_bcfFbIoc_vEoqQxhC7AvHBUDVCAwToC8TYGukkII/edit) about our implementation of it. The idea is we can be smarter with how we decide to perform a SQL query if we know how "big" each data source is (or how "fast" in the case of an unbounded source). In theory this could be useful information in core Beam as well but we're a long way from doing that kind of optimization there. I think what we should do is have a default implementation that returns either UNBOUNDED_UNKNOWN or BOUNDED_UNKNOWN based on what SchemaIO tells us it is (which will be sufficient for avro/parquet/pubsub), and then other table providers can override it if they need to. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers
TheNeuralBit commented on a change in pull request #12202: URL: https://github.com/apache/beam/pull/12202#discussion_r451872831 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider; + +import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith; + +import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.schemas.io.InvalidConfigurationException; +import org.apache.beam.sdk.schemas.io.InvalidSchemaException; +import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider; +import org.apache.beam.sdk.schemas.io.SchemaIO; +import org.apache.beam.sdk.util.RowJson; +import org.apache.beam.sdk.values.Row; + +/** + * A general {@link TableProvider} for IOs for consumption by Beam SQL. + * + * Can create child classes for IOs to pass {@link #schemaCapableIOProvider} that is specific to + * the IO. + */ +@Internal +@Experimental +@AutoService(TableProvider.class) +public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider { Review comment: ```suggestion abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider { ``` I think this AutoService annotation is what's causing the Java PreCommit to fail. The `AutoService` annotation makes it so that a call `ServiceLoader.load(TableProvider.class)` will try to instantiate this class if it's in the classpath, and it's not possible to instantiate this since it's abstract. Specifically this is the ServiceLoader call that's biting you: https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java#L85-L86 I think we should also make this package-private. This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers
TheNeuralBit commented on a change in pull request #12202: URL: https://github.com/apache/beam/pull/12202#discussion_r451889886 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java ## @@ -15,54 +15,63 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.meta.provider.parquet; +package org.apache.beam.sdk.extensions.sql.meta.provider; import java.io.Serializable; -import org.apache.avro.generic.GenericRecord; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; -import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; -import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; -import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.schemas.io.SchemaIO; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; +@Internal +@Experimental +/** + * A generalized {@link Table} for IOs to create IO readers and writers. + */ +public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable { Review comment: ```suggestion class SchemaIOTableWrapper extends BaseBeamTable implements Serializable { ``` I think this can be package-private. 
It might also make sense to make it an inner class of `SchemaIOTableProviderWrapper`, but I'll leave that up to you ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider; + +import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith; + +import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.schemas.io.InvalidConfigurationException; +import org.apache.beam.sdk.schemas.io.InvalidSchemaException; +import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider; +import org.apache.beam.sdk.schemas.io.SchemaIO; +import org.apache.beam.sdk.util.RowJson; +import org.apache.beam.sdk.values.Row; + +/** + * A general {@link TableProvider} for IOs for consumption by Beam SQL. + * + * Can create child classes for IOs to pass {@link #schemaCapableIOProvider} that is specific to + * the IO. + */ +@Internal +@Experimental +@AutoService(TableProvider.class) +public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider { Review comment: ```suggestion abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider { ``` I think this AutoService annotation is what's causing the Java PreCommit to fail. The `AutoService` annotation makes it so that a call to `ServiceLoader.load(TableProvider.class)` will try to instantiate this class if it's in the classpath, and it's not possible to instantiate this since it's abstract. Specifically this is the ServiceLoader call that's biting you: https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java#L85-L86 I think we should also make this package-private. ## File path: sdks/java/core/sr