This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cd8eb0c9c68 Fix output timestamp for multi output receiver in 
FnApiDoFnRunner #25344 (#25349)
cd8eb0c9c68 is described below

commit cd8eb0c9c68f115a868b8e9203e0a2e3a1acdf0d
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Feb 6 10:33:03 2023 -0800

    Fix output timestamp for multi output receiver in FnApiDoFnRunner #25344 
(#25349)
    
    * Fix output timestamp for multi output receiver in FnApiDoFnRunner #25344
    
    Fixes #25344
---
 .../src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java  | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 13d85d27006..0cfcb0a84f2 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -2529,8 +2529,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
 
               @Override
               public void outputWithTimestamp(T output, Instant timestamp) {
-                ProcessBundleContextBase.this.outputWithTimestamp(
-                    tag, output, currentElement.getTimestamp());
+                ProcessBundleContextBase.this.outputWithTimestamp(tag, output, 
timestamp);
               }
             };
           }
@@ -2816,7 +2815,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
 
               @Override
               public void outputWithTimestamp(T output, Instant timestamp) {
-                context.outputWithTimestamp(tag, output, 
currentElement.getTimestamp());
+                context.outputWithTimestamp(tag, output, timestamp);
               }
             };
           }
@@ -3077,7 +3076,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
 
               @Override
               public void outputWithTimestamp(T output, Instant timestamp) {
-                context.outputWithTimestamp(tag, output, 
currentElement.getTimestamp());
+                context.outputWithTimestamp(tag, output, timestamp);
               }
             };
           }

Reply via email to