This is an automated email from the ASF dual-hosted git repository. altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 18a21e0  Fix a bug: certain error in LiftedCombine is not propagated out (#7518)
18a21e0 is described below

commit 18a21e0eeb302825341c5da4a0030e2ae760481e
Author: Tianyang Hu <hty...@gmail.com>
AuthorDate: Fri Jan 18 10:18:09 2019 -0800

    Fix a bug: certain error in LiftedCombine is not propagated out (#7518)

    * Fix a bug: certain error in LiftedCombine.FinishBundle is not propagated out
---
 sdks/go/pkg/beam/core/runtime/exec/combine.go | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go b/sdks/go/pkg/beam/core/runtime/exec/combine.go
index b9a6eb6..067f852 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go
@@ -319,13 +319,12 @@ func (n *LiftedCombine) FinishBundle(ctx context.Context) error {
 	// Need to run n.Out.ProcessElement for all the cached precombined KVs, and
 	// then finally Finish bundle as normal.
 	for _, a := range n.cache {
-		n.Out.ProcessElement(ctx, a)
+		if err := n.Out.ProcessElement(ctx, a); err != nil {
+			return err
+		}
 	}
 
-	if err := n.Out.FinishBundle(ctx); err != nil {
-		return n.fail(err)
-	}
-	return nil
+	return n.Out.FinishBundle(ctx)
 }
 
 // Down tears down the cache.