TheNeuralBit commented on a change in pull request #12090:
URL: https://github.com/apache/beam/pull/12090#discussion_r445908191
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -47,32 +44,40 @@
@Experimental
@AutoService(TableProvider.class)
public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
+ static final String TIMESTAMP_FIELD = "event_timestamp";
+ static final String ATTRIBUTES_FIELD = "attributes";
+ static final String PAYLOAD_FIELD = "payload";
@Override
public String getTableType() {
return "pubsub";
}
@Override
- public BeamSqlTable buildBeamSqlTable(Table tableDefintion) {
- JSONObject tableProperties = tableDefintion.getProperties();
+ public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
+ JSONObject tableProperties = tableDefinition.getProperties();
String timestampAttributeKey =
tableProperties.getString("timestampAttributeKey");
String deadLetterQueue = tableProperties.getString("deadLetterQueue");
- validateDlq(deadLetterQueue);
- Schema schema = tableDefintion.getSchema();
+ Schema schema = tableDefinition.getSchema();
+ String location = tableDefinition.getLocation();
+ Schema dataSchema = tableDefinition.getSchema();
+
+ validateDlq(deadLetterQueue);
validateEventTimestamp(schema);
Review comment:
Could you move this validation to the new implementation (probably in
the `from` method)? We'll want to make sure we still do the validation when
this is used outside of SQL (e.g. for cross-language).
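For illustration only, here is a minimal sketch of what moving the check into `from` could look like. It uses hypothetical stand-in types (`ValidatingProviderSketch`, `SketchIO`, and a schema modeled as a field-name-to-type map), not the real Beam classes, and the timestamp check is a simplified assumption of what `validateEventTimestamp` does.

```java
import java.util.Map;

// Hypothetical stand-in for the schema IO provider; not the real Beam API.
class ValidatingProviderSketch {
  static final String TIMESTAMP_FIELD = "event_timestamp";

  // Stand-in for the object returned by from(); it only records its inputs.
  static final class SketchIO {
    final String location;
    final Map<String, String> dataSchema;

    SketchIO(String location, Map<String, String> dataSchema) {
      this.location = location;
      this.dataSchema = dataSchema;
    }
  }

  // Validation runs here, so SQL and non-SQL callers hit the same check.
  static SketchIO from(String location, Map<String, String> dataSchema) {
    validateEventTimestamp(dataSchema);
    return new SketchIO(location, dataSchema);
  }

  // Assumption of this sketch: the schema is a map of field name -> type name.
  private static void validateEventTimestamp(Map<String, String> schema) {
    if (!"TIMESTAMP".equals(schema.get(TIMESTAMP_FIELD))) {
      throw new IllegalArgumentException(
          "Schema must define a '" + TIMESTAMP_FIELD + "' field of type TIMESTAMP");
    }
  }
}
```

With this shape, a caller that bypasses the SQL table provider and calls `from` directly still gets the exception on a schema without `event_timestamp`.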
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaIO.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.DLQ_TAG;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.MAIN_TAG;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/** An abstraction to create schema aware IOs. */
+@Internal
+public class PubsubSchemaIO implements SchemaIO, Serializable {
Review comment:
I think we should make this a `private static` inner class within
`PubsubSchemaCapableIOProvider` so no one tries to use it on its own.
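A rough sketch of the nesting pattern, with hypothetical names throughout (`SketchSchemaIO`, `NestedIOProviderSketch`, `NestedIO`); only the visibility structure is the point, not the real `SchemaIO` interface.

```java
import java.io.Serializable;

// Hypothetical stand-in for the SchemaIO interface; not the real Beam type.
interface SketchSchemaIO {
  String describe();
}

// Hypothetical stand-in for the provider that owns the implementation.
class NestedIOProviderSketch {
  // The only way to obtain the IO: callers see the interface, never the class.
  SketchSchemaIO from(String location) {
    return new NestedIO(location);
  }

  // private: cannot be named or constructed outside the provider.
  // static: holds no hidden reference to an enclosing provider instance,
  // which matters when the object is serialized.
  private static final class NestedIO implements SketchSchemaIO, Serializable {
    private static final long serialVersionUID = 1L;
    private final String location;

    NestedIO(String location) {
      this.location = location;
    }

    @Override
    public String describe() {
      return "pubsub:" + location;
    }
  }
}
```

Code outside the provider can hold a `SketchSchemaIO` reference but cannot write `new NestedIO(...)`, which is the property being asked for.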
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/package-info.java
##########
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Provides a new schema aware IO abstraction interface. */
Review comment:
nit: it won't be new forever :)
```suggestion
/** Provides abstractions for schema-aware IOs. */
```
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -47,32 +44,40 @@
@Experimental
@AutoService(TableProvider.class)
public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
+ static final String TIMESTAMP_FIELD = "event_timestamp";
+ static final String ATTRIBUTES_FIELD = "attributes";
+ static final String PAYLOAD_FIELD = "payload";
@Override
public String getTableType() {
return "pubsub";
Review comment:
This could defer to `PubsubSchemaIOProvider#identifier()`, but you'd
need to have an instance of it (could be a static member, or you could add a
constructor and create one there).
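A sketch of the static-member option, using hypothetical stand-in classes (`IdentifierProviderSketch`, `TableProviderSketch`) in place of the real provider types. It assumes the IO provider's `identifier()` returns the same `"pubsub"` string that `getTableType()` currently hard-codes.

```java
// Hypothetical stand-in for the schema IO provider; not the real Beam class.
class IdentifierProviderSketch {
  String identifier() {
    return "pubsub";
  }
}

// Hypothetical stand-in for the SQL table provider.
class TableProviderSketch {
  // Static member option: one shared provider instance for all table providers.
  private static final IdentifierProviderSketch IO_PROVIDER = new IdentifierProviderSketch();

  // The table type is no longer a second hard-coded copy of the string.
  String getTableType() {
    return IO_PROVIDER.identifier();
  }
}
```

The constructor option would look the same, except that the provider is stored in an instance field assigned in the constructor instead of a static one.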
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -47,32 +44,40 @@
@Experimental
@AutoService(TableProvider.class)
public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
+ static final String TIMESTAMP_FIELD = "event_timestamp";
+ static final String ATTRIBUTES_FIELD = "attributes";
+ static final String PAYLOAD_FIELD = "payload";
@Override
public String getTableType() {
return "pubsub";
}
@Override
- public BeamSqlTable buildBeamSqlTable(Table tableDefintion) {
- JSONObject tableProperties = tableDefintion.getProperties();
+ public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
+ JSONObject tableProperties = tableDefinition.getProperties();
String timestampAttributeKey =
tableProperties.getString("timestampAttributeKey");
String deadLetterQueue = tableProperties.getString("deadLetterQueue");
- validateDlq(deadLetterQueue);
- Schema schema = tableDefintion.getSchema();
+ Schema schema = tableDefinition.getSchema();
+ String location = tableDefinition.getLocation();
+ Schema dataSchema = tableDefinition.getSchema();
+
+ validateDlq(deadLetterQueue);
validateEventTimestamp(schema);
- PubsubIOTableConfiguration config =
- PubsubIOTableConfiguration.builder()
- .setSchema(schema)
- .setTimestampAttribute(timestampAttributeKey)
- .setDeadLetterQueue(deadLetterQueue)
- .setTopic(tableDefintion.getLocation())
- .setUseFlatSchema(!definesAttributeAndPayload(schema))
+ PubsubSchemaCapableIOProvider ioProvider = new PubsubSchemaCapableIOProvider();
+ Schema configurationSchema = ioProvider.configurationSchema();
+
+ Row configurationRow =
+ Row.withSchema(configurationSchema)
+ .withFieldValue("timestampAttributeKey", timestampAttributeKey)
+ .withFieldValue("deadLetterQueue", deadLetterQueue)
+ .withFieldValue("useFlatSchema", !definesAttributeAndPayload(schema))
Review comment:
It would be better if we could map the `tableProperties` into a Row by
leveraging `RowJson.RowJsonDeserializer` initialized with
`configurationSchema()`; then there wouldn't be any logic specific to pubsub
here.
The biggest hang-up with that is "useFlatSchema", since it's not actually
part of the JSON. Is there a reason we can't move all of the logic for
detecting flat vs. nested schemas into `PubsubSchemaCapableIOProvider#from`?
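As a sketch of the second point only (the `RowJson` mapping is not shown), the flat-vs-nested decision can be derived entirely from the data schema, so it does not need to arrive as a configuration field. `FlatSchemaSketch` is a hypothetical stand-in, and it checks field names only, which is a simplification; the real `definesAttributeAndPayload` may also inspect field types.

```java
import java.util.Set;

// Hypothetical stand-in; the schema is reduced to its set of field names.
class FlatSchemaSketch {
  static final String ATTRIBUTES_FIELD = "attributes";
  static final String PAYLOAD_FIELD = "payload";

  // Nested layout: the schema spells out both "attributes" and "payload".
  static boolean definesAttributeAndPayload(Set<String> fieldNames) {
    return fieldNames.contains(ATTRIBUTES_FIELD) && fieldNames.contains(PAYLOAD_FIELD);
  }

  // Computed inside from() in this sketch, instead of being passed in
  // as a "useFlatSchema" configuration value by the SQL table provider.
  static boolean useFlatSchema(Set<String> fieldNames) {
    return !definesAttributeAndPayload(fieldNames);
  }
}
```

If `from` computes this itself, the configuration row only carries values that are actually present in the table properties JSON.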
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/package-info.java
##########
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Provides a new schema aware IO abstraction interface. */
+package org.apache.beam.sdk.schemas.io;
Review comment:
Please add `@DefaultAnnotation(NonNull.class)` like this:
https://github.com/apache/beam/blob/451af5133bc0a6416afa7b1844833c153f510181/sdks/java/core/src/main/java/org/apache/beam/sdk/package-info.java#L29-L30
That's supposed to make spotbugs verify that we never pass null as a function
argument unless it's explicitly allowed with `@Nullable`.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RowToPubsubMessage.java
##########
@@ -38,29 +37,28 @@
* <p>Currently only supports writing a flat schema into a JSON payload. This means that all Row
* field values are written to the {@link PubsubMessage} JSON payload, except for {@code
* event_timestamp}, which is either ignored or written to the message attributes, depending on
- * whether {@link PubsubJsonTableProvider.PubsubIOTableConfiguration#getTimestampAttribute()} is
- * set.
+ * whether config.getValue("timestampAttributeKey") is set.
*/
@Experimental
public class RowToPubsubMessage extends PTransform<PCollection<Row>, PCollection<PubsubMessage>> {
Review comment:
Hm I feel like this also should not be public... but I see that's
actually my fault since I did it originally. Would you mind fixing my mistake
and making this a `private static` inner class?