This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b426d34f622 [BEAM-14557] Read and Seek Runner Capabilities in Go SDK  (#17821)
b426d34f622 is described below

commit b426d34f62271f8ccb4d24e1f82b28cd26ec8af3
Author: Ritesh Ghorse <riteshgho...@gmail.com>
AuthorDate: Mon Jun 13 22:04:28 2022 -0400

    [BEAM-14557] Read and Seek Runner Capabilities in Go SDK  (#17821)
---
 sdks/go/container/boot.go                          |   4 +
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  |  10 +-
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |  25 +++-
 .../go/pkg/beam/core/runtime/harness/monitoring.go | 133 +++++++++++----------
 4 files changed, 102 insertions(+), 70 deletions(-)

diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go
index 88b654230c1..b180f54bf18 100644
--- a/sdks/go/container/boot.go
+++ b/sdks/go/container/boot.go
@@ -111,6 +111,10 @@ func main() {
                args = append(args, 
"--status_endpoint="+info.GetStatusEndpoint().GetUrl())
        }
 
+       if len(info.GetRunnerCapabilities()) > 0 {
+               os.Setenv("RUNNER_CAPABILITIES", 
strings.Join(info.GetRunnerCapabilities(), " "))
+       }
+
        log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index bde9f9b676a..cb42da25397 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -63,9 +63,10 @@ const (
        URNWindowMappingFixed   = "beam:go:windowmapping:fixed:v1"
        URNWindowMappingSliding = "beam:go:windowmapping:sliding:v1"
 
-       URNLegacyProgressReporting = "beam:protocol:progress_reporting:v0"
-       URNMultiCore               = 
"beam:protocol:multi_core_bundle_processing:v1"
-       URNWorkerStatus            = "beam:protocol:worker_status:v1"
+       URNProgressReporting     = "beam:protocol:progress_reporting:v1"
+       URNMultiCore             = 
"beam:protocol:multi_core_bundle_processing:v1"
+       URNWorkerStatus          = "beam:protocol:worker_status:v1"
+       URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"
 
        URNRequiresSplittableDoFn     = 
"beam:requirement:pardo:splittable_dofn:v1"
        URNRequiresBundleFinalization = "beam:requirement:pardo:finalization:v1"
@@ -86,10 +87,11 @@ const (
 
 func goCapabilities() []string {
        capabilities := []string{
-               URNLegacyProgressReporting,
+               URNProgressReporting,
                URNMultiCore,
                URNTruncate,
                URNWorkerStatus,
+               URNMonitoringInfoShortID,
                // TOOD(BEAM-9614): Make this versioned.
                "beam:version:sdk_base:go",
        }
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go 
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 04a2baf41b4..4ed4cf6d4b2 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -20,6 +20,8 @@ import (
        "context"
        "fmt"
        "io"
+       "os"
+       "strings"
        "sync"
        "sync/atomic"
        "time"
@@ -40,6 +42,9 @@ import (
 // StatusAddress is a type of status endpoint address as an optional argument 
to harness.Main().
 type StatusAddress string
 
+// URNMonitoringInfoShortID is a URN indicating support for short monitoring 
info IDs.
+const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"
+
 // TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a 
plugin).
 
 // Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
@@ -58,6 +63,17 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string, options
                }
        }
 
+       // Extract environment variables. These are optional runner supported 
capabilities.
+       // Expected env variables:
+       // RUNNER_CAPABILITIES : list of runner supported capability urn.
+       runnerCapabilities := strings.Split(os.Getenv("RUNNER_CAPABILITIES"), " 
")
+       rcMap := make(map[string]bool)
+       if len(runnerCapabilities) > 0 {
+               for _, capability := range runnerCapabilities {
+                       rcMap[capability] = true
+               }
+       }
+
        // Pass in the logging endpoint for use w/the default remote logging 
hook.
        ctx = context.WithValue(ctx, loggingEndpointCtxKey, loggingEndpoint)
        ctx, err := hooks.RunInitHooks(ctx)
@@ -139,6 +155,7 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string, options
                data:                 &DataChannelManager{},
                state:                &StateChannelManager{},
                cache:                &sideCache,
+               runnerCapabilities:   rcMap,
        }
        // gRPC requires all readers of a stream be the same goroutine, so this 
goroutine
        // is responsible for managing the network data. All it does is pull 
data from
@@ -275,7 +292,8 @@ type control struct {
        data  *DataChannelManager
        state *StateChannelManager
        // TODO(BEAM-11097): Cache is currently unused.
-       cache *statecache.SideInputCache
+       cache              *statecache.SideInputCache
+       runnerCapabilities map[string]bool
 }
 
 func (c *control) getOrCreatePlan(bdID bundleDescriptorID) (*exec.Plan, error) 
{
@@ -370,7 +388,8 @@ func (c *control) handleInstruction(ctx context.Context, 
req *fnpb.InstructionRe
 
                c.cache.CompleteBundle(tokens...)
 
-               mons, pylds := monitoring(plan, store)
+               mons, pylds := monitoring(plan, store, 
c.runnerCapabilities[URNMonitoringInfoShortID])
+
                requiresFinalization := false
                // Move the plan back to the candidate state
                c.mu.Lock()
@@ -493,7 +512,7 @@ func (c *control) handleInstruction(ctx context.Context, 
req *fnpb.InstructionRe
                        }
                }
 
-               mons, pylds := monitoring(plan, store)
+               mons, pylds := monitoring(plan, store, 
c.runnerCapabilities[URNMonitoringInfoShortID])
 
                return &fnpb.InstructionResponse{
                        InstructionId: string(instID),
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go 
b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
index 76557c32f4f..ef6fbc1d1fc 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
@@ -102,11 +102,10 @@ func shortIdsToInfos(shortids []string) 
map[string]*pipepb.MonitoringInfo {
        return defaultShortIDCache.shortIdsToInfos(shortids)
 }
 
-func monitoring(p *exec.Plan, store *metrics.Store) ([]*pipepb.MonitoringInfo, 
map[string][]byte) {
+func monitoring(p *exec.Plan, store *metrics.Store, supportShortID bool) 
([]*pipepb.MonitoringInfo, map[string][]byte) {
        if store == nil {
                return nil, nil
        }
-
        defaultShortIDCache.mu.Lock()
        defer defaultShortIDCache.mu.Unlock()
 
@@ -119,14 +118,15 @@ func monitoring(p *exec.Plan, store *metrics.Store) 
([]*pipepb.MonitoringInfo, m
                                panic(err)
                        }
                        payloads[getShortID(l, metricsx.UrnUserSumInt64)] = 
payload
-
-                       monitoringInfo = append(monitoringInfo,
-                               &pipepb.MonitoringInfo{
-                                       Urn:     
metricsx.UrnToString(metricsx.UrnUserSumInt64),
-                                       Type:    
metricsx.UrnToType(metricsx.UrnUserSumInt64),
-                                       Labels:  l.Map(),
-                                       Payload: payload,
-                               })
+                       if !supportShortID {
+                               monitoringInfo = append(monitoringInfo,
+                                       &pipepb.MonitoringInfo{
+                                               Urn:     
metricsx.UrnToString(metricsx.UrnUserSumInt64),
+                                               Type:    
metricsx.UrnToType(metricsx.UrnUserSumInt64),
+                                               Labels:  l.Map(),
+                                               Payload: payload,
+                                       })
+                       }
                },
                DistributionInt64: func(l metrics.Labels, count, sum, min, max 
int64) {
                        payload, err := metricsx.Int64Distribution(count, sum, 
min, max)
@@ -134,14 +134,15 @@ func monitoring(p *exec.Plan, store *metrics.Store) 
([]*pipepb.MonitoringInfo, m
                                panic(err)
                        }
                        payloads[getShortID(l, metricsx.UrnUserDistInt64)] = 
payload
-
-                       monitoringInfo = append(monitoringInfo,
-                               &pipepb.MonitoringInfo{
-                                       Urn:     
metricsx.UrnToString(metricsx.UrnUserDistInt64),
-                                       Type:    
metricsx.UrnToType(metricsx.UrnUserDistInt64),
-                                       Labels:  l.Map(),
-                                       Payload: payload,
-                               })
+                       if !supportShortID {
+                               monitoringInfo = append(monitoringInfo,
+                                       &pipepb.MonitoringInfo{
+                                               Urn:     
metricsx.UrnToString(metricsx.UrnUserDistInt64),
+                                               Type:    
metricsx.UrnToType(metricsx.UrnUserDistInt64),
+                                               Labels:  l.Map(),
+                                               Payload: payload,
+                                       })
+                       }
                },
                GaugeInt64: func(l metrics.Labels, v int64, t time.Time) {
                        payload, err := metricsx.Int64Latest(t, v)
@@ -149,15 +150,15 @@ func monitoring(p *exec.Plan, store *metrics.Store) 
([]*pipepb.MonitoringInfo, m
                                panic(err)
                        }
                        payloads[getShortID(l, metricsx.UrnUserLatestMsInt64)] 
= payload
-
-                       monitoringInfo = append(monitoringInfo,
-                               &pipepb.MonitoringInfo{
-                                       Urn:     
metricsx.UrnToString(metricsx.UrnUserLatestMsInt64),
-                                       Type:    
metricsx.UrnToType(metricsx.UrnUserLatestMsInt64),
-                                       Labels:  l.Map(),
-                                       Payload: payload,
-                               })
-
+                       if !supportShortID {
+                               monitoringInfo = append(monitoringInfo,
+                                       &pipepb.MonitoringInfo{
+                                               Urn:     
metricsx.UrnToString(metricsx.UrnUserLatestMsInt64),
+                                               Type:    
metricsx.UrnToType(metricsx.UrnUserLatestMsInt64),
+                                               Labels:  l.Map(),
+                                               Payload: payload,
+                                       })
+                       }
                },
                MsecsInt64: func(l string, states *[4]metrics.ExecutionState) {
                        label := map[string]string{"PTRANSFORM": l}
@@ -168,13 +169,15 @@ func monitoring(p *exec.Plan, store *metrics.Store) 
([]*pipepb.MonitoringInfo, m
                                }
                                ul := metricsx.ExecutionMsecUrn(i)
                                
payloads[getShortID(metrics.PTransformLabels(l), ul)] = payload
-                               monitoringInfo = append(monitoringInfo,
-                                       &pipepb.MonitoringInfo{
-                                               Urn:     
metricsx.UrnToString(ul),
-                                               Type:    metricsx.UrnToType(ul),
-                                               Labels:  label,
-                                               Payload: payload,
-                                       })
+                               if !supportShortID {
+                                       monitoringInfo = append(monitoringInfo,
+                                               &pipepb.MonitoringInfo{
+                                                       Urn:     
metricsx.UrnToString(ul),
+                                                       Type:    
metricsx.UrnToType(ul),
+                                                       Labels:  label,
+                                                       Payload: payload,
+                                               })
+                               }
                        }
                },
        }.ExtractFrom(store)
@@ -194,16 +197,17 @@ func monitoring(p *exec.Plan, store *metrics.Store) 
([]*pipepb.MonitoringInfo, m
                // TODO(BEAM-9934): This metric should account for elements in 
multiple windows.
                payloads[getShortID(metrics.PCollectionLabels(pcol.ID), 
metricsx.UrnElementCount)] = payload
 
-               monitoringInfo = append(monitoringInfo,
-                       &pipepb.MonitoringInfo{
-                               Urn:  
metricsx.UrnToString(metricsx.UrnElementCount),
-                               Type: 
metricsx.UrnToType(metricsx.UrnElementCount),
-                               Labels: map[string]string{
-                                       "PCOLLECTION": pcol.ID,
-                               },
-                               Payload: payload,
-                       })
-
+               if !supportShortID {
+                       monitoringInfo = append(monitoringInfo,
+                               &pipepb.MonitoringInfo{
+                                       Urn:  
metricsx.UrnToString(metricsx.UrnElementCount),
+                                       Type: 
metricsx.UrnToType(metricsx.UrnElementCount),
+                                       Labels: map[string]string{
+                                               "PCOLLECTION": pcol.ID,
+                                       },
+                                       Payload: payload,
+                               })
+               }
                // Skip pcollections without size
                if pcol.SizeCount != 0 {
                        payload, err := 
metricsx.Int64Distribution(pcol.SizeCount, pcol.SizeSum, pcol.SizeMin, 
pcol.SizeMax)
@@ -212,15 +216,17 @@ func monitoring(p *exec.Plan, store *metrics.Store) 
([]*pipepb.MonitoringInfo, m
                        }
                        payloads[getShortID(metrics.PCollectionLabels(pcol.ID), 
metricsx.UrnSampledByteSize)] = payload
 
-                       monitoringInfo = append(monitoringInfo,
-                               &pipepb.MonitoringInfo{
-                                       Urn:  
metricsx.UrnToString(metricsx.UrnSampledByteSize),
-                                       Type: 
metricsx.UrnToType(metricsx.UrnSampledByteSize),
-                                       Labels: map[string]string{
-                                               "PCOLLECTION": pcol.ID,
-                                       },
-                                       Payload: payload,
-                               })
+                       if !supportShortID {
+                               monitoringInfo = append(monitoringInfo,
+                                       &pipepb.MonitoringInfo{
+                                               Urn:  
metricsx.UrnToString(metricsx.UrnSampledByteSize),
+                                               Type: 
metricsx.UrnToType(metricsx.UrnSampledByteSize),
+                                               Labels: map[string]string{
+                                                       "PCOLLECTION": pcol.ID,
+                                               },
+                                               Payload: payload,
+                                       })
+                       }
                }
        }
 
@@ -230,15 +236,16 @@ func monitoring(p *exec.Plan, store *metrics.Store) 
([]*pipepb.MonitoringInfo, m
        }
 
        payloads[getShortID(metrics.PTransformLabels(snapshot.Source.ID), 
metricsx.UrnDataChannelReadIndex)] = payload
-       monitoringInfo = append(monitoringInfo,
-               &pipepb.MonitoringInfo{
-                       Urn:  
metricsx.UrnToString(metricsx.UrnDataChannelReadIndex),
-                       Type: 
metricsx.UrnToType(metricsx.UrnDataChannelReadIndex),
-                       Labels: map[string]string{
-                               "PTRANSFORM": snapshot.Source.ID,
-                       },
-                       Payload: payload,
-               })
-
+       if !supportShortID {
+               monitoringInfo = append(monitoringInfo,
+                       &pipepb.MonitoringInfo{
+                               Urn:  
metricsx.UrnToString(metricsx.UrnDataChannelReadIndex),
+                               Type: 
metricsx.UrnToType(metricsx.UrnDataChannelReadIndex),
+                               Labels: map[string]string{
+                                       "PTRANSFORM": snapshot.Source.ID,
+                               },
+                               Payload: payload,
+                       })
+       }
        return monitoringInfo, payloads
 }

Reply via email to