[ https://issues.apache.org/jira/browse/BEAM-10407?focusedWorklogId=456373&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-456373 ]
ASF GitHub Bot logged work on BEAM-10407: ----------------------------------------- Author: ASF GitHub Bot Created on: 09/Jul/20 00:44 Start Date: 09/Jul/20 00:44 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on a change in pull request #12202: URL: https://github.com/apache/beam/pull/12202#discussion_r451889886 ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java ########## @@ -15,54 +15,63 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.meta.provider.parquet; +package org.apache.beam.sdk.extensions.sql.meta.provider; import java.io.Serializable; -import org.apache.avro.generic.GenericRecord; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; -import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; -import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; -import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.schemas.io.SchemaIO; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; +@Internal +@Experimental +/** + * A generalized {@link Table} for IOs to create IO readers and writers. 
+ */ +public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable { Review comment: ```suggestion class SchemaIOTableWrapper extends BaseBeamTable implements Serializable { ``` I think this can be package-private. It might also make sense to make it an inner class of `SchemaIOTableProviderWrapper`, but I'll leave that up to you ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider; + +import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith; + +import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.schemas.io.InvalidConfigurationException; +import org.apache.beam.sdk.schemas.io.InvalidSchemaException; +import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider; +import org.apache.beam.sdk.schemas.io.SchemaIO; +import org.apache.beam.sdk.util.RowJson; +import org.apache.beam.sdk.values.Row; + +/** + * A general {@link TableProvider} for IOs for consumption by Beam SQL. + * + * <p>Can create child classes for IOs to pass {@link #schemaCapableIOProvider} that is specific to + * the IO. + */ +@Internal +@Experimental +@AutoService(TableProvider.class) +public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider { Review comment: ```suggestion abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider { ``` I think this AutoService annotation is what's causing the Java PreCommit to fail. 
The `AutoService` annotation makes it so that a call to `ServiceLoader.load(TableProvider.class)` will try to instantiate this class if it's on the classpath, and that's not possible because the class is abstract. Specifically, this is the ServiceLoader call that's biting you: https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java#L85-L86 I think we should also make this package-private. ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSchemaCapableIOProvider.java ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io; + +import com.google.auto.service.AutoService; +import java.io.Serializable; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider; +import org.apache.beam.sdk.schemas.io.SchemaIO; +import org.apache.beam.sdk.schemas.transforms.Convert; +import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; + +/** + * An implementation of {@link SchemaCapableIOProvider} for reading and writing JSON payloads with + * {@link AvroIO}. Review comment: ```suggestion * An implementation of {@link SchemaCapableIOProvider} for reading and writing avro files with * {@link AvroIO}. ``` ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java ########## @@ -15,54 +15,63 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.sdk.extensions.sql.meta.provider.parquet; +package org.apache.beam.sdk.extensions.sql.meta.provider; import java.io.Serializable; -import org.apache.avro.generic.GenericRecord; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; -import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; -import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; -import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.schemas.io.SchemaIO; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; +@Internal +@Experimental +/** + * A generalized {@link Table} for IOs to create IO readers and writers. + */ +public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable { + protected final SchemaIO schemaIO; -/** {@link ParquetTable} is a {@link BeamSqlTable}. 
*/ -public class ParquetTable extends SchemaBaseBeamTable implements Serializable { - private final String filePattern; + private SchemaIOTableWrapper(SchemaIO schemaIO) { + this.schemaIO = schemaIO; + } - public ParquetTable(Schema beamSchema, String filePattern) { - super(beamSchema); - this.filePattern = filePattern; + static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) { + return new SchemaIOTableWrapper(schemaIO); } @Override - public PCollection<Row> buildIOReader(PBegin begin) { - PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter = - GenericRecordReadConverter.builder().beamSchema(schema).build(); + public PCollection.IsBounded isBounded() { + return PCollection.IsBounded.UNBOUNDED; + } Review comment: Hmm this is actually something that will need to be different for each IO. Parquet and Avro are both bounded data sources, while pubsub is unbounded. Can you add this to the SchemaIO interface and plumb it through here? ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java ########## @@ -15,54 +15,63 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.sdk.extensions.sql.meta.provider.parquet; +package org.apache.beam.sdk.extensions.sql.meta.provider; import java.io.Serializable; -import org.apache.avro.generic.GenericRecord; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; -import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; -import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; -import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.schemas.io.SchemaIO; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; +@Internal +@Experimental +/** + * A generalized {@link Table} for IOs to create IO readers and writers. + */ +public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable { + protected final SchemaIO schemaIO; -/** {@link ParquetTable} is a {@link BeamSqlTable}. 
*/ -public class ParquetTable extends SchemaBaseBeamTable implements Serializable { - private final String filePattern; + private SchemaIOTableWrapper(SchemaIO schemaIO) { + this.schemaIO = schemaIO; + } - public ParquetTable(Schema beamSchema, String filePattern) { - super(beamSchema); - this.filePattern = filePattern; + static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) { + return new SchemaIOTableWrapper(schemaIO); } @Override - public PCollection<Row> buildIOReader(PBegin begin) { - PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter = - GenericRecordReadConverter.builder().beamSchema(schema).build(); + public PCollection.IsBounded isBounded() { + return PCollection.IsBounded.UNBOUNDED; + } - return begin - .apply("ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern)) - .apply("GenericRecordToRow", readConverter); + @Override + public Schema getSchema() { + return schemaIO.schema(); } @Override - public PDone buildIOWriter(PCollection<Row> input) { - throw new UnsupportedOperationException("Writing to a Parquet file is not supported"); + public PCollection<Row> buildIOReader(PBegin begin) { + PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader(); + return begin.apply(readerTransform); } @Override - public PCollection.IsBounded isBounded() { - return PCollection.IsBounded.BOUNDED; + public POutput buildIOWriter(PCollection<Row> input) { + PTransform<PCollection<Row>, POutput> writerTransform = schemaIO.buildWriter(); + return input.apply(writerTransform); } @Override public BeamTableStatistics getTableStatistics(PipelineOptions options) { - return BeamTableStatistics.BOUNDED_UNKNOWN; + return BeamTableStatistics.UNBOUNDED_UNKNOWN; Review comment: This will also need to be different for avro/parquet vs. pubsub. 
It could just be determined from the same method on `SchemaIO`. ########## File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/GenericRecordReadConverter.java ########## @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.meta.provider.parquet; +package org.apache.beam.sdk.io.parquet; import com.google.auto.value.AutoValue; import java.io.Serializable; Review comment: Can you make this class package-private? Users may be more tempted to use it now that it's outside of the SQL extensions. This could be a generally useful transform, so we may want to move it outside of the parquet package and make it public... but let's not do that now. ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProvider.java ########## @@ -38,15 +42,10 @@ * LOCATION '/tmp/persons.avro' * }</pre> */ -@AutoService(TableProvider.class) -public class AvroTableProvider extends InMemoryMetaTableProvider { - @Override - public String getTableType() { - return "avro"; - } - - @Override - public BeamSqlTable buildBeamSqlTable(Table table) { - return new AvroTable(table.getName(), table.getSchema(), table.getLocation()); +@Internal +@Experimental +public class AvroTableProvider extends SchemaCapableIOTableProviderWrapper { Review comment: ```suggestion @AutoService(TableProvider.class) public class AvroTableProvider extends SchemaCapableIOTableProviderWrapper { ``` We can keep these annotations the same as they were, since this class should work exactly the same as it used to from the user perspective. 
########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java ########## @@ -38,15 +42,10 @@ * LOCATION '/home/admin/users.parquet' * }</pre> */ -@AutoService(TableProvider.class) -public class ParquetTableProvider extends InMemoryMetaTableProvider { - @Override - public String getTableType() { - return "parquet"; - } - - @Override - public BeamSqlTable buildBeamSqlTable(Table table) { - return new ParquetTable(table.getSchema(), table.getLocation()); +@Internal +@Experimental +public class ParquetTableProvider extends SchemaCapableIOTableProviderWrapper { Review comment: ```suggestion @AutoService(TableProvider.class) public class ParquetTableProvider extends SchemaCapableIOTableProviderWrapper { ``` Here as well ########## File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaCapableIOProvider.java ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.parquet; + +import com.google.auto.service.AutoService; +import java.io.Serializable; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider; +import org.apache.beam.sdk.schemas.io.SchemaIO; +import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; + +/** + * An implementation of {@link SchemaCapableIOProvider} for reading and writing JSON payloads with + * {@link ParquetIO}. Review comment: ```suggestion * An implementation of {@link SchemaCapableIOProvider} for reading and writing parquet files with * {@link ParquetIO}. ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 456373) Time Spent: 20m (was: 10m) > Move Avro and Parquet provider logic to core beam > ------------------------------------------------- > > Key: BEAM-10407 > URL: https://issues.apache.org/jira/browse/BEAM-10407 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core > Reporter: Scott Lukas > Assignee: Scott Lukas > Priority: P2 > Labels: schema-io > Time Spent: 20m > Remaining Estimate: 0h > > Implement SchemaIO and SchemaCapableIOProvider for Avro and Parquet. 
> Additional details: > [https://docs.google.com/document/d/1ic3P8EVGHIydHQ-VMDKbN9kEdwm7sBXMo80VrhwksvI/edit#heading=h.x9snb54sjlu9] > [~bhulette] -- This message was sent by Atlassian Jira (v8.3.4#803005)