This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 18a21e0  Fix a bug: certain error in LiftedCombine is not propagated out (#7518)
18a21e0 is described below

commit 18a21e0eeb302825341c5da4a0030e2ae760481e
Author: Tianyang Hu <hty...@gmail.com>
AuthorDate: Fri Jan 18 10:18:09 2019 -0800

    Fix a bug: certain error in LiftedCombine is not propagated out (#7518)
    
    * Fix a bug: certain error in LiftedCombine.FinishBundle is not propagated out
---
 sdks/go/pkg/beam/core/runtime/exec/combine.go | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go b/sdks/go/pkg/beam/core/runtime/exec/combine.go
index b9a6eb6..067f852 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go
@@ -319,13 +319,12 @@ func (n *LiftedCombine) FinishBundle(ctx context.Context) error {
        // Need to run n.Out.ProcessElement for all the cached precombined KVs, and
        // then finally Finish bundle as normal.
        for _, a := range n.cache {
-               n.Out.ProcessElement(ctx, a)
+               if err := n.Out.ProcessElement(ctx, a); err != nil {
+                       return err
+               }
        }
 
-       if err := n.Out.FinishBundle(ctx); err != nil {
-               return n.fail(err)
-       }
-       return nil
+       return n.Out.FinishBundle(ctx)
 }
 
 // Down tears down the cache.

Reply via email to