[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

2020-07-21 Thread GitBox


TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r458202365



##
File path: 
sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaCapableIOProvider.java
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing 
parquet files with {@link
+ * ParquetIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class ParquetSchemaCapableIOProvider implements SchemaIOProvider {

Review comment:
   could you remove the "Capable" from all of these class names now that 
we've switched to `SchemaIOProvider`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

2020-07-20 Thread GitBox


TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r45076



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSchemaCapableIOProvider.java
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing Avro 
files with {@link
+ * AvroIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class AvroSchemaCapableIOProvider implements SchemaIOProvider {
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+return "avro";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is 
distinct from the schema
+   * of the data source itself. No configuration expected for Avro.
+   */
+  @Override
+  public Schema configurationSchema() {
+return Schema.builder().build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the 
schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */
+  @Override
+  public AvroSchemaIO from(String location, Row configuration, Schema 
dataSchema) {
+return new AvroSchemaIO(location, dataSchema);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+return true;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** An abstraction to create schema aware IOs. */
+  @Internal
+  private static class AvroSchemaIO implements SchemaIO, Serializable {
+protected final Schema dataSchema;
+protected final String location;
+
+private AvroSchemaIO(String location, Schema dataSchema) {
+  this.dataSchema = dataSchema;
+  this.location = location;
+}
+
+@Override
+public Schema schema() {
+  return dataSchema;
+}
+
+@Override
+public PTransform<PBegin, PCollection<Row>> buildReader() {
+  return new PTransform<PBegin, PCollection<Row>>() {
+@Override
+public PCollection<Row> expand(PBegin begin) {
+  return begin
+  .apply(
+  "AvroIORead",
+  AvroIO.readGenericRecords(AvroUtils.toAvroSchema(dataSchema, null, null))

Review comment:
   I think it is fine; when the name of the schema isn't specified we just 
call it "topLevelRecord": 
https://github.com/apache/beam/blob/f36250f2816da2a0d8350e92f0010615f8491ee0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L327
   
   I don't think the name has any effect on the way we encode/decode the Avro 
records. (CC @iemejia in case I'm wrong)
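
For illustration, a minimal sketch of the naming behavior described above, using the same `AvroUtils.toAvroSchema(Schema, String, String)` overload that appears in this diff; the wrapper class is hypothetical:

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;

public class AvroSchemaNameCheck {
  public static void main(String[] args) {
    Schema beamSchema = Schema.builder().addStringField("name").build();
    // With null name and namespace, the converted Avro record schema is
    // expected to fall back to the default name "topLevelRecord"
    // (per the AvroUtils.java line linked above).
    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema, null, null);
    System.out.println(avroSchema.getName()); // expected: topLevelRecord
  }
}
```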





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

2020-07-09 Thread GitBox


TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452438068



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements 
Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-super(beamSchema);
-this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+return PCollection.IsBounded.UNBOUNDED;
+  }
 
-return begin
-.apply("ParquetIORead", 
ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-.apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-throw new UnsupportedOperationException("Writing to a Parquet file is not 
supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
+return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+PTransform<PCollection<Row>, POutput> writerTransform = schemaIO.buildWriter();
+return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-return BeamTableStatistics.BOUNDED_UNKNOWN;
+return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
   Yeah we could just make it a property on SchemaCapableIOProvider for now.
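
For illustration, a sketch of what that property could look like, mirroring the `isBounded()` override in the Avro provider diff above; the interface and wrapper names here are stand-ins, and the delegation is an assumption rather than the PR's actual wiring:

```java
import org.apache.beam.sdk.values.PCollection;

// Hypothetical: each provider declares its own boundedness once.
interface BoundednessAware {
  PCollection.IsBounded isBounded();
}

// Hypothetical table wrapper that consults the provider property
// instead of hard-coding UNBOUNDED.
class ExampleTableWrapper {
  private final BoundednessAware provider;

  ExampleTableWrapper(BoundednessAware provider) {
    this.provider = provider;
  }

  public PCollection.IsBounded isBounded() {
    return provider.isBounded();
  }
}
```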





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

2020-07-09 Thread GitBox


TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452432390



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #schemaCapableIOProvider} that is specific to
+ * the IO.
+ */
+@Internal
+@Experimental
+@AutoService(TableProvider.class)
+public abstract class SchemaCapableIOTableProviderWrapper extends 
InMemoryMetaTableProvider {

Review comment:
   Ah whoops! Yeah I guess the best we can do is mark it `@Internal` then.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

2020-07-09 Thread GitBox


TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452329517



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements 
Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-super(beamSchema);
-this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+return PCollection.IsBounded.UNBOUNDED;
+  }
 
-return begin
-.apply("ParquetIORead", 
ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-.apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-throw new UnsupportedOperationException("Writing to a Parquet file is not 
supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
+return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+PTransform<PCollection<Row>, POutput> writerTransform = schemaIO.buildWriter();
+return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-return BeamTableStatistics.BOUNDED_UNKNOWN;
+return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
   This is a little tricky though since the method is on BeamSqlTable, and 
not on TableProvider. One possible solution: make SchemaIOTableWrapper an inner 
(non-static) class of SchemaIOTableProviderWrapper. Then the method with the 
default logic can be on SchemaIOTableProviderWrapper, and its sub-classes can 
override it if they need to.
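
A self-contained sketch of the inner-class arrangement proposed here; Beam's types are replaced with minimal stand-ins (`Stats`, `Options`, and shortened class names) since only the shape matters:

```java
class Stats {
  static final Stats UNBOUNDED_UNKNOWN = new Stats();
}

class Options {}

// Stand-in for SchemaIOTableProviderWrapper.
abstract class ProviderWrapper {
  // Default statistics logic lives on the provider; sub-classes override this.
  protected Stats getTableStatistics(Options options) {
    return Stats.UNBOUNDED_UNKNOWN;
  }

  // Stand-in for SchemaIOTableWrapper as an inner (non-static) class: each
  // table instance keeps an implicit reference to its enclosing provider,
  // so the table-level method can reach the overridable provider default.
  class TableWrapper {
    Stats getTableStatistics(Options options) {
      return ProviderWrapper.this.getTableStatistics(options);
    }
  }
}
```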





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

2020-07-09 Thread GitBox


TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452326180



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements 
Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-super(beamSchema);
-this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+return PCollection.IsBounded.UNBOUNDED;
+  }
 
-return begin
-.apply("ParquetIORead", 
ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-.apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-throw new UnsupportedOperationException("Writing to a Parquet file is not 
supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
+return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+PTransform<PCollection<Row>, POutput> writerTransform = schemaIO.buildWriter();
+return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-return BeamTableStatistics.BOUNDED_UNKNOWN;
+return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
   Great question. We should definitely keep the `BeamTableStatistics` in 
the SQL extensions for now, since it only has meaning for SQL.
   
   The reason this exists is so we can do better when optimizing SQL queries - 
it's for "cost-based optimization" - here's a [design 
doc](https://docs.google.com/document/d/1DM_bcfFbIoc_vEoqQxhC7AvHBUDVCAwToC8TYGukkII/edit)
 about our implementation of it. The idea is we can be smarter with how we 
decide to perform a SQL query if we know how "big"  each data source is (or how 
"fast" in the case of an unbounded source). In theory this could be useful 
information in core Beam as well but we're a long way from doing that kind of 
optimization there.
   
   I think what we should do is have a default implementation that returns 
either UNBOUNDED_UNKNOWN or BOUNDED_UNKNOWN based on what SchemaIO tells us it 
is (which will be sufficient for avro/parquet/pubsub), and then other table 
providers can override it if they need to.
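
A minimal sketch of that default, assuming the table wrapper can ask its SchemaIO/provider for boundedness; `DefaultStats` is a hypothetical helper, while the two constants are the ones named above:

```java
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.values.PCollection;

class DefaultStats {
  // Pick the "unknown" statistics that match the source's boundedness;
  // table providers with better information can override this choice.
  static BeamTableStatistics forBoundedness(PCollection.IsBounded isBounded) {
    return isBounded == PCollection.IsBounded.BOUNDED
        ? BeamTableStatistics.BOUNDED_UNKNOWN
        : BeamTableStatistics.UNBOUNDED_UNKNOWN;
  }
}
```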





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

2020-07-08 Thread GitBox


TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r451872831



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #schemaCapableIOProvider} that is specific to
+ * the IO.
+ */
+@Internal
+@Experimental
+@AutoService(TableProvider.class)
+public abstract class SchemaCapableIOTableProviderWrapper extends 
InMemoryMetaTableProvider {

Review comment:
   ```suggestion
   abstract class SchemaCapableIOTableProviderWrapper extends 
InMemoryMetaTableProvider {
   ```
   I think this AutoService annotation is what's causing the Java PreCommit to 
fail. The `AutoService` annotation makes it so that a call 
`ServiceLoader.load(TableProvider.class)` will try to instantiate this class if 
it's in the classpath, and it's not possible to instantiate this since it's 
abstract.
   
   Specifically this is the ServiceLoader call that's biting you:
   
https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java#L85-L86
   
   I think we should also make this package-private
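
A self-contained sketch of the failure mode, with stand-in types rather than Beam's: `ServiceLoader` instantiates every registered implementation reflectively, which cannot succeed for an abstract class.

```java
import java.util.ServiceLoader;

interface Provider {}

// If this class carried @AutoService(Provider.class), the generated
// META-INF/services entry would still list it, and the loop below would
// fail with a ServiceConfigurationError when ServiceLoader tries to
// invoke the constructor of an abstract class reflectively.
abstract class AbstractProvider implements Provider {}

class LoadAllProviders {
  public static void main(String[] args) {
    for (Provider p : ServiceLoader.load(Provider.class)) {
      System.out.println(p.getClass().getName());
    }
  }
}
```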





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

2020-07-08 Thread GitBox


TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r451889886



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements 
Serializable {

Review comment:
   ```suggestion
   class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {
   ```
   
   I think this can be package-private. It might also make sense to make it an 
inner class of `SchemaIOTableProviderWrapper`, but I'll leave that up to you

##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #schemaCapableIOProvider} that is specific to
+ * the IO.
+ */
+@Internal
+@Experimental
+@AutoService(TableProvider.class)
+public abstract class SchemaCapableIOTableProviderWrapper extends 
InMemoryMetaTableProvider {

Review comment:
   ```suggestion
   abstract class SchemaCapableIOTableProviderWrapper extends 
InMemoryMetaTableProvider {
   ```
   I think this AutoService annotation is what's causing the Java PreCommit to 
fail. The `AutoService` annotation makes it so that a call 
`ServiceLoader.load(TableProvider.class)` will try to instantiate this class if 
it's in the classpath, and it's not possible to instantiate this since it's abstract.
   
   Specifically this is the ServiceLoader call that's biting you:
   
https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java#L85-L86
   
   I think we should also make this package-private

##
File path: 
sdks/java/core/sr