[ 
https://issues.apache.org/jira/browse/BEAM-8801?focusedWorklogId=358876&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-358876
 ]

ASF GitHub Bot logged work on BEAM-8801:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Dec/19 19:23
            Start Date: 12/Dec/19 19:23
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on pull request #10359: 
[BEAM-8801] PubsubMessageToRow should not check useFlatSchema() in pr…
URL: https://github.com/apache/beam/pull/10359#discussion_r357321601
 
 

 ##########
 File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
 ##########
 @@ -90,44 +102,94 @@ public static Builder builder() {
     return new AutoValue_PubsubMessageToRow.Builder();
   }
 
-  @DoFn.ProcessElement
-  public void processElement(ProcessContext context) {
-    try {
-      List<Object> values = getFieldValues(context);
-      context.output(Row.withSchema(messageSchema()).addValues(values).build());
-    } catch (UnsupportedRowJsonException jsonException) {
-      if (useDlq()) {
-        context.output(DLQ_TAG, context.element());
-      } else {
-        throw new RuntimeException("Error parsing message", jsonException);
-      }
+  private PubsubIO.Write<PubsubMessage> writeMessagesToDlq() {
+    PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages().to(deadLetterQueue());
+
+    return timestampAttribute() != null
+        ? write.withTimestampAttribute(timestampAttribute())
+        : write;
+  }
+
+  @Override
+  public PCollection<Row> expand(PCollection<PubsubMessage> input) {
+    Schema schema = payloadSchema();
+    PCollectionTuple rows =
+        input.apply(
+            ParDo.of(
+                    useFlatSchema()
+                        ? new FlatSchemaPubsubMessageToRoW(schema)
+                        : new NestedSchemaPubsubMessageToRow(schema))
 
 Review comment:
   It would be nice if this were the only place where we branch on 
`useFlatSchema` (currently it also happens inside `payloadSchema()`). 
   
   What do you think about passing the message schema to the constructor of 
each of these `DoFn` classes instead? Then each implementation is responsible 
for extracting the payload schema it needs, we can get rid of 
`payloadSchema()`, and the `DoFn`s can be static (I think).
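
A minimal sketch of the suggestion above, written without any Beam dependency so that it runs on its own. `SimpleSchema`, `MessageConverter`, `FlatConverter`, `NestedConverter`, and `converterFor` are hypothetical stand-ins for `Schema` and the two `DoFn` classes, not code from the PR. The field names `event_timestamp` and `payload` are also assumptions about the message schema layout. The point is only the shape: the flag is checked once, and each class derives its own payload schema from the full message schema in its constructor.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaBranchSketch {

  /** Hypothetical stand-in for a Beam Schema: field name mapped to a nested schema, or null for a leaf. */
  static final class SimpleSchema {
    final Map<String, SimpleSchema> fields = new LinkedHashMap<>();

    SimpleSchema leaf(String name) {
      fields.put(name, null);
      return this;
    }

    SimpleSchema nested(String name, SimpleSchema schema) {
      fields.put(name, schema);
      return this;
    }
  }

  /** Hypothetical stand-in for the per-element converter (a DoFn in the real code). */
  interface MessageConverter {
    List<String> payloadFieldNames();
  }

  /** Flat layout (assumed): every top-level field except the timestamp is payload. */
  static final class FlatConverter implements MessageConverter {
    private final List<String> payloadFields = new ArrayList<>();

    FlatConverter(SimpleSchema messageSchema) {
      for (String name : messageSchema.fields.keySet()) {
        if (!name.equals("event_timestamp")) {
          payloadFields.add(name);
        }
      }
    }

    @Override
    public List<String> payloadFieldNames() {
      return payloadFields;
    }
  }

  /** Nested layout (assumed): payload fields live under a "payload" row field. */
  static final class NestedConverter implements MessageConverter {
    private final List<String> payloadFields;

    NestedConverter(SimpleSchema messageSchema) {
      SimpleSchema payload = messageSchema.fields.get("payload");
      if (payload == null) {
        throw new IllegalArgumentException("nested schema needs a 'payload' row field");
      }
      payloadFields = new ArrayList<>(payload.fields.keySet());
    }

    @Override
    public List<String> payloadFieldNames() {
      return payloadFields;
    }
  }

  /** The only place that branches on the flag; it runs once, at construction time. */
  static MessageConverter converterFor(boolean useFlatSchema, SimpleSchema messageSchema) {
    return useFlatSchema
        ? new FlatConverter(messageSchema)
        : new NestedConverter(messageSchema);
  }

  public static void main(String[] args) {
    SimpleSchema flat = new SimpleSchema().leaf("event_timestamp").leaf("id").leaf("name");
    SimpleSchema nested =
        new SimpleSchema()
            .leaf("event_timestamp")
            .leaf("attributes")
            .nested("payload", new SimpleSchema().leaf("id").leaf("name"));

    System.out.println(converterFor(true, flat).payloadFieldNames());
    System.out.println(converterFor(false, nested).payloadFieldNames());
  }
}
```

Because neither converter reads state from an enclosing instance, both can be static nested classes, which is the property the comment hopes the real `DoFn`s would gain.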
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 358876)
    Time Spent: 1h 20m  (was: 1h 10m)

> PubsubMessageToRow should not check useFlatSchema() in processElement
> ---------------------------------------------------------------------
>
>                 Key: BEAM-8801
>                 URL: https://issues.apache.org/jira/browse/BEAM-8801
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Brian Hulette
>            Assignee: Jing Chen
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Currently we check useFlatSchema() for every element that's processed. 
> Instead, we should check it once at pipeline construction time. See 
> [comment|https://github.com/apache/beam/pull/10158#discussion_r348805530].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
