TheNeuralBit commented on a change in pull request #12090:
URL: https://github.com/apache/beam/pull/12090#discussion_r445908191



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -47,32 +44,40 @@
 @Experimental
 @AutoService(TableProvider.class)
 public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
+  static final String TIMESTAMP_FIELD = "event_timestamp";
+  static final String ATTRIBUTES_FIELD = "attributes";
+  static final String PAYLOAD_FIELD = "payload";
 
   @Override
   public String getTableType() {
     return "pubsub";
   }
 
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table tableDefintion) {
-    JSONObject tableProperties = tableDefintion.getProperties();
+  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
+    JSONObject tableProperties = tableDefinition.getProperties();
     String timestampAttributeKey = tableProperties.getString("timestampAttributeKey");
     String deadLetterQueue = tableProperties.getString("deadLetterQueue");
-    validateDlq(deadLetterQueue);
 
-    Schema schema = tableDefintion.getSchema();
+    Schema schema = tableDefinition.getSchema();
+    String location = tableDefinition.getLocation();
+    Schema dataSchema = tableDefinition.getSchema();
+
+    validateDlq(deadLetterQueue);
     validateEventTimestamp(schema);

Review comment:
       Could you move this validation to the new implementation (probably in 
the `from` method)? We'll want to make sure we still do the validation when 
this is used outside of SQL (e.g. for cross-language).
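
A minimal sketch of what that could look like, using stand-in types rather than Beam's real classes. `PubsubProviderSketch`, the `Map`-based schema, the `String` return value, and the exact rules inside the two validators are all assumptions for illustration; the point is only that the checks run inside `from`, so every caller goes through them:

```java
import java.util.Map;

// Stand-in for the provider under review; hypothetical, not Beam's real API.
final class PubsubProviderSketch {
  static final String TIMESTAMP_FIELD = "event_timestamp";

  // Hypothetical `from`: validates before building anything, so SQL and
  // non-SQL callers (e.g. cross-language) all hit the same checks.
  static String from(String location, String deadLetterQueue, Map<String, String> fieldTypes) {
    validateDlq(deadLetterQueue);
    validateEventTimestamp(fieldTypes);
    return "pubsub:" + location; // placeholder for the real IO object
  }

  // Assumed rule: a DLQ is optional, but must not be an empty string if given.
  private static void validateDlq(String deadLetterQueue) {
    if (deadLetterQueue != null && deadLetterQueue.isEmpty()) {
      throw new IllegalArgumentException("Dead letter queue topic name is not specified");
    }
  }

  // Assumed rule: the schema must declare a TIMESTAMP-typed event_timestamp field.
  private static void validateEventTimestamp(Map<String, String> fieldTypes) {
    if (!"TIMESTAMP".equals(fieldTypes.get(TIMESTAMP_FIELD))) {
      throw new IllegalArgumentException(
          "Schema must have a TIMESTAMP field named " + TIMESTAMP_FIELD);
    }
  }
}
```

With this shape the SQL table provider would no longer need its own `validateDlq` / `validateEventTimestamp` calls.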

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaIO.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.DLQ_TAG;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.MAIN_TAG;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/** An abstraction to create schema aware IOs. */
+@Internal
+public class PubsubSchemaIO implements SchemaIO, Serializable {

Review comment:
       I think we should make this a `private static` inner class within 
`PubsubSchemaCapableIOProvider` so no one tries to use it on its own.
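
Roughly the shape I have in mind, sketched with stand-in types (`SchemaIOSketch`, `ProviderSketch`, and the `describe()` method are hypothetical and do not mirror the real `SchemaIO` signatures):

```java
import java.io.Serializable;

// Hypothetical stand-in for the SchemaIO interface.
interface SchemaIOSketch {
  String describe();
}

// Hypothetical stand-in for the provider.
final class ProviderSketch {
  // Callers only ever see the interface type, never the concrete class.
  static SchemaIOSketch from(String location) {
    return new PubsubSchemaIOSketch(location);
  }

  // private: cannot be named or constructed outside the provider.
  // static: holds no hidden reference to an enclosing provider instance,
  // which also keeps serialization simple.
  private static final class PubsubSchemaIOSketch implements SchemaIOSketch, Serializable {
    private final String location;

    private PubsubSchemaIOSketch(String location) {
      this.location = location;
    }

    @Override
    public String describe() {
      return "pubsub@" + location;
    }
  }
}
```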

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/package-info.java
##########
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Provides a new schema aware IO abstraction interface. */

Review comment:
       nit: it won't be new forever :)
   ```suggestion
   /** Provides abstractions for schema-aware IOs. */
   ```

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -47,32 +44,40 @@
 @Experimental
 @AutoService(TableProvider.class)
 public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
+  static final String TIMESTAMP_FIELD = "event_timestamp";
+  static final String ATTRIBUTES_FIELD = "attributes";
+  static final String PAYLOAD_FIELD = "payload";
 
   @Override
   public String getTableType() {
     return "pubsub";

Review comment:
       This could defer to `PubsubSchemaIOProvider#identifier()`, but you'd 
need to have an instance of it (could be a static member, or you could add a 
constructor and create one there).
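
For the static-member option, a sketch with stand-in classes (`IoProviderSketch` and `TableProviderSketch` are hypothetical names; only `identifier()` and `getTableType()` come from the code under review):

```java
// Hypothetical stand-in for the IO provider; only identifier() matters here.
final class IoProviderSketch {
  String identifier() {
    return "pubsub";
  }
}

// Hypothetical stand-in for the SQL table provider.
final class TableProviderSketch {
  // One shared instance, so the type string is defined in exactly one place.
  private static final IoProviderSketch IO_PROVIDER = new IoProviderSketch();

  String getTableType() {
    return IO_PROVIDER.identifier();
  }
}
```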

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -47,32 +44,40 @@
 @Experimental
 @AutoService(TableProvider.class)
 public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
+  static final String TIMESTAMP_FIELD = "event_timestamp";
+  static final String ATTRIBUTES_FIELD = "attributes";
+  static final String PAYLOAD_FIELD = "payload";
 
   @Override
   public String getTableType() {
     return "pubsub";
   }
 
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table tableDefintion) {
-    JSONObject tableProperties = tableDefintion.getProperties();
+  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
+    JSONObject tableProperties = tableDefinition.getProperties();
     String timestampAttributeKey = tableProperties.getString("timestampAttributeKey");
     String deadLetterQueue = tableProperties.getString("deadLetterQueue");
-    validateDlq(deadLetterQueue);
 
-    Schema schema = tableDefintion.getSchema();
+    Schema schema = tableDefinition.getSchema();
+    String location = tableDefinition.getLocation();
+    Schema dataSchema = tableDefinition.getSchema();
+
+    validateDlq(deadLetterQueue);
     validateEventTimestamp(schema);
 
-    PubsubIOTableConfiguration config =
-        PubsubIOTableConfiguration.builder()
-            .setSchema(schema)
-            .setTimestampAttribute(timestampAttributeKey)
-            .setDeadLetterQueue(deadLetterQueue)
-            .setTopic(tableDefintion.getLocation())
-            .setUseFlatSchema(!definesAttributeAndPayload(schema))
+    PubsubSchemaCapableIOProvider ioProvider = new PubsubSchemaCapableIOProvider();
+    Schema configurationSchema = ioProvider.configurationSchema();
+
+    Row configurationRow =
+        Row.withSchema(configurationSchema)
+            .withFieldValue("timestampAttributeKey", timestampAttributeKey)
+            .withFieldValue("deadLetterQueue", deadLetterQueue)
+            .withFieldValue("useFlatSchema", !definesAttributeAndPayload(schema))

Review comment:
       It would be better if we could map the `tableProperties` into a Row by 
leveraging `RowJson.RowJsonDeserializer` initialized with 
`configurationSchema()`; then there wouldn't be any logic specific to pubsub 
here.
   
   The biggest hang-up with that is "useFlatSchema", since it's not actually 
part of the JSON. Is there a reason we can't move all of the logic for 
detecting flat vs. nested schemas into `PubsubSchemaCapableIOProvider#from`?
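
As a rough illustration of the second point, the flat-vs-nested decision could be derived from the data schema alone. This sketch uses a plain `Set` of field names in place of a Beam `Schema`, and it checks names only, which is an assumption (the real check may also look at field types); `FlatSchemaSketch` and `useFlatSchema(...)` are hypothetical names:

```java
import java.util.Set;

// Hypothetical helper living next to `from`; not Beam's real API.
final class FlatSchemaSketch {
  static final String ATTRIBUTES_FIELD = "attributes";
  static final String PAYLOAD_FIELD = "payload";

  // Nested schemas declare both an attributes field and a payload field.
  static boolean definesAttributeAndPayload(Set<String> fieldNames) {
    return fieldNames.contains(ATTRIBUTES_FIELD) && fieldNames.contains(PAYLOAD_FIELD);
  }

  // Derived inside the provider, so callers never have to pass useFlatSchema
  // and the configuration only carries what is actually in the JSON properties.
  static boolean useFlatSchema(Set<String> dataSchemaFieldNames) {
    return !definesAttributeAndPayload(dataSchemaFieldNames);
  }
}
```

With that in place, the configuration Row would only need `timestampAttributeKey` and `deadLetterQueue`, both of which come straight from the table properties.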

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/package-info.java
##########
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Provides a new schema aware IO abstraction interface. */
+package org.apache.beam.sdk.schemas.io;

Review comment:
       Please add `@DefaultAnnotation(NonNull.class)` like this: 
https://github.com/apache/beam/blob/451af5133bc0a6416afa7b1844833c153f510181/sdks/java/core/src/main/java/org/apache/beam/sdk/package-info.java#L29-L30
   
   That's supposed to make SpotBugs verify that we never pass null as a 
function argument unless it's explicitly allowed with `@Nullable`.
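
For this file that would look roughly like the fragment below. The two imports are an assumption on my part (the SpotBugs/FindBugs annotations package); please copy whatever the linked `package-info.java` actually imports:

```java
/** Provides abstractions for schema-aware IOs. */
@DefaultAnnotation(NonNull.class)
package org.apache.beam.sdk.schemas.io;

// Assumed imports; mirror the linked package-info.java if it differs.
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
```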

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RowToPubsubMessage.java
##########
@@ -38,29 +37,28 @@
  * <p>Currently only supports writing a flat schema into a JSON payload. This means that all Row
  * field values are written to the {@link PubsubMessage} JSON payload, except for {@code
  * event_timestamp}, which is either ignored or written to the message attributes, depending on
- * whether {@link PubsubJsonTableProvider.PubsubIOTableConfiguration#getTimestampAttribute()} is
- * set.
+ * whether config.getValue("timestampAttributeKey") is set.
  */
 @Experimental
 public class RowToPubsubMessage extends PTransform<PCollection<Row>, PCollection<PubsubMessage>> {

Review comment:
       Hm, I feel like this should not be public either... but I see that's 
actually my fault, since I did it originally. Would you mind fixing my mistake 
and making this a `private static` inner class?



