TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452329517



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements 
Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+    this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-    super(beamSchema);
-    this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+    return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-        GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
 
-    return begin
-        .apply("ParquetIORead", 
ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-        .apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+    return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-    throw new UnsupportedOperationException("Writing to a Parquet file is not 
supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    PTransform<PBegin, PCollection<Row>> readerTransform = 
schemaIO.buildReader();
+    return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+    PTransform<PCollection<Row>, POutput> writerTransform = 
schemaIO.buildWriter();
+    return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.BOUNDED_UNKNOWN;
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
       This is a little tricky though, since the method is on BeamSqlTable and 
not on TableProvider. One possible solution: make SchemaIOTableWrapper an inner 
(non-static) class of SchemaIOTableProviderWrapper. Then the method with the 
default logic can be on SchemaIOTableProviderWrapper, and its subclasses can 
override it if they need to.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to