This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new db23583d022 [#28126] plumb coder errors with better context. (#28164)
db23583d022 is described below

commit db23583d0227abe3625aafae98655c45c9edcf84
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Fri Aug 25 16:08:15 2023 -0700

    [#28126] plumb coder errors with better context. (#28164)
    
    * [#28126] plumb coder errors with better context.
    
    * Add hard, clear checks for the SDK error where no windows are encoded.
    
    * parse log
    
    * fix log line parsing
    
    * fmt
    
    ---------
    
    Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com>
---
 sdks/go/pkg/beam/runners/prism/internal/coders.go  | 32 +++++++++++++---------
 .../pkg/beam/runners/prism/internal/coders_test.go |  6 ++--
 .../prism/internal/engine/elementmanager.go        | 14 ++++++++--
 sdks/go/pkg/beam/runners/prism/internal/execute.go | 10 +++++--
 .../beam/runners/prism/internal/handlerunner.go    | 15 ++++++++--
 sdks/go/pkg/beam/runners/prism/internal/stage.go   | 19 ++++++++++---
 .../beam/runners/prism/internal/worker/worker.go   | 14 +++++++++-
 .../runners/prism/internal/worker/worker_test.go   | 13 +++++++--
 8 files changed, 93 insertions(+), 30 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go 
b/sdks/go/pkg/beam/runners/prism/internal/coders.go
index a141440400e..64005177b94 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/coders.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go
@@ -53,9 +53,12 @@ func isLeafCoder(c *pipepb.Coder) bool {
 //
 // PCollection coders are not inherently WindowValueCoder wrapped, and they 
are added by the runner
 // for crossing the FnAPI boundary at data sources and data sinks.
-func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders 
map[string]*pipepb.Coder) string {
+func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders 
map[string]*pipepb.Coder) (string, error) {
        col := comps.GetPcollections()[pID]
-       cID := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders())
+       cID, err := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders())
+       if err != nil {
+               return "", fmt.Errorf("makeWindowedValueCoder: couldn't process 
coder for pcollection %q %v: %w", pID, prototext.Format(col), err)
+       }
        wcID := 
comps.GetWindowingStrategies()[col.GetWindowingStrategyId()].GetWindowCoderId()
 
        // The runner needs to be defensive, and tell the SDK to Length Prefix
@@ -73,7 +76,7 @@ func makeWindowedValueCoder(pID string, comps 
*pipepb.Components, coders map[str
        }
        // Populate the coders to send with the new windowed value coder.
        coders[wvcID] = wInC
-       return wvcID
+       return wvcID, nil
 }
 
 // makeWindowCoders makes the coder pair but behavior is ultimately determined 
by the strategy's windowFn.
@@ -94,22 +97,22 @@ func makeWindowCoders(wc *pipepb.Coder) 
(exec.WindowDecoder, exec.WindowEncoder)
 // lpUnknownCoders takes a coder, and populates coders with any new coders
 // coders that the runner needs to be safe, and speedy.
 // It returns either the passed in coder id, or the new safe coder id.
-func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string 
{
+func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) 
(string, error) {
        // First check if we've already added the LP version of this coder to 
coders already.
        lpcID := cID + "_lp"
        // Check if we've done this one before.
        if _, ok := bundle[lpcID]; ok {
-               return lpcID
+               return lpcID, nil
        }
        // All coders in the coders map have been processed.
        if _, ok := bundle[cID]; ok {
-               return cID
+               return cID, nil
        }
        // Look up the canonical location.
        c, ok := base[cID]
        if !ok {
                // We messed up somewhere.
-               panic(fmt.Sprint("unknown coder id:", cID))
+               return "", fmt.Errorf("lpUnknownCoders: coder %q not present in 
base map", cID)
        }
        // Add the original coder to the coders map.
        bundle[cID] = c
@@ -124,7 +127,7 @@ func lpUnknownCoders(cID string, bundle, base 
map[string]*pipepb.Coder) string {
                        ComponentCoderIds: []string{cID},
                }
                bundle[lpcID] = lpc
-               return lpcID
+               return lpcID, nil
        }
        // We know we have a composite, so if we count this as a leaf, move 
everything to
        // the coders map.
@@ -133,12 +136,15 @@ func lpUnknownCoders(cID string, bundle, base 
map[string]*pipepb.Coder) string {
                for _, cc := range c.GetComponentCoderIds() {
                        bundle[cc] = base[cc]
                }
-               return cID
+               return cID, nil
        }
        var needNewComposite bool
        var comps []string
-       for _, cc := range c.GetComponentCoderIds() {
-               rcc := lpUnknownCoders(cc, bundle, base)
+       for i, cc := range c.GetComponentCoderIds() {
+               rcc, err := lpUnknownCoders(cc, bundle, base)
+               if err != nil {
+                       return "", fmt.Errorf("lpUnknownCoders: couldn't handle 
component %d %q of %q %v:\n%w", i, cc, cID, prototext.Format(c), err)
+               }
                if cc != rcc {
                        needNewComposite = true
                }
@@ -150,9 +156,9 @@ func lpUnknownCoders(cID string, bundle, base 
map[string]*pipepb.Coder) string {
                        ComponentCoderIds: comps,
                }
                bundle[lpcID] = lpc
-               return lpcID
+               return lpcID, nil
        }
-       return cID
+       return cID, nil
 }
 
 // reconcileCoders ensures that the bundle coders are primed with initial 
coders from
diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
index c6e32c895fe..3f9557ff836 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
@@ -62,7 +62,7 @@ func Test_isLeafCoder(t *testing.T) {
 func Test_makeWindowedValueCoder(t *testing.T) {
        coders := map[string]*pipepb.Coder{}
 
-       gotID := makeWindowedValueCoder("testPID", &pipepb.Components{
+       gotID, err := makeWindowedValueCoder("testPID", &pipepb.Components{
                Pcollections: map[string]*pipepb.PCollection{
                        "testPID": {CoderId: "testCoderID"},
                },
@@ -74,7 +74,9 @@ func Test_makeWindowedValueCoder(t *testing.T) {
                        },
                },
        }, coders)
-
+       if err != nil {
+               t.Errorf("makeWindowedValueCoder(...) = error %v, want nil", 
err)
+       }
        if gotID == "" {
                t.Errorf("makeWindowedValueCoder(...) = %v, want non-empty", 
gotID)
        }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 5e1585ffcd1..fb9c9802502 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -287,8 +287,12 @@ func reElementResiduals(residuals [][]byte, inputInfo 
PColInfo, rb RunBundle) []
                        if err == io.EOF {
                                break
                        }
-                       slog.Error("reElementResiduals: error decoding residual 
header", err, "bundle", rb)
-                       panic("error decoding residual header")
+                       slog.Error("reElementResiduals: error decoding residual 
header", "error", err, "bundle", rb)
+                       panic("error decoding residual header:" + err.Error())
+               }
+               if len(ws) == 0 {
+                       slog.Error("reElementResiduals: sdk provided a windowed 
value header 0 windows", "bundle", rb)
+                       panic("error decoding residual header: sdk provided a 
windowed value header 0 windows")
                }
 
                for _, w := range ws {
@@ -332,9 +336,13 @@ func (em *ElementManager) PersistBundle(rb RunBundle, 
col2Coders map[string]PCol
                                        if err == io.EOF {
                                                break
                                        }
-                                       slog.Error("PersistBundle: error 
decoding watermarks", err, "bundle", rb, slog.String("output", output))
+                                       slog.Error("PersistBundle: error 
decoding watermarks", "error", err, "bundle", rb, slog.String("output", output))
                                        panic("error decoding watermarks")
                                }
+                               if len(ws) == 0 {
+                                       slog.Error("PersistBundle: sdk provided 
a windowed value header 0 windows", "bundle", rb)
+                                       panic("error decoding residual header: 
sdk provided a windowed value header 0 windows")
+                               }
                                // TODO: Optimize unnecessary copies. This is 
doubleteeing.
                                elmBytes := info.EDec(tee)
                                for _, w := range ws {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index f8b6b6f33ab..e9c898699c7 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -297,13 +297,19 @@ func executePipeline(ctx context.Context, wk *worker.W, j 
*jobservices.Job) erro
 }
 
 func collectionPullDecoder(coldCId string, coders map[string]*pipepb.Coder, 
comps *pipepb.Components) func(io.Reader) []byte {
-       cID := lpUnknownCoders(coldCId, coders, comps.GetCoders())
+       cID, err := lpUnknownCoders(coldCId, coders, comps.GetCoders())
+       if err != nil {
+               panic(err)
+       }
        return pullDecoder(coders[cID], coders)
 }
 
 func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, 
coders map[string]*pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) {
        ws := comps.GetWindowingStrategies()[col.GetWindowingStrategyId()]
-       wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, 
comps.GetCoders())
+       wcID, err := lpUnknownCoders(ws.GetWindowCoderId(), coders, 
comps.GetCoders())
+       if err != nil {
+               panic(err)
+       }
        return makeWindowCoders(coders[wcID])
 }
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go 
b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
index 05b3d3bbaa0..3f699e47e67 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
@@ -162,9 +162,18 @@ func (h *runner) ExecuteTransform(stageID, tid string, t 
*pipepb.PTransform, com
                coders := map[string]*pipepb.Coder{}
 
                // TODO assert this is a KV. It's probably fine, but we should 
fail anyway.
-               wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, 
comps.GetCoders())
-               kcID := lpUnknownCoders(kvc.GetComponentCoderIds()[0], coders, 
comps.GetCoders())
-               ecID := lpUnknownCoders(kvc.GetComponentCoderIds()[1], coders, 
comps.GetCoders())
+               wcID, err := lpUnknownCoders(ws.GetWindowCoderId(), coders, 
comps.GetCoders())
+               if err != nil {
+                       panic(fmt.Errorf("ExecuteTransform[GBK] stage %v, 
transform %q %v: couldn't process window coder:\n%w", stageID, tid, 
prototext.Format(t), err))
+               }
+               kcID, err := lpUnknownCoders(kvc.GetComponentCoderIds()[0], 
coders, comps.GetCoders())
+               if err != nil {
+                       panic(fmt.Errorf("ExecuteTransform[GBK] stage %v, 
transform %q %v: couldn't process key coder:\n%w", stageID, tid, 
prototext.Format(t), err))
+               }
+               ecID, err := lpUnknownCoders(kvc.GetComponentCoderIds()[1], 
coders, comps.GetCoders())
+               if err != nil {
+                       panic(fmt.Errorf("ExecuteTransform[GBK] stage %v, 
transform %q %v: couldn't process value coder:\n%w", stageID, tid, 
prototext.Format(t), err))
+               }
                reconcileCoders(coders, comps.GetCoders())
 
                wc := coders[wcID]
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go 
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index b2cbe23588d..3f4451d7db3 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -33,6 +33,7 @@ import (
        
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
        "golang.org/x/exp/maps"
        "golang.org/x/exp/slog"
+       "google.golang.org/protobuf/encoding/prototext"
        "google.golang.org/protobuf/proto"
 )
 
@@ -290,9 +291,12 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, 
wk *worker.W) error {
        sink2Col := map[string]string{}
        col2Coders := map[string]engine.PColInfo{}
        for _, o := range stg.outputs {
-               wOutCid := makeWindowedValueCoder(o.global, comps, coders)
-               sinkID := o.transform + "_" + o.local
                col := comps.GetPcollections()[o.global]
+               wOutCid, err := makeWindowedValueCoder(o.global, comps, coders)
+               if err != nil {
+                       return fmt.Errorf("buildDescriptor: failed to handle 
coder on stage %v for output %+v, pcol %q %v:\n%w", stg.ID, o, o.global, 
prototext.Format(col), err)
+               }
+               sinkID := o.transform + "_" + o.local
                ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
                wDec, wEnc := getWindowValueCoders(comps, col, coders)
                sink2Col[sinkID] = o.global
@@ -311,7 +315,10 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, 
wk *worker.W) error {
        for _, si := range stg.sideInputs {
                col := comps.GetPcollections()[si.global]
                oCID := col.GetCoderId()
-               nCID := lpUnknownCoders(oCID, coders, comps.GetCoders())
+               nCID, err := lpUnknownCoders(oCID, coders, comps.GetCoders())
+               if err != nil {
+                       return fmt.Errorf("buildDescriptor: failed to handle 
coder on stage %v for side input %+v, pcol %q %v:\n%w", stg.ID, si, si.global, 
prototext.Format(col), err)
+               }
 
                sides = append(sides, si.global)
                if oCID != nCID {
@@ -339,9 +346,13 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, 
wk *worker.W) error {
        // This id is directly used for the source, but this also copies
        // coders used by side inputs to the coders map for the bundle, so
        // needs to be run for every ID.
-       wInCid := makeWindowedValueCoder(stg.primaryInput, comps, coders)
 
        col := comps.GetPcollections()[stg.primaryInput]
+       wInCid, err := makeWindowedValueCoder(stg.primaryInput, comps, coders)
+       if err != nil {
+               return fmt.Errorf("buildDescriptor: failed to handle coder on 
stage %v for primary input, pcol %q %v:\n%w", stg.ID, stg.primaryInput, 
prototext.Format(col), err)
+       }
+
        ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
        wDec, wEnc := getWindowValueCoders(comps, col, coders)
        inputInfo := engine.PColInfo{
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index a1d0ff79baf..dab831c20af 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -23,6 +23,8 @@ import (
        "fmt"
        "io"
        "net"
+       "strconv"
+       "strings"
        "sync"
        "sync/atomic"
 
@@ -191,8 +193,18 @@ func (wk *W) Logging(stream 
fnpb.BeamFnLogging_LoggingServer) error {
                        if l.Severity >= minsev {
                                // TODO: Connect to the associated Job for this 
worker instead of
                                // logging locally for SDK side logging.
+                               file := l.GetLogLocation()
+                               i := strings.LastIndex(file, ":")
+                               line, _ := strconv.Atoi(file[i+1:])
+                               if i > 0 {
+                                       file = file[:i]
+                               }
+
                                slog.LogAttrs(context.TODO(), 
toSlogSev(l.GetSeverity()), l.GetMessage(),
-                                       slog.String(slog.SourceKey, 
l.GetLogLocation()),
+                                       slog.Any(slog.SourceKey, &slog.Source{
+                                               File: file,
+                                               Line: line,
+                                       }),
                                        slog.Time(slog.TimeKey, 
l.GetTimestamp().AsTime()),
                                        slog.Any("worker", wk),
                                )
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
index 68dc3fd917e..060c073fa12 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
@@ -119,8 +119,17 @@ func TestWorker_Logging(t *testing.T) {
 
        logStream.Send(&fnpb.LogEntry_List{
                LogEntries: []*fnpb.LogEntry{{
-                       Severity: fnpb.LogEntry_Severity_INFO,
-                       Message:  "squeamish ossiphrage",
+                       Severity:    fnpb.LogEntry_Severity_INFO,
+                       Message:     "squeamish ossiphrage",
+                       LogLocation: "intentionally.go:124",
+               }},
+       })
+
+       logStream.Send(&fnpb.LogEntry_List{
+               LogEntries: []*fnpb.LogEntry{{
+                       Severity:    fnpb.LogEntry_Severity_INFO,
+                       Message:     "squeamish ossiphrage the second",
+                       LogLocation: "intentionally bad log location",
                }},
        })
 

Reply via email to