TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452326180



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements 
Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+    this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-    super(beamSchema);
-    this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+    return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-        GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
 
-    return begin
-        .apply("ParquetIORead", 
ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-        .apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+    return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-    throw new UnsupportedOperationException("Writing to a Parquet file is not 
supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    PTransform<PBegin, PCollection<Row>> readerTransform = 
schemaIO.buildReader();
+    return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+    PTransform<PCollection<Row>, POutput> writerTransform = 
schemaIO.buildWriter();
+    return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.BOUNDED_UNKNOWN;
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
       Great question. We should definitely keep the `BeamTableStatistics` in 
the SQL extensions for now, since it only has meaning for SQL.
   
   The reason this exists is so we can do better when optimizing SQL queries: 
it's for "cost-based optimization". Here's a 
[design doc](https://docs.google.com/document/d/1DM_bcfFbIoc_vEoqQxhC7AvHBUDVCAwToC8TYGukkII/edit) 
about our implementation of it. The idea is we can be smarter with how we 
decide to perform a SQL query if we know how "big" each data source is (or how 
"fast" in the case of an unbounded source). In theory this could be useful 
information in core Beam as well, but we're a long way from doing that kind of 
optimization there.
   
   I think what we should do is have a default implementation that returns 
either `UNBOUNDED_UNKNOWN` or `BOUNDED_UNKNOWN` based on whether SchemaIO tells 
us the source is unbounded or bounded (which will be sufficient for 
avro/parquet/pubsub), and then other table providers can override it if they 
need to.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to