sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452308746



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements 
Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+    this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-    super(beamSchema);
-    this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+    return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-        GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
 
-    return begin
-        .apply("ParquetIORead", 
ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-        .apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+    return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-    throw new UnsupportedOperationException("Writing to a Parquet file is not 
supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    PTransform<PBegin, PCollection<Row>> readerTransform = 
schemaIO.buildReader();
+    return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+    PTransform<PCollection<Row>, POutput> writerTransform = 
schemaIO.buildWriter();
+    return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.BOUNDED_UNKNOWN;
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
       This relies on BeamTableStatistics, which is not in the core directory. I 
could use schemaIO.isBounded() and instantiate the appropriate 
BeamTableStatistics within SchemaIOTableWrapper based on that value, which 
would work for pubsub/avro/parquet. But looking ahead, the 
getTableStatistics(..) methods of other IOs such as seqgen and kafka rely on 
other methods within BeamTableStatistics. Do you think I should make a 
conversion class to BeamTableStatistics within core in anticipation of these 
issues, or go ahead with using schemaIO.isBounded() regardless?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to