[ https://issues.apache.org/jira/browse/BEAM-8801?focusedWorklogId=359092&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-359092 ]
ASF GitHub Bot logged work on BEAM-8801: ---------------------------------------- Author: ASF GitHub Bot Created on: 13/Dec/19 02:40 Start Date: 13/Dec/19 02:40 Worklog Time Spent: 10m Work Description: milantracy commented on pull request #10359: [BEAM-8801] PubsubMessageToRow should not check useFlatSchema() in pr… URL: https://github.com/apache/beam/pull/10359#discussion_r357463681 ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java ########## @@ -90,44 +102,94 @@ public static Builder builder() { return new AutoValue_PubsubMessageToRow.Builder(); } - @DoFn.ProcessElement - public void processElement(ProcessContext context) { - try { - List<Object> values = getFieldValues(context); - context.output(Row.withSchema(messageSchema()).addValues(values).build()); - } catch (UnsupportedRowJsonException jsonException) { - if (useDlq()) { - context.output(DLQ_TAG, context.element()); - } else { - throw new RuntimeException("Error parsing message", jsonException); - } + private PubsubIO.Write<PubsubMessage> writeMessagesToDlq() { + PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages().to(deadLetterQueue()); + + return timestampAttribute() != null + ? write.withTimestampAttribute(timestampAttribute()) + : write; + } + + @Override + public PCollection<Row> expand(PCollection<PubsubMessage> input) { + Schema schema = payloadSchema(); + PCollectionTuple rows = + input.apply( + ParDo.of( + useFlatSchema() + ? new FlatSchemaPubsubMessageToRoW(schema) + : new NestedSchemaPubsubMessageToRow(schema)) + .withOutputTags( + MAIN_TAG, + deadLetterQueue() != null ? TupleTagList.of(DLQ_TAG) : TupleTagList.empty())); + + if (deadLetterQueue() != null) { + rows.get(DLQ_TAG).apply(writeMessagesToDlq()); } + + return rows.get(MAIN_TAG).setRowSchema(messageSchema()); } /** - * Get values for fields in the same order they're specified in schema, including timestamp, - * payload, and attributes. + * A {@link DoFn} to convert a flat schema{@link PubsubMessage} with JSON payload to {@link Row}. */ - private List<Object> getFieldValues(ProcessContext context) { - Row payload = parsePayloadJsonRow(context.element()); - return messageSchema().getFields().stream() - .map( - field -> - getValueForField( - field, context.timestamp(), context.element().getAttributeMap(), payload)) - .collect(toList()); - } + @Internal + class FlatSchemaPubsubMessageToRoW extends DoFn<PubsubMessage, Row> { Review comment: Done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 359092) Time Spent: 2h 20m (was: 2h 10m) > PubsubMessageToRow should not check useFlatSchema() in processElement > --------------------------------------------------------------------- > > Key: BEAM-8801 > URL: https://issues.apache.org/jira/browse/BEAM-8801 > Project: Beam > Issue Type: Improvement > Components: dsl-sql > Reporter: Brian Hulette > Assignee: Jing Chen > Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h > > Currently we check useFlatSchema() for every element that's processed. > Instead, we should check it once at pipeline construction time. See > [comment|https://github.com/apache/beam/pull/10158#discussion_r348805530]. -- This message was sent by Atlassian Jira (v8.3.4#803005)