TheNeuralBit commented on a change in pull request #12090:
URL: https://github.com/apache/beam/pull/12090#discussion_r447326025



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -54,125 +52,27 @@ public String getTableType() {
   }
 
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table tableDefintion) {
-    JSONObject tableProperties = tableDefintion.getProperties();
-    String timestampAttributeKey = 
tableProperties.getString("timestampAttributeKey");
-    String deadLetterQueue = tableProperties.getString("deadLetterQueue");
-    validateDlq(deadLetterQueue);
-
-    Schema schema = tableDefintion.getSchema();
-    validateEventTimestamp(schema);
-
-    PubsubIOTableConfiguration config =
-        PubsubIOTableConfiguration.builder()
-            .setSchema(schema)
-            .setTimestampAttribute(timestampAttributeKey)
-            .setDeadLetterQueue(deadLetterQueue)
-            .setTopic(tableDefintion.getLocation())
-            .setUseFlatSchema(!definesAttributeAndPayload(schema))
-            .build();
-
-    return PubsubIOJsonTable.withConfiguration(config);
-  }
-
-  private void validateEventTimestamp(Schema schema) {
-    if (!fieldPresent(schema, TIMESTAMP_FIELD, TIMESTAMP)) {
-      throw new InvalidTableException(
-          "Unsupported schema specified for Pubsub source in CREATE TABLE."
-              + "CREATE TABLE for Pubsub topic must include at least 
'event_timestamp' field of "
-              + "type 'TIMESTAMP'");
-    }
-  }
-
-  private boolean definesAttributeAndPayload(Schema schema) {
-    return fieldPresent(
-            schema, ATTRIBUTES_FIELD, 
Schema.FieldType.map(VARCHAR.withNullable(false), VARCHAR))
-        && (schema.hasField(PAYLOAD_FIELD)
-            && 
ROW.equals(schema.getField(PAYLOAD_FIELD).getType().getTypeName()));
-  }
-
-  private boolean fieldPresent(Schema schema, String field, Schema.FieldType 
expectedType) {
-    return schema.hasField(field)
-        && expectedType.equivalent(
-            schema.getField(field).getType(), 
Schema.EquivalenceNullablePolicy.IGNORE);
-  }
-
-  private void validateDlq(String deadLetterQueue) {
-    if (deadLetterQueue != null && deadLetterQueue.isEmpty()) {
-      throw new InvalidTableException("Dead letter queue topic name is not 
specified");
-    }
-  }
-
-  @AutoValue
-  public abstract static class PubsubIOTableConfiguration implements 
Serializable {
-    public boolean useDlq() {
-      return getDeadLetterQueue() != null;
-    }
-
-    public boolean useTimestampAttribute() {
-      return getTimestampAttribute() != null;
-    }
-
-    /** Determines whether or not the messages should be represented with a 
flattened schema. */
-    abstract boolean getUseFlatSchema();
-
-    /**
-     * Optional attribute key of the Pubsub message from which to extract the 
event timestamp.
-     *
-     * <p>This attribute has to conform to the same requirements as in {@link
-     * PubsubIO.Read.Builder#withTimestampAttribute}.
-     *
-     * <p>Short version: it has to be either millis since epoch or string in 
RFC 3339 format.
-     *
-     * <p>If the attribute is specified then event timestamps will be 
extracted from the specified
-     * attribute. If it is not specified then message publish timestamp will 
be used.
-     */
-    @Nullable
-    abstract String getTimestampAttribute();
-
-    /**
-     * Optional topic path which will be used as a dead letter queue.
-     *
-     * <p>Messages that cannot be processed will be sent to this topic. If it 
is not specified then
-     * exception will be thrown for errors during processing causing the 
pipeline to crash.
-     */
-    @Nullable
-    abstract String getDeadLetterQueue();
-
-    /**
-     * Pubsub topic name.
-     *
-     * <p>Topic is the only way to specify the Pubsub source. Explicitly 
specifying the subscription
-     * is not supported at the moment. Subscriptions are automatically created 
(but not deleted).
-     */
-    abstract String getTopic();
-
-    /**
-     * Table schema, describes Pubsub message schema.
-     *
-     * <p>If {@link #getUseFlatSchema()} is not set, schema must contain 
exactly fields
-     * 'event_timestamp', 'attributes, and 'payload'. Else, it must contain 
just 'event_timestamp'.
-     * See {@linkA PubsubMessageToRow} for details.
-     */
-    public abstract Schema getSchema();
-
-    static Builder builder() {
-      return new 
AutoValue_PubsubJsonTableProvider_PubsubIOTableConfiguration.Builder();
-    }
-
-    @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setUseFlatSchema(boolean useFlatSchema);
-
-      abstract Builder setSchema(Schema schema);
-
-      abstract Builder setTimestampAttribute(String timestampAttribute);
-
-      abstract Builder setDeadLetterQueue(String deadLetterQueue);
-
-      abstract Builder setTopic(String topic);
-
-      abstract PubsubIOTableConfiguration build();
+  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
+    JSONObject tableProperties = tableDefinition.getProperties();
+    PubsubSchemaCapableIOProvider ioProvider = new 
PubsubSchemaCapableIOProvider();
+
+    try {
+      RowJsonDeserializer deserializer =
+          RowJsonDeserializer.forSchema(ioProvider.configurationSchema())
+              
.withMissingFieldBehavior(RowJsonDeserializer.MissingFieldBehavior.ALLOW_MISSING);
+
+      Row configurationRow =
+          
newObjectMapperWith(deserializer).readValue(tableProperties.toString(), 
Row.class);
+
+      SchemaIO pubsubSchemaIO =
+          ioProvider.from(
+              tableDefinition.getLocation(), configurationRow, 
tableDefinition.getSchema());
+
+      return PubsubIOJsonTable.fromSchemaIO(pubsubSchemaIO);
+    } catch (InvalidConfigurationException | InvalidSchemaException e) {
+      throw new InvalidTableException(e.getMessage());
+    } catch (JsonProcessingException e) {
+      throw new AssertionError();

Review comment:
       The AssertionError thrown here should have a meaningful message, such as
"Failed to re-parse TBLPROPERTIES JSON"




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to