This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new cd8eb0c9c68 Fix output timestamp for multi output receiver in FnApiDoFnRunner #25344 (#25349)
cd8eb0c9c68 is described below

commit cd8eb0c9c68f115a868b8e9203e0a2e3a1acdf0d
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Feb 6 10:33:03 2023 -0800

    Fix output timestamp for multi output receiver in FnApiDoFnRunner #25344 (#25349)

    * Fix output timestamp for multi output receiver in FnApiDoFnRunner #25344

    Fixes #25344
---
 .../src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 13d85d27006..0cfcb0a84f2 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -2529,8 +2529,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
         @Override
         public void outputWithTimestamp(T output, Instant timestamp) {
-          ProcessBundleContextBase.this.outputWithTimestamp(
-              tag, output, currentElement.getTimestamp());
+          ProcessBundleContextBase.this.outputWithTimestamp(tag, output, timestamp);
         }
       };
     }
@@ -2816,7 +2815,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
         @Override
         public void outputWithTimestamp(T output, Instant timestamp) {
-          context.outputWithTimestamp(tag, output, currentElement.getTimestamp());
+          context.outputWithTimestamp(tag, output, timestamp);
         }
       };
     }
@@ -3077,7 +3076,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
         @Override
         public void outputWithTimestamp(T output, Instant timestamp) {
-          context.outputWithTimestamp(tag, output, currentElement.getTimestamp());
+          context.outputWithTimestamp(tag, output, timestamp);
         }
       };
     }