[ 
https://issues.apache.org/jira/browse/BEAM-8801?focusedWorklogId=359092&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-359092
 ]

ASF GitHub Bot logged work on BEAM-8801:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Dec/19 02:40
            Start Date: 13/Dec/19 02:40
    Worklog Time Spent: 10m 
      Work Description: milantracy commented on pull request #10359: [BEAM-8801] PubsubMessageToRow should not check useFlatSchema() in pr…
URL: https://github.com/apache/beam/pull/10359#discussion_r357463681
 
 

 ##########
 File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
 ##########
 @@ -90,44 +102,94 @@ public static Builder builder() {
     return new AutoValue_PubsubMessageToRow.Builder();
   }
 
-  @DoFn.ProcessElement
-  public void processElement(ProcessContext context) {
-    try {
-      List<Object> values = getFieldValues(context);
-      context.output(Row.withSchema(messageSchema()).addValues(values).build());
-    } catch (UnsupportedRowJsonException jsonException) {
-      if (useDlq()) {
-        context.output(DLQ_TAG, context.element());
-      } else {
-        throw new RuntimeException("Error parsing message", jsonException);
-      }
+  private PubsubIO.Write<PubsubMessage> writeMessagesToDlq() {
+    PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages().to(deadLetterQueue());
+
+    return timestampAttribute() != null
+        ? write.withTimestampAttribute(timestampAttribute())
+        : write;
+  }
+
+  @Override
+  public PCollection<Row> expand(PCollection<PubsubMessage> input) {
+    Schema schema = payloadSchema();
+    PCollectionTuple rows =
+        input.apply(
+            ParDo.of(
+                    useFlatSchema()
+                        ? new FlatSchemaPubsubMessageToRoW(schema)
+                        : new NestedSchemaPubsubMessageToRow(schema))
+                .withOutputTags(
+                    MAIN_TAG,
+                    deadLetterQueue() != null ? TupleTagList.of(DLQ_TAG) : TupleTagList.empty()));
+
+    if (deadLetterQueue() != null) {
+      rows.get(DLQ_TAG).apply(writeMessagesToDlq());
     }
+
+    return rows.get(MAIN_TAG).setRowSchema(messageSchema());
   }
 
   /**
-   * Get values for fields in the same order they're specified in schema, including timestamp,
-   * payload, and attributes.
+   * A {@link DoFn} to convert a flat schema{@link PubsubMessage} with JSON payload to {@link Row}.
    */
-  private List<Object> getFieldValues(ProcessContext context) {
-    Row payload = parsePayloadJsonRow(context.element());
-    return messageSchema().getFields().stream()
-        .map(
-            field ->
-                getValueForField(
-                    field, context.timestamp(), context.element().getAttributeMap(), payload))
-        .collect(toList());
-  }
+  @Internal
+  class FlatSchemaPubsubMessageToRoW extends DoFn<PubsubMessage, Row> {
 
 Review comment:
   Done
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 359092)
    Time Spent: 2h 20m  (was: 2h 10m)

> PubsubMessageToRow should not check useFlatSchema() in processElement
> ---------------------------------------------------------------------
>
>                 Key: BEAM-8801
>                 URL: https://issues.apache.org/jira/browse/BEAM-8801
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Brian Hulette
>            Assignee: Jing Chen
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Currently we check useFlatSchema() for every element that's processed. 
> Instead, we should check it once at pipeline construction time. See 
> [comment|https://github.com/apache/beam/pull/10158#discussion_r348805530].
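
For illustration only, the difference between the two approaches can be sketched in plain Java without any Beam dependency. Everything in this sketch is a hypothetical stand-in (the class name, the "flat:" and "nested:" prefixes, and the String element type are assumptions, not code from the pull request). The first method re-evaluates the flag for every element; the second evaluates it once and returns the converter that is then applied to all elements.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in, not Beam code: contrasts a per-element flag check
// with selecting the converter once, before any element is processed.
final class ConstructionTimeDispatch {

  // Per-element check: the flag is tested again for every message.
  static List<String> convertCheckingPerElement(List<String> messages, boolean useFlatSchema) {
    return messages.stream()
        .map(m -> useFlatSchema ? "flat:" + m : "nested:" + m)
        .collect(Collectors.toList());
  }

  // Construction-time check: the flag is tested once and a converter is chosen.
  static Function<String, String> converterFor(boolean useFlatSchema) {
    if (useFlatSchema) {
      return m -> "flat:" + m;
    }
    return m -> "nested:" + m;
  }

  // Applies the already-selected converter; no flag check happens here.
  static List<String> convert(List<String> messages, Function<String, String> converter) {
    return messages.stream().map(converter).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Function<String, String> converter = converterFor(true);
    System.out.println(convert(Arrays.asList("a", "b"), converter)); // prints [flat:a, flat:b]
  }
}
```

Both methods produce the same output for the same flag; the second simply moves the decision out of the per-element path, which is the shape the diff above takes by picking one of two DoFn classes inside expand().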



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
