This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new db23583d022 [#28126] plumb coder errors with better context. (#28164) db23583d022 is described below commit db23583d0227abe3625aafae98655c45c9edcf84 Author: Robert Burke <lostl...@users.noreply.github.com> AuthorDate: Fri Aug 25 16:08:15 2023 -0700 [#28126] plumb coder errors with better context. (#28164) * [#28126] plumb coder errors with better context. * Add hard clear checks for SDK error: no windows being encoded. * parse log * fix log line parsing * fmt --------- Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com> --- sdks/go/pkg/beam/runners/prism/internal/coders.go | 32 +++++++++++++--------- .../pkg/beam/runners/prism/internal/coders_test.go | 6 ++-- .../prism/internal/engine/elementmanager.go | 14 ++++++++-- sdks/go/pkg/beam/runners/prism/internal/execute.go | 10 +++++-- .../beam/runners/prism/internal/handlerunner.go | 15 ++++++++-- sdks/go/pkg/beam/runners/prism/internal/stage.go | 19 ++++++++++--- .../beam/runners/prism/internal/worker/worker.go | 14 +++++++++- .../runners/prism/internal/worker/worker_test.go | 13 +++++++-- 8 files changed, 93 insertions(+), 30 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index a141440400e..64005177b94 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -53,9 +53,12 @@ func isLeafCoder(c *pipepb.Coder) bool { // // PCollection coders are not inherently WindowValueCoder wrapped, and they are added by the runner // for crossing the FnAPI boundary at data sources and data sinks. -func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[string]*pipepb.Coder) string { +func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[string]*pipepb.Coder) (string, error) { col := comps.GetPcollections()[pID] - cID := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders()) + cID, err := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders()) + if err != nil { + return "", fmt.Errorf("makeWindowedValueCoder: couldn't process coder for pcollection %q %v: %w", pID, prototext.Format(col), err) + } wcID := comps.GetWindowingStrategies()[col.GetWindowingStrategyId()].GetWindowCoderId() // The runner needs to be defensive, and tell the SDK to Length Prefix @@ -73,7 +76,7 @@ func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[str } // Populate the coders to send with the new windowed value coder. coders[wvcID] = wInC - return wvcID + return wvcID, nil } // makeWindowCoders makes the coder pair but behavior is ultimately determined by the strategy's windowFn. @@ -94,22 +97,22 @@ func makeWindowCoders(wc *pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) // lpUnknownCoders takes a coder, and populates coders with any new coders // coders that the runner needs to be safe, and speedy. // It returns either the passed in coder id, or the new safe coder id. -func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string { +func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) (string, error) { // First check if we've already added the LP version of this coder to coders already. lpcID := cID + "_lp" // Check if we've done this one before. if _, ok := bundle[lpcID]; ok { - return lpcID + return lpcID, nil } // All coders in the coders map have been processed. if _, ok := bundle[cID]; ok { - return cID + return cID, nil } // Look up the canonical location. c, ok := base[cID] if !ok { // We messed up somewhere. - panic(fmt.Sprint("unknown coder id:", cID)) + return "", fmt.Errorf("lpUnknownCoders: coder %q not present in base map", cID) } // Add the original coder to the coders map. bundle[cID] = c @@ -124,7 +127,7 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string { ComponentCoderIds: []string{cID}, } bundle[lpcID] = lpc - return lpcID + return lpcID, nil } // We know we have a composite, so if we count this as a leaf, move everything to // the coders map. @@ -133,12 +136,15 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string { for _, cc := range c.GetComponentCoderIds() { bundle[cc] = base[cc] } - return cID + return cID, nil } var needNewComposite bool var comps []string - for _, cc := range c.GetComponentCoderIds() { - rcc := lpUnknownCoders(cc, bundle, base) + for i, cc := range c.GetComponentCoderIds() { + rcc, err := lpUnknownCoders(cc, bundle, base) + if err != nil { + return "", fmt.Errorf("lpUnknownCoders: couldn't handle component %d %q of %q %v:\n%w", i, cc, cID, prototext.Format(c), err) + } if cc != rcc { needNewComposite = true } @@ -150,9 +156,9 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string { ComponentCoderIds: comps, } bundle[lpcID] = lpc - return lpcID + return lpcID, nil } - return cID + return cID, nil } // reconcileCoders ensures that the bundle coders are primed with initial coders from diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go index c6e32c895fe..3f9557ff836 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go @@ -62,7 +62,7 @@ func Test_isLeafCoder(t *testing.T) { func Test_makeWindowedValueCoder(t *testing.T) { coders := map[string]*pipepb.Coder{} - gotID := makeWindowedValueCoder("testPID", &pipepb.Components{ + gotID, err := makeWindowedValueCoder("testPID", &pipepb.Components{ Pcollections: map[string]*pipepb.PCollection{ "testPID": {CoderId: "testCoderID"}, }, @@ -74,7 +74,9 @@ func Test_makeWindowedValueCoder(t *testing.T) { }, }, }, coders) - + if err != nil { + t.Errorf("makeWindowedValueCoder(...) = error %v, want nil", err) + } if gotID == "" { t.Errorf("makeWindowedValueCoder(...) = %v, want non-empty", gotID) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 5e1585ffcd1..fb9c9802502 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -287,8 +287,12 @@ func reElementResiduals(residuals [][]byte, inputInfo PColInfo, rb RunBundle) [] if err == io.EOF { break } - slog.Error("reElementResiduals: error decoding residual header", err, "bundle", rb) - panic("error decoding residual header") + slog.Error("reElementResiduals: error decoding residual header", "error", err, "bundle", rb) + panic("error decoding residual header:" + err.Error()) + } + if len(ws) == 0 { + slog.Error("reElementResiduals: sdk provided a windowed value header 0 windows", "bundle", rb) + panic("error decoding residual header: sdk provided a windowed value header 0 windows") } for _, w := range ws { @@ -332,9 +336,13 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol if err == io.EOF { break } - slog.Error("PersistBundle: error decoding watermarks", err, "bundle", rb, slog.String("output", output)) + slog.Error("PersistBundle: error decoding watermarks", "error", err, "bundle", rb, slog.String("output", output)) panic("error decoding watermarks") } + if len(ws) == 0 { + slog.Error("PersistBundle: sdk provided a windowed value header 0 windows", "bundle", rb) + panic("error decoding residual header: sdk provided a windowed value header 0 windows") + } // TODO: Optimize unnecessary copies. This is doubleteeing. elmBytes := info.EDec(tee) for _, w := range ws { diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index f8b6b6f33ab..e9c898699c7 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -297,13 +297,19 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro } func collectionPullDecoder(coldCId string, coders map[string]*pipepb.Coder, comps *pipepb.Components) func(io.Reader) []byte { - cID := lpUnknownCoders(coldCId, coders, comps.GetCoders()) + cID, err := lpUnknownCoders(coldCId, coders, comps.GetCoders()) + if err != nil { + panic(err) + } return pullDecoder(coders[cID], coders) } func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, coders map[string]*pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) { ws := comps.GetWindowingStrategies()[col.GetWindowingStrategyId()] - wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders()) + wcID, err := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders()) + if err != nil { + panic(err) + } return makeWindowCoders(coders[wcID]) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index 05b3d3bbaa0..3f699e47e67 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -162,9 +162,18 @@ func (h *runner) ExecuteTransform(stageID, tid string, t *pipepb.PTransform, com coders := map[string]*pipepb.Coder{} // TODO assert this is a KV. It's probably fine, but we should fail anyway. - wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders()) - kcID := lpUnknownCoders(kvc.GetComponentCoderIds()[0], coders, comps.GetCoders()) - ecID := lpUnknownCoders(kvc.GetComponentCoderIds()[1], coders, comps.GetCoders()) + wcID, err := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders()) + if err != nil { + panic(fmt.Errorf("ExecuteTransform[GBK] stage %v, transform %q %v: couldn't process window coder:\n%w", stageID, tid, prototext.Format(t), err)) + } + kcID, err := lpUnknownCoders(kvc.GetComponentCoderIds()[0], coders, comps.GetCoders()) + if err != nil { + panic(fmt.Errorf("ExecuteTransform[GBK] stage %v, transform %q %v: couldn't process key coder:\n%w", stageID, tid, prototext.Format(t), err)) + } + ecID, err := lpUnknownCoders(kvc.GetComponentCoderIds()[1], coders, comps.GetCoders()) + if err != nil { + panic(fmt.Errorf("ExecuteTransform[GBK] stage %v, transform %q %v: couldn't process value coder:\n%w", stageID, tid, prototext.Format(t), err)) + } reconcileCoders(coders, comps.GetCoders()) wc := coders[wcID] diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index b2cbe23588d..3f4451d7db3 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -33,6 +33,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker" "golang.org/x/exp/maps" "golang.org/x/exp/slog" + "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" ) @@ -290,9 +291,12 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error { sink2Col := map[string]string{} col2Coders := map[string]engine.PColInfo{} for _, o := range stg.outputs { - wOutCid := makeWindowedValueCoder(o.global, comps, coders) - sinkID := o.transform + "_" + o.local col := comps.GetPcollections()[o.global] + wOutCid, err := makeWindowedValueCoder(o.global, comps, coders) + if err != nil { + return fmt.Errorf("buildDescriptor: failed to handle coder on stage %v for output %+v, pcol %q %v:\n%w", stg.ID, o, o.global, prototext.Format(col), err) + } + sinkID := o.transform + "_" + o.local ed := collectionPullDecoder(col.GetCoderId(), coders, comps) wDec, wEnc := getWindowValueCoders(comps, col, coders) sink2Col[sinkID] = o.global @@ -311,7 +315,10 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error { for _, si := range stg.sideInputs { col := comps.GetPcollections()[si.global] oCID := col.GetCoderId() - nCID := lpUnknownCoders(oCID, coders, comps.GetCoders()) + nCID, err := lpUnknownCoders(oCID, coders, comps.GetCoders()) + if err != nil { + return fmt.Errorf("buildDescriptor: failed to handle coder on stage %v for side input %+v, pcol %q %v:\n%w", stg.ID, si, si.global, prototext.Format(col), err) + } sides = append(sides, si.global) if oCID != nCID { @@ -339,9 +346,13 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error { // This id is directly used for the source, but this also copies // coders used by side inputs to the coders map for the bundle, so // needs to be run for every ID. - wInCid := makeWindowedValueCoder(stg.primaryInput, comps, coders) col := comps.GetPcollections()[stg.primaryInput] + wInCid, err := makeWindowedValueCoder(stg.primaryInput, comps, coders) + if err != nil { + return fmt.Errorf("buildDescriptor: failed to handle coder on stage %v for primary input, pcol %q %v:\n%w", stg.ID, stg.primaryInput, prototext.Format(col), err) + } + ed := collectionPullDecoder(col.GetCoderId(), coders, comps) wDec, wEnc := getWindowValueCoders(comps, col, coders) inputInfo := engine.PColInfo{ diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index a1d0ff79baf..dab831c20af 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -23,6 +23,8 @@ import ( "fmt" "io" "net" + "strconv" + "strings" "sync" "sync/atomic" @@ -191,8 +193,18 @@ func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error { if l.Severity >= minsev { // TODO: Connect to the associated Job for this worker instead of // logging locally for SDK side logging. + file := l.GetLogLocation() + i := strings.LastIndex(file, ":") + line, _ := strconv.Atoi(file[i+1:]) + if i > 0 { + file = file[:i] + } + slog.LogAttrs(context.TODO(), toSlogSev(l.GetSeverity()), l.GetMessage(), - slog.String(slog.SourceKey, l.GetLogLocation()), + slog.Any(slog.SourceKey, &slog.Source{ + File: file, + Line: line, + }), slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()), slog.Any("worker", wk), ) diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go index 68dc3fd917e..060c073fa12 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go @@ -119,8 +119,17 @@ func TestWorker_Logging(t *testing.T) { logStream.Send(&fnpb.LogEntry_List{ LogEntries: []*fnpb.LogEntry{{ - Severity: fnpb.LogEntry_Severity_INFO, - Message: "squeamish ossiphrage", + Severity: fnpb.LogEntry_Severity_INFO, + Message: "squeamish ossiphrage", + LogLocation: "intentionally.go:124", + }}, + }) + + logStream.Send(&fnpb.LogEntry_List{ + LogEntries: []*fnpb.LogEntry{{ + Severity: fnpb.LogEntry_Severity_INFO, + Message: "squeamish ossiphrage the second", + LogLocation: "intentionally bad log location", }}, })