This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new c0dd6e7 [BEAM-3489] DataflowRunner to read PubsubMessage if getNeedsMessageId is true (#11873) c0dd6e7 is described below commit c0dd6e72ebe5103f817d8c7ccaf02f1a67368aa5 Author: Thinh Ha <thinh.t...@gmail.com> AuthorDate: Wed Jun 17 21:27:30 2020 +0100 [BEAM-3489] DataflowRunner to read PubsubMessage if getNeedsMessageId is true (#11873) * DataflowRunner to read PubsubMessage if getNeedsMessageId is true * changed | to || * spotless apply --- .../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index a6358c1..38d88e3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1353,11 +1353,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute()); } // In both cases, the transform needs to read PubsubMessage. However, in case it needs - // the attributes, we supply an identity "parse fn" so the worker will read PubsubMessage's - // from Windmill and simply pass them around; and in case it doesn't need attributes, - // we're already implicitly using a "Coder" that interprets the data as a PubsubMessage's - // payload. - if (overriddenTransform.getNeedsAttributes()) { + // the attributes or messageId, we supply an identity "parse fn" so the worker will + // read PubsubMessage's from Windmill and simply pass them around; and in case it + // doesn't need attributes, we're already implicitly using a "Coder" that interprets + // the data as a PubsubMessage's payload. + if (overriddenTransform.getNeedsAttributes() || overriddenTransform.getNeedsMessageId()) { stepContext.addInput( PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, byteArrayToJsonString(serializeToByteArray(new IdentityMessageFn())));