[ 
https://issues.apache.org/jira/browse/BEAM-10407?focusedWorklogId=456373&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-456373
 ]

ASF GitHub Bot logged work on BEAM-10407:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jul/20 00:44
            Start Date: 09/Jul/20 00:44
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r451889886



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {

Review comment:
       ```suggestion
   class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {
   ```
   
   I think this can be package-private. It might also make sense to make it an 
inner class of `SchemaIOTableProviderWrapper`, but I'll leave that up to you.

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #schemaCapableIOProvider} that is specific to
+ * the IO.
+ */
+@Internal
+@Experimental
+@AutoService(TableProvider.class)
+public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider {

Review comment:
       ```suggestion
   abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider {
   ```
   I think this AutoService annotation is what's causing the Java PreCommit to 
fail. The `AutoService` annotation makes it so that a call to 
`ServiceLoader.load(TableProvider.class)` will try to instantiate this class if 
it's on the classpath, and that's not possible because the class is abstract.
   
   Specifically this is the ServiceLoader call that's biting you:
   
https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java#L85-L86
   
   I think we should also make this package-private
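
A minimal sketch of that failure mode, using plain reflection as a stand-in for what `ServiceLoader` does with each registered provider class (a real `ServiceLoader` run also needs a `META-INF/services` entry and reports the failure as a `ServiceConfigurationError`). All type names here (`SketchProvider`, `AbstractSketchWrapper`, `AvroLikeProvider`, `ServiceLoadingSketch`) are hypothetical and are not part of Beam:

```java
/** Hypothetical stand-in for the TableProvider service interface. */
interface SketchProvider {
  String getTableType();
}

/** Hypothetical stand-in for the abstract wrapper; it cannot be instantiated. */
abstract class AbstractSketchWrapper implements SketchProvider {
  public AbstractSketchWrapper() {}
}

/** Hypothetical concrete provider; this is the kind of class that is safe to register. */
class AvroLikeProvider extends AbstractSketchWrapper {
  public AvroLikeProvider() {}

  @Override
  public String getTableType() {
    return "avro";
  }
}

final class ServiceLoadingSketch {
  /** Tries to create a provider through its no-arg constructor, as a service loader would. */
  static boolean canInstantiate(Class<? extends SketchProvider> providerClass) {
    try {
      providerClass.getDeclaredConstructor().newInstance();
      return true;
    } catch (ReflectiveOperationException e) {
      // An abstract class ends up here with an InstantiationException.
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("abstract wrapper: " + canInstantiate(AbstractSketchWrapper.class));
    System.out.println("concrete provider: " + canInstantiate(AvroLikeProvider.class));
  }
}
```

Under these assumptions only the concrete subclass can be created, which is why the registration annotation belongs on the concrete providers rather than on the abstract wrapper.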

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSchemaCapableIOProvider.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaCapableIOProvider} for reading and writing JSON payloads with
+ * {@link AvroIO}.

Review comment:
       ```suggestion
    * An implementation of {@link SchemaCapableIOProvider} for reading and writing avro files with
    * {@link AvroIO}.
   ```

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+    this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-    super(beamSchema);
-    this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+    return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-        GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }

Review comment:
       Hmm this is actually something that will need to be different for each 
IO. Parquet and Avro are both bounded data sources, while pubsub is unbounded.
   
   Can you add this to the SchemaIO interface and plumb it through here?
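
One possible shape for that plumbing, as a sketch only. `Boundedness`, `SketchSchemaIO`, and `SketchTableWrapper` are hypothetical, simplified stand-ins for `PCollection.IsBounded`, `SchemaIO`, and `SchemaIOTableWrapper`; the real interface has more methods and the final method name is up to the PR:

```java
/** Hypothetical stand-in for PCollection.IsBounded. */
enum Boundedness {
  BOUNDED,
  UNBOUNDED
}

/** Hypothetical, simplified stand-in for SchemaIO with the proposed extra method. */
interface SketchSchemaIO {
  Boundedness isBounded();
}

/** Hypothetical stand-in for the table wrapper: it delegates instead of hard-coding a value. */
final class SketchTableWrapper {
  private final SketchSchemaIO schemaIO;

  SketchTableWrapper(SketchSchemaIO schemaIO) {
    this.schemaIO = schemaIO;
  }

  Boundedness isBounded() {
    return schemaIO.isBounded();
  }
}

final class BoundednessSketch {
  public static void main(String[] args) {
    // File-based sources such as Avro or Parquet would report BOUNDED.
    SketchSchemaIO fileLike = () -> Boundedness.BOUNDED;
    // A streaming source such as pubsub would report UNBOUNDED.
    SketchSchemaIO streamLike = () -> Boundedness.UNBOUNDED;

    System.out.println(new SketchTableWrapper(fileLike).isBounded());
    System.out.println(new SketchTableWrapper(streamLike).isBounded());
  }
}
```

With this arrangement each IO decides its own boundedness and the wrapper stays generic.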

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+    this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-    super(beamSchema);
-    this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+    return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-        GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
 
-    return begin
-        .apply("ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-        .apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+    return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-    throw new UnsupportedOperationException("Writing to a Parquet file is not supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
+    return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+    PTransform<PCollection<Row>, POutput> writerTransform = schemaIO.buildWriter();
+    return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.BOUNDED_UNKNOWN;
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
       This will also need to be different for avro/parquet vs. pubsub. It 
could just be determined from the same method on `SchemaIO`
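
A sketch of deriving the statistics from boundedness, assuming the two constants seen in this diff (`BOUNDED_UNKNOWN` and `UNBOUNDED_UNKNOWN`) are the only cases needed. `SourceKind`, `SketchStatistics`, and `StatisticsSketch` are hypothetical stand-ins for `PCollection.IsBounded`, `BeamTableStatistics`, and the wrapper logic:

```java
/** Hypothetical stand-in for PCollection.IsBounded. */
enum SourceKind {
  BOUNDED,
  UNBOUNDED
}

/** Hypothetical stand-in for the two BeamTableStatistics constants used in this diff. */
enum SketchStatistics {
  BOUNDED_UNKNOWN,
  UNBOUNDED_UNKNOWN
}

final class StatisticsSketch {
  /** Picks the "unknown" statistics that match the source's boundedness. */
  static SketchStatistics statisticsFor(SourceKind kind) {
    return kind == SourceKind.BOUNDED
        ? SketchStatistics.BOUNDED_UNKNOWN
        : SketchStatistics.UNBOUNDED_UNKNOWN;
  }

  public static void main(String[] args) {
    System.out.println(statisticsFor(SourceKind.BOUNDED));
    System.out.println(statisticsFor(SourceKind.UNBOUNDED));
  }
}
```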

##########
File path: 
sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/GenericRecordReadConverter.java
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.io.parquet;
 
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;

Review comment:
       Can you make this class package-private? Users may be more tempted to 
use it now that it's outside of the SQL extensions.
   
   This could be a generally useful transform, so we may want to move it outside 
of the parquet package and make it public, but let's not do that now.

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProvider.java
##########
@@ -38,15 +42,10 @@
  * LOCATION '/tmp/persons.avro'
  * }</pre>
  */
-@AutoService(TableProvider.class)
-public class AvroTableProvider extends InMemoryMetaTableProvider {
-  @Override
-  public String getTableType() {
-    return "avro";
-  }
-
-  @Override
-  public BeamSqlTable buildBeamSqlTable(Table table) {
-    return new AvroTable(table.getName(), table.getSchema(), table.getLocation());
+@Internal
+@Experimental
+public class AvroTableProvider extends SchemaCapableIOTableProviderWrapper {

Review comment:
       ```suggestion
   @AutoService(TableProvider.class)
   public class AvroTableProvider extends SchemaCapableIOTableProviderWrapper {
   ```
   
   We can keep these annotations the same as they were, since this class should 
work exactly the same as it used to from the user perspective.

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
##########
@@ -38,15 +42,10 @@
  * LOCATION '/home/admin/users.parquet'
  * }</pre>
  */
-@AutoService(TableProvider.class)
-public class ParquetTableProvider extends InMemoryMetaTableProvider {
-  @Override
-  public String getTableType() {
-    return "parquet";
-  }
-
-  @Override
-  public BeamSqlTable buildBeamSqlTable(Table table) {
-    return new ParquetTable(table.getSchema(), table.getLocation());
+@Internal
+@Experimental
+public class ParquetTableProvider extends SchemaCapableIOTableProviderWrapper {

Review comment:
       ```suggestion
   @AutoService(TableProvider.class)
   public class ParquetTableProvider extends SchemaCapableIOTableProviderWrapper {
   ```
   Here as well

##########
File path: 
sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaCapableIOProvider.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaCapableIOProvider} for reading and writing JSON payloads with
+ * {@link ParquetIO}.

Review comment:
       ```suggestion
    * An implementation of {@link SchemaCapableIOProvider} for reading and writing parquet files with
    * {@link ParquetIO}.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 456373)
    Time Spent: 20m  (was: 10m)

> Move Avro and Parquet provider logic to core beam
> -------------------------------------------------
>
>                 Key: BEAM-10407
>                 URL: https://issues.apache.org/jira/browse/BEAM-10407
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Scott Lukas
>            Assignee: Scott Lukas
>            Priority: P2
>              Labels: schema-io
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Implement SchemaIO and SchemaCapableIOProvider for Avro and Parquet.
> Additional details:
> [https://docs.google.com/document/d/1ic3P8EVGHIydHQ-VMDKbN9kEdwm7sBXMo80VrhwksvI/edit#heading=h.x9snb54sjlu9]
> [~bhulette]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
