This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new baae15a  [BEAM-11207] Metric Extraction via proto RPC API (#13272)
baae15a is described below

commit baae15a6e85033b7d32a38daf12bb03a7cc7fe74
Author: Kamil Wasilewski <kamil.wasilew...@polidea.com>
AuthorDate: Mon Nov 16 23:27:45 2020 +0100

    [BEAM-11207] Metric Extraction via proto RPC API (#13272)
    
    For the Go SDK.
---
 sdks/go/pkg/beam/core/metrics/metrics.go           | 118 +++++++++++++
 .../go/pkg/beam/core/runtime/harness/monitoring.go | 187 +++------------------
 .../beam/core/runtime/harness/monitoring_test.go   |  33 ++--
 sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go | 172 +++++++++++++++++++
 .../beam/core/runtime/metricsx/metricsx_test.go    | 166 ++++++++++++++++++
 sdks/go/pkg/beam/core/runtime/metricsx/urns.go     | 170 +++++++++++++++++++
 sdks/go/pkg/beam/doc_test.go                       |   2 +-
 sdks/go/pkg/beam/io/textio/sdf_test.go             |   2 +-
 sdks/go/pkg/beam/pipeline.go                       |   6 +
 sdks/go/pkg/beam/runner.go                         |   6 +-
 sdks/go/pkg/beam/runners/dataflow/dataflow.go      |  24 +--
 sdks/go/pkg/beam/runners/direct/direct.go          |  16 +-
 sdks/go/pkg/beam/runners/dot/dot.go                |  10 +-
 sdks/go/pkg/beam/runners/flink/flink.go            |   2 +-
 sdks/go/pkg/beam/runners/spark/spark.go            |   2 +-
 .../beam/runners/universal/runnerlib/execute.go    |  48 +++++-
 sdks/go/pkg/beam/runners/universal/universal.go    |  20 +--
 sdks/go/pkg/beam/runners/vet/vet.go                |   8 +-
 sdks/go/pkg/beam/testing/ptest/ptest.go            |   3 +-
 sdks/go/pkg/beam/x/beamx/run.go                    |   8 +
 20 files changed, 770 insertions(+), 233 deletions(-)

diff --git a/sdks/go/pkg/beam/core/metrics/metrics.go 
b/sdks/go/pkg/beam/core/metrics/metrics.go
index 5b14c4a..33b8ef2 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics.go
@@ -381,6 +381,11 @@ func (m *distribution) get() (count, sum, min, max int64) {
        return m.count, m.sum, m.min, m.max
 }
 
+// DistributionValue is the value of a Distribution metric.
+type DistributionValue struct {
+       Count, Sum, Min, Max int64
+}
+
 // Gauge is a time, value pair metric.
 type Gauge struct {
        name name
@@ -448,3 +453,116 @@ func (m *gauge) get() (int64, time.Time) {
        defer m.mu.Unlock()
        return m.v, m.t
 }
+
+// GaugeValue is the value of a Gauge metric.
+type GaugeValue struct {
+       Value     int64
+       Timestamp time.Time
+}
+
+// Results represents all metrics gathered during the job's execution.
+// It allows for querying metrics using a provided filter.
+type Results struct {
+       counters      []CounterResult
+       distributions []DistributionResult
+       gauges        []GaugeResult
+}
+
+// NewResults creates a new Results.
+func NewResults(
+       counters []CounterResult,
+       distributions []DistributionResult,
+       gauges []GaugeResult) *Results {
+       return &Results{counters, distributions, gauges}
+}
+
+// AllMetrics returns all metrics from a Results instance.
+func (mr Results) AllMetrics() QueryResults {
+       return QueryResults{mr.counters, mr.distributions, mr.gauges}
+}
+
+// TODO(BEAM-11217): Implement Query(Filter) and metrics filtering
+
+// QueryResults is the result of a query. Allows accessing all of the
+// metrics that matched the filter.
+type QueryResults struct {
+       counters      []CounterResult
+       distributions []DistributionResult
+       gauges        []GaugeResult
+}
+
+// Counters returns a slice of counter metrics.
+func (qr QueryResults) Counters() []CounterResult {
+       out := make([]CounterResult, len(qr.counters))
+       copy(out, qr.counters)
+       return out
+}
+
+// Distributions returns a slice of distribution metrics.
+func (qr QueryResults) Distributions() []DistributionResult {
+       out := make([]DistributionResult, len(qr.distributions))
+       copy(out, qr.distributions)
+       return out
+}
+
+// Gauges returns a slice of gauge metrics.
+func (qr QueryResults) Gauges() []GaugeResult {
+       out := make([]GaugeResult, len(qr.gauges))
+       copy(out, qr.gauges)
+       return out
+}
+
+// CounterResult is an attempted and a committed value of a counter metric plus
+// key.
+type CounterResult struct {
+       Attempted, Committed int64
+       Key                  StepKey
+}
+
+// Result returns committed metrics. Falls back to attempted metrics if committed
+// are not populated (e.g. due to not being supported on a given runner).
+func (r CounterResult) Result() int64 {
+       if r.Committed != 0 {
+               return r.Committed
+       }
+       return r.Attempted
+}
+
+// DistributionResult is an attempted and a committed value of a distribution
+// metric plus key.
+type DistributionResult struct {
+       Attempted, Committed DistributionValue
+       Key                  StepKey
+}
+
+// Result returns committed metrics. Falls back to attempted metrics if committed
+// are not populated (e.g. due to not being supported on a given runner).
+func (r DistributionResult) Result() DistributionValue {
+       empty := DistributionValue{}
+       if r.Committed != empty {
+               return r.Committed
+       }
+       return r.Attempted
+}
+
+// GaugeResult is an attempted and a committed value of a gauge metric plus
+// key.
+type GaugeResult struct {
+       Attempted, Committed GaugeValue
+       Key                  StepKey
+}
+
+// Result returns committed metrics. Falls back to attempted metrics if committed
+// are not populated (e.g. due to not being supported on a given runner).
+func (r GaugeResult) Result() GaugeValue {
+       empty := GaugeValue{}
+       if r.Committed != empty {
+               return r.Committed
+       }
+       return r.Attempted
+}
+
+// StepKey uniquely identifies a metric within a pipeline graph.
+type StepKey struct {
+       Step, Name, Namespace string
+}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go 
b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
index 53419ce..c9ddb80 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
@@ -16,121 +16,20 @@
 package harness
 
 import (
-       "bytes"
        "strconv"
        "sync"
        "sync/atomic"
        "time"
 
-       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/metricsx"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
-type mUrn uint32
-
-// TODO: Pull these from the protos.
-var sUrns = [...]string{
-       "beam:metric:user:sum_int64:v1",
-       "beam:metric:user:sum_double:v1",
-       "beam:metric:user:distribution_int64:v1",
-       "beam:metric:user:distribution_double:v1",
-       "beam:metric:user:latest_int64:v1",
-       "beam:metric:user:latest_double:v1",
-       "beam:metric:user:top_n_int64:v1",
-       "beam:metric:user:top_n_double:v1",
-       "beam:metric:user:bottom_n_int64:v1",
-       "beam:metric:user:bottom_n_double:v1",
-
-       "beam:metric:element_count:v1",
-       "beam:metric:sampled_byte_size:v1",
-
-       "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
-       "beam:metric:pardo_execution_time:process_bundle_msecs:v1",
-       "beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
-       "beam:metric:ptransform_execution_time:total_msecs:v1",
-
-       "beam:metric:ptransform_progress:remaining:v1",
-       "beam:metric:ptransform_progress:completed:v1",
-       "beam:metric:data_channel:read_index:v1",
-
-       "TestingSentinelUrn", // Must remain last.
-}
-
-const (
-       urnUserSumInt64 mUrn = iota
-       urnUserSumFloat64
-       urnUserDistInt64
-       urnUserDistFloat64
-       urnUserLatestMsInt64
-       urnUserLatestMsFloat64
-       urnUserTopNInt64
-       urnUserTopNFloat64
-       urnUserBottomNInt64
-       urnUserBottomNFloat64
-
-       urnElementCount
-       urnSampledByteSize
-
-       urnStartBundle
-       urnProcessBundle
-       urnFinishBundle
-       urnTransformTotalTime
-
-       urnProgressRemaining
-       urnProgressCompleted
-       urnDataChannelReadIndex
-
-       urnTestSentinel // Must remain last.
-)
-
-// urnToType maps the urn to it's encoding type.
-// This function is written to be inlinable by the compiler.
-func urnToType(u mUrn) string {
-       switch u {
-       case urnUserSumInt64, urnElementCount, urnStartBundle, 
urnProcessBundle, urnFinishBundle, urnTransformTotalTime:
-               return "beam:metrics:sum_int64:v1"
-       case urnUserSumFloat64:
-               return "beam:metrics:sum_double:v1"
-       case urnUserDistInt64, urnSampledByteSize:
-               return "beam:metrics:distribution_int64:v1"
-       case urnUserDistFloat64:
-               return "beam:metrics:distribution_double:v1"
-       case urnUserLatestMsInt64:
-               return "beam:metrics:latest_int64:v1"
-       case urnUserLatestMsFloat64:
-               return "beam:metrics:latest_double:v1"
-       case urnUserTopNInt64:
-               return "beam:metrics:top_n_int64:v1"
-       case urnUserTopNFloat64:
-               return "beam:metrics:top_n_double:v1"
-       case urnUserBottomNInt64:
-               return "beam:metrics:bottom_n_int64:v1"
-       case urnUserBottomNFloat64:
-               return "beam:metrics:bottom_n_double:v1"
-
-       case urnProgressRemaining, urnProgressCompleted:
-               return "beam:metrics:progress:v1"
-       case urnDataChannelReadIndex:
-               return "beam:metrics:sum_int64:v1"
-
-       // Monitoring Table isn't currently in the protos.
-       // case ???:
-       //      return "beam:metrics:monitoring_table:v1"
-
-       case urnTestSentinel:
-               return "TestingSentinelType"
-
-       default:
-               panic("metric urn without specified type" + sUrns[u])
-       }
-}
-
 type shortKey struct {
        metrics.Labels
-       Urn mUrn // Urns fully specify their type.
+       Urn metricsx.Urn // Urns fully specify their type.
 }
 
 // shortIDCache retains lookup caches for short ids to the full monitoring
@@ -162,7 +61,7 @@ func (c *shortIDCache) getNextShortID() string {
 // getShortID returns the short id for the given metric, and if
 // it doesn't exist yet, stores the metadata.
 // Assumes c.mu lock is held.
-func (c *shortIDCache) getShortID(l metrics.Labels, urn mUrn) string {
+func (c *shortIDCache) getShortID(l metrics.Labels, urn metricsx.Urn) string {
        k := shortKey{l, urn}
        s, ok := c.labels2ShortIds[k]
        if ok {
@@ -171,8 +70,8 @@ func (c *shortIDCache) getShortID(l metrics.Labels, urn 
mUrn) string {
        s = c.getNextShortID()
        c.labels2ShortIds[k] = s
        c.shortIds2Infos[s] = &pipepb.MonitoringInfo{
-               Urn:    sUrns[urn],
-               Type:   urnToType(urn),
+               Urn:    metricsx.UrnToString(urn),
+               Type:   metricsx.UrnToType(urn),
                Labels: userLabels(l),
        }
        return s
@@ -195,7 +94,7 @@ func init() {
        defaultShortIDCache = newShortIDCache()
 }
 
-func getShortID(l metrics.Labels, urn mUrn) string {
+func getShortID(l metrics.Labels, urn metricsx.Urn) string {
        return defaultShortIDCache.getShortID(l, urn)
 }
 
@@ -216,46 +115,46 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, 
map[string][]byte) {
        payloads := make(map[string][]byte)
        metrics.Extractor{
                SumInt64: func(l metrics.Labels, v int64) {
-                       payload, err := int64Counter(v)
+                       payload, err := metricsx.Int64Counter(v)
                        if err != nil {
                                panic(err)
                        }
-                       payloads[getShortID(l, urnUserSumInt64)] = payload
+                       payloads[getShortID(l, metricsx.UrnUserSumInt64)] = 
payload
 
                        monitoringInfo = append(monitoringInfo,
                                &pipepb.MonitoringInfo{
-                                       Urn:     sUrns[urnUserSumInt64],
-                                       Type:    urnToType(urnUserSumInt64),
+                                       Urn:     
metricsx.UrnToString(metricsx.UrnUserSumInt64),
+                                       Type:    
metricsx.UrnToType(metricsx.UrnUserSumInt64),
                                        Labels:  userLabels(l),
                                        Payload: payload,
                                })
                },
                DistributionInt64: func(l metrics.Labels, count, sum, min, max 
int64) {
-                       payload, err := int64Distribution(count, sum, min, max)
+                       payload, err := metricsx.Int64Distribution(count, sum, 
min, max)
                        if err != nil {
                                panic(err)
                        }
-                       payloads[getShortID(l, urnUserDistInt64)] = payload
+                       payloads[getShortID(l, metricsx.UrnUserDistInt64)] = 
payload
 
                        monitoringInfo = append(monitoringInfo,
                                &pipepb.MonitoringInfo{
-                                       Urn:     sUrns[urnUserDistInt64],
-                                       Type:    urnToType(urnUserDistInt64),
+                                       Urn:     
metricsx.UrnToString(metricsx.UrnUserDistInt64),
+                                       Type:    
metricsx.UrnToType(metricsx.UrnUserDistInt64),
                                        Labels:  userLabels(l),
                                        Payload: payload,
                                })
                },
                GaugeInt64: func(l metrics.Labels, v int64, t time.Time) {
-                       payload, err := int64Latest(t, v)
+                       payload, err := metricsx.Int64Latest(t, v)
                        if err != nil {
                                panic(err)
                        }
-                       payloads[getShortID(l, urnUserLatestMsInt64)] = payload
+                       payloads[getShortID(l, metricsx.UrnUserLatestMsInt64)] 
= payload
 
                        monitoringInfo = append(monitoringInfo,
                                &pipepb.MonitoringInfo{
-                                       Urn:     sUrns[urnUserLatestMsInt64],
-                                       Type:    
urnToType(urnUserLatestMsInt64),
+                                       Urn:     
metricsx.UrnToString(metricsx.UrnUserLatestMsInt64),
+                                       Type:    
metricsx.UrnToType(metricsx.UrnUserLatestMsInt64),
                                        Labels:  userLabels(l),
                                        Payload: payload,
                                })
@@ -265,28 +164,28 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, 
map[string][]byte) {
 
        // Get the execution monitoring information from the bundle plan.
        if snapshot, ok := p.Progress(); ok {
-               payload, err := int64Counter(snapshot.Count)
+               payload, err := metricsx.Int64Counter(snapshot.Count)
                if err != nil {
                        panic(err)
                }
 
                // TODO(BEAM-9934): This metric should account for elements in 
multiple windows.
-               payloads[getShortID(metrics.PCollectionLabels(snapshot.PID), 
urnElementCount)] = payload
+               payloads[getShortID(metrics.PCollectionLabels(snapshot.PID), 
metricsx.UrnElementCount)] = payload
                monitoringInfo = append(monitoringInfo,
                        &pipepb.MonitoringInfo{
-                               Urn:  sUrns[urnElementCount],
-                               Type: urnToType(urnElementCount),
+                               Urn:  
metricsx.UrnToString(metricsx.UrnElementCount),
+                               Type: 
metricsx.UrnToType(metricsx.UrnElementCount),
                                Labels: map[string]string{
                                        "PCOLLECTION": snapshot.PID,
                                },
                                Payload: payload,
                        })
 
-               payloads[getShortID(metrics.PTransformLabels(snapshot.ID), 
urnDataChannelReadIndex)] = payload
+               payloads[getShortID(metrics.PTransformLabels(snapshot.ID), 
metricsx.UrnDataChannelReadIndex)] = payload
                monitoringInfo = append(monitoringInfo,
                        &pipepb.MonitoringInfo{
-                               Urn:  sUrns[urnDataChannelReadIndex],
-                               Type: urnToType(urnDataChannelReadIndex),
+                               Urn:  
metricsx.UrnToString(metricsx.UrnDataChannelReadIndex),
+                               Type: 
metricsx.UrnToType(metricsx.UrnDataChannelReadIndex),
                                Labels: map[string]string{
                                        "PTRANSFORM": snapshot.ID,
                                },
@@ -305,39 +204,3 @@ func userLabels(l metrics.Labels) map[string]string {
                "NAME":       l.Name(),
        }
 }
-
-func int64Counter(v int64) ([]byte, error) {
-       var buf bytes.Buffer
-       if err := coder.EncodeVarInt(v, &buf); err != nil {
-               return nil, err
-       }
-       return buf.Bytes(), nil
-}
-
-func int64Latest(t time.Time, v int64) ([]byte, error) {
-       var buf bytes.Buffer
-       if err := coder.EncodeVarInt(mtime.FromTime(t).Milliseconds(), &buf); 
err != nil {
-               return nil, err
-       }
-       if err := coder.EncodeVarInt(v, &buf); err != nil {
-               return nil, err
-       }
-       return buf.Bytes(), nil
-}
-
-func int64Distribution(count, sum, min, max int64) ([]byte, error) {
-       var buf bytes.Buffer
-       if err := coder.EncodeVarInt(count, &buf); err != nil {
-               return nil, err
-       }
-       if err := coder.EncodeVarInt(sum, &buf); err != nil {
-               return nil, err
-       }
-       if err := coder.EncodeVarInt(min, &buf); err != nil {
-               return nil, err
-       }
-       if err := coder.EncodeVarInt(max, &buf); err != nil {
-               return nil, err
-       }
-       return buf.Bytes(), nil
-}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring_test.go 
b/sdks/go/pkg/beam/core/runtime/harness/monitoring_test.go
index 456fce4..ed792dd 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring_test.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring_test.go
@@ -20,34 +20,35 @@ import (
        "testing"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/metricsx"
 )
 
 func TestGetShortID(t *testing.T) {
        tests := []struct {
                id           string
-               urn          mUrn
+               urn          metricsx.Urn
                labels       metrics.Labels
                expectedUrn  string
                expectedType string
        }{
                {
                        id:           "1",
-                       urn:          urnUserDistInt64,
+                       urn:          metricsx.UrnUserDistInt64,
                        expectedUrn:  "beam:metric:user:distribution_int64:v1",
                        expectedType: "beam:metrics:distribution_int64:v1",
                }, {
                        id:           "2",
-                       urn:          urnElementCount,
+                       urn:          metricsx.UrnElementCount,
                        expectedUrn:  "beam:metric:element_count:v1",
                        expectedType: "beam:metrics:sum_int64:v1",
                }, {
                        id:           "3",
-                       urn:          urnProgressCompleted,
+                       urn:          metricsx.UrnProgressCompleted,
                        expectedUrn:  
"beam:metric:ptransform_progress:completed:v1",
                        expectedType: "beam:metrics:progress:v1",
                }, {
                        id:           "4",
-                       urn:          urnUserDistFloat64,
+                       urn:          metricsx.UrnUserDistFloat64,
                        expectedUrn:  "beam:metric:user:distribution_double:v1",
                        expectedType: "beam:metrics:distribution_double:v1",
                }, {
@@ -56,25 +57,25 @@ func TestGetShortID(t *testing.T) {
                        // in the list, or vice versa, this should fail with 
either
                        // an index out of range panic or a mismatch.
                        id:           "5",
-                       urn:          urnTestSentinel,
+                       urn:          metricsx.UrnTestSentinel,
                        expectedUrn:  "TestingSentinelUrn",
                        expectedType: "TestingSentinelType",
                }, {
                        id:           "6",
-                       urn:          urnFinishBundle,
+                       urn:          metricsx.UrnFinishBundle,
                        expectedUrn:  
"beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
                        expectedType: "beam:metrics:sum_int64:v1",
                }, {
                        // This case and the next one validates that different 
labels
                        // with the same urn are in fact assigned different 
short ids.
                        id:           "7",
-                       urn:          urnUserSumInt64,
+                       urn:          metricsx.UrnUserSumInt64,
                        labels:       metrics.UserLabels("myT", "harness", 
"metricNumber7"),
                        expectedUrn:  "beam:metric:user:sum_int64:v1",
                        expectedType: "beam:metrics:sum_int64:v1",
                }, {
                        id:           "8",
-                       urn:          urnUserSumInt64,
+                       urn:          metricsx.UrnUserSumInt64,
                        labels:       metrics.UserLabels("myT", "harness", 
"metricNumber8"),
                        expectedUrn:  "beam:metric:user:sum_int64:v1",
                        expectedType: "beam:metrics:sum_int64:v1",
@@ -84,13 +85,13 @@ func TestGetShortID(t *testing.T) {
                        // user metrics are unique per label set, but this 
isn't the layer
                        // to validate that condition.
                        id:           "9",
-                       urn:          urnUserTopNFloat64,
+                       urn:          metricsx.UrnUserTopNFloat64,
                        labels:       metrics.UserLabels("myT", "harness", 
"metricNumber7"),
                        expectedUrn:  "beam:metric:user:top_n_double:v1",
                        expectedType: "beam:metrics:top_n_double:v1",
                }, {
                        id:           "a",
-                       urn:          urnElementCount,
+                       urn:          metricsx.UrnElementCount,
                        labels:       metrics.PCollectionLabels("myPCol"),
                        expectedUrn:  "beam:metric:element_count:v1",
                        expectedType: "beam:metrics:sum_int64:v1",
@@ -133,7 +134,7 @@ func TestGetShortID(t *testing.T) {
 // is initialized properly.
 func TestShortIdCache_Default(t *testing.T) {
        defaultShortIDCache.mu.Lock()
-       s := getShortID(metrics.UserLabels("this", "doesn't", "matter"), 
urnTestSentinel)
+       s := getShortID(metrics.UserLabels("this", "doesn't", "matter"), 
metricsx.UrnTestSentinel)
        defaultShortIDCache.mu.Unlock()
 
        info := shortIdsToInfos([]string{s})[s]
@@ -148,11 +149,11 @@ func TestShortIdCache_Default(t *testing.T) {
 func BenchmarkGetShortID(b *testing.B) {
        b.Run("new", func(b *testing.B) {
                l := metrics.UserLabels("this", "doesn't", 
strconv.FormatInt(-1, 36))
-               last := getShortID(l, urnTestSentinel)
+               last := getShortID(l, metricsx.UrnTestSentinel)
                for i := int64(0); i < int64(b.N); i++ {
                        // Ensure it's allocated to the stack.
                        l = metrics.UserLabels("this", "doesn't", 
strconv.FormatInt(i, 36))
-                       got := getShortID(l, urnTestSentinel)
+                       got := getShortID(l, metricsx.UrnTestSentinel)
                        if got == last {
                                b.Fatalf("short collision: at %s", got)
                        }
@@ -162,9 +163,9 @@ func BenchmarkGetShortID(b *testing.B) {
        b.Run("amortized", func(b *testing.B) {
                l := metrics.UserLabels("this", "doesn't", "matter")
                c := newShortIDCache()
-               want := c.getShortID(l, urnTestSentinel)
+               want := c.getShortID(l, metricsx.UrnTestSentinel)
                for i := 0; i < b.N; i++ {
-                       got := c.getShortID(l, urnTestSentinel)
+                       got := c.getShortID(l, metricsx.UrnTestSentinel)
                        if got != want {
                                b.Fatalf("different short ids: got %s, want 
%s", got, want)
                        }
diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go 
b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
new file mode 100644
index 0000000..6cd10b4
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metricsx
+
+import (
+       "bytes"
+       "fmt"
+       "log"
+       "time"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+// FromMonitoringInfos extracts metrics from monitored states and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed 
[]*pipepb.MonitoringInfo) *metrics.Results {
+       ac, ad, ag := groupByType(attempted)
+       cc, cd, cg := groupByType(committed)
+
+       return metrics.NewResults(mergeCounters(ac, cc), mergeDistributions(ad, 
cd), mergeGauges(ag, cg))
+}
+
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+       map[metrics.StepKey]int64,
+       map[metrics.StepKey]metrics.DistributionValue,
+       map[metrics.StepKey]metrics.GaugeValue) {
+       counters := make(map[metrics.StepKey]int64)
+       distributions := make(map[metrics.StepKey]metrics.DistributionValue)
+       gauges := make(map[metrics.StepKey]metrics.GaugeValue)
+
+       for _, minfo := range minfos {
+               key, err := extractKey(minfo)
+               if err != nil {
+                       log.Println(err)
+                       continue
+               }
+
+               r := bytes.NewReader(minfo.GetPayload())
+
+               switch minfo.GetType() {
+               case "beam:metrics:sum_int64:v1":
+                       value, err := extractCounterValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       counters[key] = value
+               case "beam:metrics:distribution_int64:v1":
+                       value, err := extractDistributionValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       distributions[key] = value
+               case
+                       "beam:metrics:latest_int64:v1",
+                       "beam:metrics:top_n_int64:v1",
+                       "beam:metrics:bottom_n_int64:v1":
+                       value, err := extractGaugeValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       gauges[key] = value
+               default:
+                       log.Println("unknown metric type")
+               }
+       }
+       return counters, distributions, gauges
+}
+
+func mergeCounters(
+       attempted map[metrics.StepKey]int64,
+       committed map[metrics.StepKey]int64) []metrics.CounterResult {
+       res := make([]metrics.CounterResult, 0)
+
+       for k := range attempted {
+               v := committed[k]
+               res = append(res, metrics.CounterResult{Attempted: 
attempted[k], Committed: v, Key: k})
+       }
+       return res
+}
+
+func mergeDistributions(
+       attempted map[metrics.StepKey]metrics.DistributionValue,
+       committed map[metrics.StepKey]metrics.DistributionValue) 
[]metrics.DistributionResult {
+       res := make([]metrics.DistributionResult, 0)
+
+       for k := range attempted {
+               v := committed[k]
+               res = append(res, metrics.DistributionResult{Attempted: 
attempted[k], Committed: v, Key: k})
+       }
+       return res
+}
+
+func mergeGauges(
+       attempted map[metrics.StepKey]metrics.GaugeValue,
+       committed map[metrics.StepKey]metrics.GaugeValue) []metrics.GaugeResult 
{
+       res := make([]metrics.GaugeResult, 0)
+
+       for k := range attempted {
+               v := committed[k]
+               res = append(res, metrics.GaugeResult{Attempted: attempted[k], 
Committed: v, Key: k})
+       }
+       return res
+}
+
+func extractKey(mi *pipepb.MonitoringInfo) (metrics.StepKey, error) {
+       labels := newLabels(mi.GetLabels())
+       stepName := labels.Transform()
+       if stepName == "" {
+               return metrics.StepKey{}, fmt.Errorf("Failed to deduce Step 
from MonitoringInfo: %v", mi)
+       }
+       return metrics.StepKey{Step: stepName, Name: labels.Name(), Namespace: 
labels.Namespace()}, nil
+}
+
+// extractCounterValue decodes a single varint from reader as a counter value.
+// On a decoding failure it returns -1 together with the error.
+func extractCounterValue(reader *bytes.Reader) (int64, error) {
+       count, err := coder.DecodeVarInt(reader)
+       if err == nil {
+               return count, nil
+       }
+       return -1, err
+}
+
+// extractDistributionValue decodes four consecutive varints from reader and
+// returns them, in order, as the Count, Sum, Min and Max of a distribution.
+func extractDistributionValue(reader *bytes.Reader) (metrics.DistributionValue, error) {
+       fields, err := decodeMany(reader, 4)
+       if err != nil {
+               return metrics.DistributionValue{}, err
+       }
+       dist := metrics.DistributionValue{
+               Count: fields[0],
+               Sum:   fields[1],
+               Min:   fields[2],
+               Max:   fields[3],
+       }
+       return dist, nil
+}
+
+// extractGaugeValue decodes two consecutive varints from reader: a timestamp
+// in milliseconds since the Unix epoch, followed by the gauge's value.
+func extractGaugeValue(reader *bytes.Reader) (metrics.GaugeValue, error) {
+       fields, err := decodeMany(reader, 2)
+       if err != nil {
+               return metrics.GaugeValue{}, err
+       }
+       millis, latest := fields[0], fields[1]
+       // time.Unix takes nanoseconds, so scale the millisecond timestamp up.
+       ts := time.Unix(0, millis*int64(time.Millisecond))
+       return metrics.GaugeValue{Timestamp: ts, Value: latest}, nil
+}
+
+// newLabels converts the label map of a MonitoringInfo into metrics.Labels,
+// reading the "PTRANSFORM", "NAMESPACE" and "NAME" entries.
+func newLabels(miLabels map[string]string) *metrics.Labels {
+       transform, namespace, name := miLabels["PTRANSFORM"], miLabels["NAMESPACE"], miLabels["NAME"]
+       l := metrics.UserLabels(transform, namespace, name)
+       return &l
+}
+
+func decodeMany(reader *bytes.Reader, size int) ([]int64, error) {
+       var err error
+       values := make([]int64, size)
+
+       for i := 0; i < size; i++ {
+               values[i], err = coder.DecodeVarInt(reader)
+               if err != nil {
+                       return nil, err
+               }
+       }
+       return values, err
+}
diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go 
b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
new file mode 100644
index 0000000..83add87
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metricsx
+
+import (
+       "testing"
+       "time"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "github.com/google/go-cmp/cmp"
+)
+
+// TestFromMonitoringInfos_Counters checks that a single attempted user counter
+// MonitoringInfo is converted into exactly one CounterResult whose committed
+// value is left at zero.
+func TestFromMonitoringInfos_Counters(t *testing.T) {
+       var value int64 = 15
+       want := metrics.CounterResult{
+               Attempted: 15,
+               Committed: 0,
+               Key: metrics.StepKey{
+                       Step:      "main.customDoFn",
+                       Name:      "customCounter",
+                       Namespace: "customDoFn",
+               }}
+
+       payload, err := Int64Counter(value)
+       if err != nil {
+               t.Fatalf("Failed to encode Int64Counter: %v", err)
+       }
+
+       labels := map[string]string{
+               "PTRANSFORM": "main.customDoFn",
+               "NAMESPACE":  "customDoFn",
+               "NAME":       "customCounter",
+       }
+
+       mInfo := &pipepb.MonitoringInfo{
+               Urn:     UrnToString(UrnUserSumInt64),
+               Type:    UrnToType(UrnUserSumInt64),
+               Labels:  labels,
+               Payload: payload,
+       }
+
+       attempted := []*pipepb.MonitoringInfo{mInfo}
+       committed := []*pipepb.MonitoringInfo{}
+
+       got := FromMonitoringInfos(attempted, committed).AllMetrics().Counters()
+       // One metric went in, so exactly one result must come out; extra
+       // results are as wrong as none at all.
+       if size := len(got); size != 1 {
+               t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
+       }
+       if d := cmp.Diff(want, got[0]); d != "" {
+               t.Fatalf("Invalid counter: got: %v, want: %v, diff(-want,+got):\n %v",
+                       got[0], want, d)
+       }
+}
+
+// TestFromMonitoringInfos_Distributions checks that a single attempted user
+// distribution MonitoringInfo is converted into exactly one DistributionResult
+// whose committed value is left at its zero value.
+func TestFromMonitoringInfos_Distributions(t *testing.T) {
+       var count, sum, min, max int64 = 100, 5, -12, 30
+
+       want := metrics.DistributionResult{
+               Attempted: metrics.DistributionValue{
+                       Count: 100,
+                       Sum:   5,
+                       Min:   -12,
+                       Max:   30,
+               },
+               Committed: metrics.DistributionValue{},
+               Key: metrics.StepKey{
+                       Step:      "main.customDoFn",
+                       Name:      "customDist",
+                       Namespace: "customDoFn",
+               }}
+
+       payload, err := Int64Distribution(count, sum, min, max)
+       if err != nil {
+               t.Fatalf("Failed to encode Int64Distribution: %v", err)
+       }
+
+       labels := map[string]string{
+               "PTRANSFORM": "main.customDoFn",
+               "NAMESPACE":  "customDoFn",
+               "NAME":       "customDist",
+       }
+
+       mInfo := &pipepb.MonitoringInfo{
+               Urn:     UrnToString(UrnUserDistInt64),
+               Type:    UrnToType(UrnUserDistInt64),
+               Labels:  labels,
+               Payload: payload,
+       }
+
+       attempted := []*pipepb.MonitoringInfo{mInfo}
+       committed := []*pipepb.MonitoringInfo{}
+
+       got := FromMonitoringInfos(attempted, committed).AllMetrics().Distributions()
+       // One metric went in, so exactly one result must come out; extra
+       // results are as wrong as none at all.
+       if size := len(got); size != 1 {
+               t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
+       }
+       if d := cmp.Diff(want, got[0]); d != "" {
+               t.Fatalf("Invalid distribution: got: %v, want: %v, diff(-want,+got):\n %v",
+                       got[0], want, d)
+       }
+}
+
+// TestFromMonitoringInfos_Gauges checks that a single attempted user gauge
+// MonitoringInfo is converted into exactly one GaugeResult carrying the
+// encoded timestamp and value, with the committed value left at zero.
+func TestFromMonitoringInfos_Gauges(t *testing.T) {
+       var value int64 = 100
+       // time.Local is the location LoadLocation("Local") resolves to; using
+       // it directly avoids discarding an error value.
+       tm := time.Date(2020, 11, 9, 17, 52, 28, 462*int(time.Millisecond), time.Local)
+
+       want := metrics.GaugeResult{
+               Attempted: metrics.GaugeValue{
+                       Value:     100,
+                       Timestamp: tm,
+               },
+               Committed: metrics.GaugeValue{},
+               Key: metrics.StepKey{
+                       Step:      "main.customDoFn",
+                       Name:      "customGauge",
+                       Namespace: "customDoFn",
+               }}
+
+       payload, err := Int64Latest(tm, value)
+       if err != nil {
+               t.Fatalf("Failed to encode Int64Latest: %v", err)
+       }
+
+       labels := map[string]string{
+               "PTRANSFORM": "main.customDoFn",
+               "NAMESPACE":  "customDoFn",
+               "NAME":       "customGauge",
+       }
+
+       mInfo := &pipepb.MonitoringInfo{
+               Urn:     UrnToString(UrnUserLatestMsInt64),
+               Type:    UrnToType(UrnUserLatestMsInt64),
+               Labels:  labels,
+               Payload: payload,
+       }
+
+       attempted := []*pipepb.MonitoringInfo{mInfo}
+       committed := []*pipepb.MonitoringInfo{}
+
+       got := FromMonitoringInfos(attempted, committed).AllMetrics().Gauges()
+       // One metric went in, so exactly one result must come out; extra
+       // results are as wrong as none at all.
+       if size := len(got); size != 1 {
+               t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
+       }
+       if d := cmp.Diff(want, got[0]); d != "" {
+               t.Fatalf("Invalid gauge: got: %v, want: %v, diff(-want,+got):\n %v",
+                       got[0], want, d)
+       }
+}
diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/urns.go 
b/sdks/go/pkg/beam/core/runtime/metricsx/urns.go
new file mode 100644
index 0000000..67aed96
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/metricsx/urns.go
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metricsx
+
+import (
+       "bytes"
+       "time"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+)
+
+// Urn is an enum type for representing urns of metrics and monitored states.
+// Its values are used as indexes into the sUrns table by UrnToString.
+type Urn uint32
+
+// sUrns holds the string form of every supported urn. UrnToString indexes
+// this table with the Urn constants, so the entries must stay in the same
+// order as the constant declarations.
+// TODO: Pull these from the protos.
+var sUrns = [...]string{
+       "beam:metric:user:sum_int64:v1",
+       "beam:metric:user:sum_double:v1",
+       "beam:metric:user:distribution_int64:v1",
+       "beam:metric:user:distribution_double:v1",
+       "beam:metric:user:latest_int64:v1",
+       "beam:metric:user:latest_double:v1",
+       "beam:metric:user:top_n_int64:v1",
+       "beam:metric:user:top_n_double:v1",
+       "beam:metric:user:bottom_n_int64:v1",
+       "beam:metric:user:bottom_n_double:v1",
+
+       "beam:metric:element_count:v1",
+       "beam:metric:sampled_byte_size:v1",
+
+       "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
+       "beam:metric:pardo_execution_time:process_bundle_msecs:v1",
+       "beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
+       "beam:metric:ptransform_execution_time:total_msecs:v1",
+
+       "beam:metric:ptransform_progress:remaining:v1",
+       "beam:metric:ptransform_progress:completed:v1",
+       "beam:metric:data_channel:read_index:v1",
+
+       "TestingSentinelUrn", // Must remain last.
+}
+
+// The supported urns of metrics and monitored states.
+//
+// The constants are numbered with iota starting at zero and serve as indexes
+// into sUrns, so their order must match the order of the sUrns entries.
+const (
+       UrnUserSumInt64 Urn = iota
+       UrnUserSumFloat64
+       UrnUserDistInt64
+       UrnUserDistFloat64
+       UrnUserLatestMsInt64
+       UrnUserLatestMsFloat64
+       UrnUserTopNInt64
+       UrnUserTopNFloat64
+       UrnUserBottomNInt64
+       UrnUserBottomNFloat64
+
+       UrnElementCount
+       UrnSampledByteSize
+
+       UrnStartBundle
+       UrnProcessBundle
+       UrnFinishBundle
+       UrnTransformTotalTime
+
+       UrnProgressRemaining
+       UrnProgressCompleted
+       UrnDataChannelReadIndex
+
+       UrnTestSentinel // Must remain last.
+)
+
+// UrnToString returns a string representation of the urn.
+// It looks u up in sUrns, so a value beyond the last entry of that table
+// causes an index-out-of-range panic.
+func UrnToString(u Urn) string {
+       return sUrns[u]
+}
+
+// UrnToType maps the urn to its encoding type.
+// This function is written to be inlinable by the compiler.
+//
+// It panics when no encoding type is specified for u.
+func UrnToType(u Urn) string {
+       switch u {
+       case UrnUserSumInt64, UrnElementCount, UrnStartBundle, UrnProcessBundle, UrnFinishBundle, UrnTransformTotalTime:
+               return "beam:metrics:sum_int64:v1"
+       case UrnUserSumFloat64:
+               return "beam:metrics:sum_double:v1"
+       case UrnUserDistInt64, UrnSampledByteSize:
+               return "beam:metrics:distribution_int64:v1"
+       case UrnUserDistFloat64:
+               return "beam:metrics:distribution_double:v1"
+       case UrnUserLatestMsInt64:
+               return "beam:metrics:latest_int64:v1"
+       case UrnUserLatestMsFloat64:
+               return "beam:metrics:latest_double:v1"
+       case UrnUserTopNInt64:
+               return "beam:metrics:top_n_int64:v1"
+       case UrnUserTopNFloat64:
+               return "beam:metrics:top_n_double:v1"
+       case UrnUserBottomNInt64:
+               return "beam:metrics:bottom_n_int64:v1"
+       case UrnUserBottomNFloat64:
+               return "beam:metrics:bottom_n_double:v1"
+
+       case UrnProgressRemaining, UrnProgressCompleted:
+               return "beam:metrics:progress:v1"
+       case UrnDataChannelReadIndex:
+               return "beam:metrics:sum_int64:v1"
+
+       // Monitoring Table isn't currently in the protos.
+       // case ???:
+       //      return "beam:metrics:monitoring_table:v1"
+
+       case UrnTestSentinel:
+               return "TestingSentinelType"
+
+       default:
+               // Only index sUrns when u is in range; otherwise the lookup
+               // itself would panic and hide this message.
+               if u < Urn(len(sUrns)) {
+                       panic("metric urn without specified type: " + sUrns[u])
+               }
+               panic("metric urn without specified type: urn out of range")
+       }
+}
+
+// Int64Counter returns an encoded payload of the integer counter.
+func Int64Counter(v int64) ([]byte, error) {
+       var buf bytes.Buffer
+       if err := coder.EncodeVarInt(v, &buf); err != nil {
+               return nil, err
+       }
+       return buf.Bytes(), nil
+}
+
+// Int64Latest returns the payload of a latest-seen integer value: the
+// timestamp t converted to milliseconds, followed by v, both varint-encoded.
+func Int64Latest(t time.Time, v int64) ([]byte, error) {
+       var payload bytes.Buffer
+       millis := mtime.FromTime(t).Milliseconds()
+       for _, field := range []int64{millis, v} {
+               if err := coder.EncodeVarInt(field, &payload); err != nil {
+                       return nil, err
+               }
+       }
+       return payload.Bytes(), nil
+}
+
+// Int64Distribution returns the payload of an integer distribution: count,
+// sum, min and max, varint-encoded in that order.
+func Int64Distribution(count, sum, min, max int64) ([]byte, error) {
+       var payload bytes.Buffer
+       for _, field := range []int64{count, sum, min, max} {
+               if err := coder.EncodeVarInt(field, &payload); err != nil {
+                       return nil, err
+               }
+       }
+       return payload.Bytes(), nil
+}
diff --git a/sdks/go/pkg/beam/doc_test.go b/sdks/go/pkg/beam/doc_test.go
index 92a2b03..6c27980 100644
--- a/sdks/go/pkg/beam/doc_test.go
+++ b/sdks/go/pkg/beam/doc_test.go
@@ -74,7 +74,7 @@ func Example_gettingStarted() {
        // pipeline can be executed by a PipelineRunner.  The direct runner 
executes the
        // transforms directly, sequentially, in this one process, which is 
useful for
        // unit tests and simple experiments:
-       if err := direct.Execute(context.Background(), p); err != nil {
+       if _, err := direct.Execute(context.Background(), p); err != nil {
                fmt.Printf("Pipeline failed: %v", err)
        }
 }
diff --git a/sdks/go/pkg/beam/io/textio/sdf_test.go 
b/sdks/go/pkg/beam/io/textio/sdf_test.go
index 05a26fa..733d79a 100644
--- a/sdks/go/pkg/beam/io/textio/sdf_test.go
+++ b/sdks/go/pkg/beam/io/textio/sdf_test.go
@@ -33,7 +33,7 @@ func TestReadSdf(t *testing.T) {
        lines := ReadSdf(s, f)
        passert.Count(s, lines, "NumLines", 1)
 
-       if err := beam.Run(context.Background(), "direct", p); err != nil {
+       if _, err := beam.Run(context.Background(), "direct", p); err != nil {
                t.Fatalf("Failed to execute job: %v", err)
        }
 }
diff --git a/sdks/go/pkg/beam/pipeline.go b/sdks/go/pkg/beam/pipeline.go
index 0c70463..26087d4 100644
--- a/sdks/go/pkg/beam/pipeline.go
+++ b/sdks/go/pkg/beam/pipeline.go
@@ -17,6 +17,7 @@ package beam
 
 import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 )
 
 // Scope is a hierarchical grouping for composite transforms. Scopes can be
@@ -85,3 +86,8 @@ func (p *Pipeline) Build() ([]*graph.MultiEdge, 
[]*graph.Node, error) {
 func (p *Pipeline) String() string {
        return p.real.String()
 }
+
+// PipelineResult is the result of beamx.RunWithMetrics.
+// It gives access to the metrics reported for an executed pipeline.
+type PipelineResult interface {
+       // Metrics returns the metrics results of the pipeline.
+       Metrics() metrics.Results
+}
diff --git a/sdks/go/pkg/beam/runner.go b/sdks/go/pkg/beam/runner.go
index 28563bd..1b6b41f 100644
--- a/sdks/go/pkg/beam/runner.go
+++ b/sdks/go/pkg/beam/runner.go
@@ -27,12 +27,12 @@ import (
 // verification, but require that it is stored in Init and used for Run.
 
 var (
-       runners = make(map[string]func(ctx context.Context, p *Pipeline) error)
+       runners = make(map[string]func(ctx context.Context, p *Pipeline) 
(PipelineResult, error))
 )
 
 // RegisterRunner associates the name with the supplied runner, making it 
available
 // to execute a pipeline via Run.
-func RegisterRunner(name string, fn func(ctx context.Context, p *Pipeline) 
error) {
+func RegisterRunner(name string, fn func(ctx context.Context, p *Pipeline) 
(PipelineResult, error)) {
        if _, ok := runners[name]; ok {
                panic(fmt.Sprintf("runner %v already defined", name))
        }
@@ -42,7 +42,7 @@ func RegisterRunner(name string, fn func(ctx context.Context, 
p *Pipeline) error
 // Run executes the pipeline using the selected registred runner. It is 
customary
 // to define a "runner" with no default as a flag to let users control runner
 // selection.
-func Run(ctx context.Context, runner string, p *Pipeline) error {
+func Run(ctx context.Context, runner string, p *Pipeline) (PipelineResult, 
error) {
        fn, ok := runners[runner]
        if !ok {
                log.Exitf(ctx, "Runner %v not registered. Forgot to _ import 
it?", runner)
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index 09b62c6..96ecde0 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -84,24 +84,24 @@ var unique int32
 
 // Execute runs the given pipeline on Google Cloud Dataflow. It uses the
 // default application credentials to submit the job.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, 
error) {
        // (1) Gather job options
 
        project := *gcpopts.Project
        if project == "" {
-               return errors.New("no Google Cloud project specified. Use 
--project=<project>")
+               return nil, errors.New("no Google Cloud project specified. Use 
--project=<project>")
        }
        region := gcpopts.GetRegion(ctx)
        if region == "" {
-               return errors.New("No Google Cloud region specified. Use 
--region=<region>. See 
https://cloud.google.com/dataflow/docs/concepts/regional-endpoints";)
+               return nil, errors.New("No Google Cloud region specified. Use 
--region=<region>. See 
https://cloud.google.com/dataflow/docs/concepts/regional-endpoints";)
        }
        if *stagingLocation == "" {
-               return errors.New("no GCS staging location specified. Use 
--staging_location=gs://<bucket>/<path>")
+               return nil, errors.New("no GCS staging location specified. Use 
--staging_location=gs://<bucket>/<path>")
        }
        var jobLabels map[string]string
        if *labels != "" {
                if err := json.Unmarshal([]byte(*labels), &jobLabels); err != 
nil {
-                       return errors.Wrapf(err, "error reading --label flag as 
JSON")
+                       return nil, errors.Wrapf(err, "error reading --label 
flag as JSON")
                }
        }
 
@@ -120,7 +120,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        }
        if *autoscalingAlgorithm != "" {
                if *autoscalingAlgorithm != "NONE" && *autoscalingAlgorithm != 
"THROUGHPUT_BASED" {
-                       return errors.New("invalid autoscaling algorithm. Use 
--autoscaling_algorithm=(NONE|THROUGHPUT_BASED)")
+                       return nil, errors.New("invalid autoscaling algorithm. 
Use --autoscaling_algorithm=(NONE|THROUGHPUT_BASED)")
                }
        }
 
@@ -173,15 +173,15 @@ func Execute(ctx context.Context, p *beam.Pipeline) error 
{
 
        edges, _, err := p.Build()
        if err != nil {
-               return err
+               return nil, err
        }
        enviroment, err := graphx.CreateEnvironment(ctx, 
jobopts.GetEnvironmentUrn(ctx), getContainerImage)
        if err != nil {
-               return errors.WithContext(err, "generating model pipeline")
+               return nil, errors.WithContext(err, "generating model pipeline")
        }
        model, err := graphx.Marshal(edges, &graphx.Options{Environment: 
enviroment})
        if err != nil {
-               return errors.WithContext(err, "generating model pipeline")
+               return nil, errors.WithContext(err, "generating model pipeline")
        }
 
        // NOTE(herohde) 10/8/2018: the last segment of the names must be 
"worker" and "dataflow-worker.jar".
@@ -197,14 +197,14 @@ func Execute(ctx context.Context, p *beam.Pipeline) error 
{
                log.Info(ctx, proto.MarshalTextString(model))
                job, err := dataflowlib.Translate(ctx, model, opts, workerURL, 
jarURL, modelURL)
                if err != nil {
-                       return err
+                       return nil, err
                }
                dataflowlib.PrintJob(ctx, job)
-               return nil
+               return nil, nil
        }
 
        _, err = dataflowlib.Execute(ctx, model, opts, workerURL, jarURL, 
modelURL, *endpoint, *executeAsync)
-       return err
+       return nil, err
 }
 func gcsRecorderHook(opts []string) perf.CaptureHook {
        bucket, prefix, err := gcsx.ParseObject(opts[0])
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go 
b/sdks/go/pkg/beam/runners/direct/direct.go
index a2c3807..5f07135 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -37,7 +37,7 @@ func init() {
 }
 
 // Execute runs the pipeline in-process.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, 
error) {
        log.Info(ctx, "Executing pipeline with the direct runner.")
 
        if !beam.Initialized() {
@@ -49,33 +49,33 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
        if *jobopts.Strict {
                log.Info(ctx, "Strict mode enabled, applying additional 
validation.")
-               if err := vet.Execute(ctx, p); err != nil {
-                       return errors.Wrap(err, "strictness check failed")
+               if _, err := vet.Execute(ctx, p); err != nil {
+                       return nil, errors.Wrap(err, "strictness check failed")
                }
                log.Info(ctx, "Strict mode validation passed.")
        }
 
        edges, _, err := p.Build()
        if err != nil {
-               return errors.Wrap(err, "invalid pipeline")
+               return nil, errors.Wrap(err, "invalid pipeline")
        }
        plan, err := Compile(edges)
        if err != nil {
-               return errors.Wrap(err, "translation failed")
+               return nil, errors.Wrap(err, "translation failed")
        }
        log.Info(ctx, plan)
 
        if err = plan.Execute(ctx, "", exec.DataContext{}); err != nil {
                plan.Down(ctx) // ignore any teardown errors
-               return err
+               return nil, err
        }
        if err = plan.Down(ctx); err != nil {
-               return err
+               return nil, err
        }
        // TODO(lostluck) 2020/01/24: What's the right way to expose the
        // metrics store for the direct runner?
        metrics.DumpToLogFromStore(ctx, plan.Store())
-       return nil
+       return nil, nil
 }
 
 // Compile translates a pipeline to a multi-bundle execution plan.
diff --git a/sdks/go/pkg/beam/runners/dot/dot.go 
b/sdks/go/pkg/beam/runners/dot/dot.go
index 547aa10..9a95ece 100644
--- a/sdks/go/pkg/beam/runners/dot/dot.go
+++ b/sdks/go/pkg/beam/runners/dot/dot.go
@@ -37,19 +37,19 @@ func init() {
 var dotFile = flag.String("dot_file", "", "DOT output file to create")
 
 // Execute produces a DOT representation of the pipeline.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, 
error) {
        if *dotFile == "" {
-               return errors.New("must supply dot_file argument")
+               return nil, errors.New("must supply dot_file argument")
        }
 
        edges, nodes, err := p.Build()
        if err != nil {
-               return errors.New("can't get data to render")
+               return nil, errors.New("can't get data to render")
        }
 
        var buf bytes.Buffer
        if err := dotlib.Render(edges, nodes, &buf); err != nil {
-               return err
+               return nil, err
        }
-       return ioutil.WriteFile(*dotFile, buf.Bytes(), 0644)
+       return nil, ioutil.WriteFile(*dotFile, buf.Bytes(), 0644)
 }
diff --git a/sdks/go/pkg/beam/runners/flink/flink.go 
b/sdks/go/pkg/beam/runners/flink/flink.go
index 0384b98..8dd0157 100644
--- a/sdks/go/pkg/beam/runners/flink/flink.go
+++ b/sdks/go/pkg/beam/runners/flink/flink.go
@@ -29,6 +29,6 @@ func init() {
 
 // Execute runs the given pipeline on Flink. Convenience wrapper over the
 // universal runner.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, 
error) {
        return universal.Execute(ctx, p)
 }
diff --git a/sdks/go/pkg/beam/runners/spark/spark.go 
b/sdks/go/pkg/beam/runners/spark/spark.go
index c216c59..5d67200 100644
--- a/sdks/go/pkg/beam/runners/spark/spark.go
+++ b/sdks/go/pkg/beam/runners/spark/spark.go
@@ -29,6 +29,6 @@ func init() {
 
 // Execute runs the given pipeline on Spark. Convenience wrapper over the
 // universal runner.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, 
error) {
        return universal.Execute(ctx, p)
 }
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go 
b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
index dbb3387..38d7a75 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
@@ -20,6 +20,8 @@ import (
        "os"
        "time"
 
+       "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/metricsx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
        jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
@@ -29,19 +31,20 @@ import (
 
 // Execute executes a pipeline on the universal runner serving the given 
endpoint.
 // Convenience function.
-func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt 
*JobOptions, async bool) (string, error) {
+func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt 
*JobOptions, async bool) (*universalPipelineResult, error) {
        // (1) Prepare job to obtain artifact staging instructions.
+       presult := &universalPipelineResult{JobID: ""}
 
        cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
        if err != nil {
-               return "", errors.WithContextf(err, "connecting to job service")
+               return presult, errors.WithContextf(err, "connecting to job 
service")
        }
        defer cc.Close()
        client := jobpb.NewJobServiceClient(cc)
 
        prepID, artifactEndpoint, st, err := Prepare(ctx, client, p, opt)
        if err != nil {
-               return "", err
+               return presult, err
        }
 
        log.Infof(ctx, "Prepared job with id: %v and staging token: %v", 
prepID, st)
@@ -58,7 +61,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, 
endpoint string, opt *JobO
 
                        worker, err := BuildTempWorkerBinary(ctx)
                        if err != nil {
-                               return "", err
+                               return presult, err
                        }
                        defer os.Remove(worker)
 
@@ -70,7 +73,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, 
endpoint string, opt *JobO
 
        token, err := Stage(ctx, prepID, artifactEndpoint, bin, st)
        if err != nil {
-               return "", err
+               return presult, err
        }
 
        log.Infof(ctx, "Staged binary artifact with token: %v", token)
@@ -79,7 +82,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, 
endpoint string, opt *JobO
 
        jobID, err := Submit(ctx, client, prepID, token)
        if err != nil {
-               return "", err
+               return presult, err
        }
 
        log.Infof(ctx, "Submitted job: %v", jobID)
@@ -87,7 +90,36 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, 
endpoint string, opt *JobO
        // (4) Wait for completion.
 
        if async {
-               return jobID, nil
+               return presult, nil
        }
-       return jobID, WaitForCompletion(ctx, client, jobID)
+       // Keep the job's own outcome apart from the metrics lookup. Reusing err
+       // for both would overwrite a job failure with the nil error of a
+       // successful lookup and report a failed pipeline as a success.
+       err = WaitForCompletion(ctx, client, jobID)
+
+       res, metricsErr := newUniversalPipelineResult(ctx, jobID, client)
+       if metricsErr != nil {
+               if err != nil {
+                       return presult, errors.Wrap(err, metricsErr.Error())
+               }
+               return presult, metricsErr
+       }
+       presult = res
+
+       return presult, err
+}
+
+// universalPipelineResult is the result returned by Execute for a job run on
+// the universal runner.
+type universalPipelineResult struct {
+       // JobID is the job ID given to newUniversalPipelineResult; the
+       // placeholder result Execute starts with leaves it empty.
+       JobID   string
+       // metrics holds the metrics fetched from the job service. It is nil on
+       // the placeholder result and when fetching the metrics failed.
+       metrics *metrics.Results
+}
+
+// newUniversalPipelineResult asks the job service for the metrics of the job
+// identified by jobID and wraps them in a universalPipelineResult. If the
+// request fails, the returned result carries the job ID with nil metrics,
+// alongside the wrapped error.
+func newUniversalPipelineResult(ctx context.Context, jobID string, client jobpb.JobServiceClient) (*universalPipelineResult, error) {
+       request := &jobpb.GetJobMetricsRequest{JobId: jobID}
+       response, err := client.GetJobMetrics(ctx, request)
+       if err != nil {
+               return &universalPipelineResult{JobID: jobID}, errors.Wrap(err, "failed to get metrics")
+       }
+
+       // GetMetrics returns nil when the response carries no metrics message.
+       // The generated getters tolerate a nil receiver, whereas direct field
+       // access would panic.
+       monitoredStates := response.GetMetrics()
+       results := metricsx.FromMonitoringInfos(monitoredStates.GetAttempted(), monitoredStates.GetCommitted())
+       return &universalPipelineResult{JobID: jobID, metrics: results}, nil
+}
+
+// Metrics returns the metrics fetched for the job. When none were fetched
+// (the metrics field is nil) it returns an empty metrics.Results instead of
+// dereferencing a nil pointer.
+func (pr universalPipelineResult) Metrics() metrics.Results {
+       if pr.metrics == nil {
+               return metrics.Results{}
+       }
+       return *pr.metrics
 }
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go 
b/sdks/go/pkg/beam/runners/universal/universal.go
index de5378c..3ad1a18 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -42,27 +42,27 @@ func init() {
 }
 
 // Execute executes the pipeline on a universal beam runner.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, 
error) {
        if !beam.Initialized() {
                panic(fmt.Sprint("Beam has not been initialized. Call 
beam.Init() before pipeline construction."))
        }
 
        if *jobopts.Strict {
                log.Info(ctx, "Strict mode enabled, applying additional 
validation.")
-               if err := vet.Execute(ctx, p); err != nil {
-                       return errors.Wrap(err, "strictness check failed")
+               if _, err := vet.Execute(ctx, p); err != nil {
+                       return nil, errors.Wrap(err, "strictness check failed")
                }
                log.Info(ctx, "Strict mode validation passed.")
        }
 
        endpoint, err := jobopts.GetEndpoint()
        if err != nil {
-               return err
+               return nil, err
        }
 
        edges, _, err := p.Build()
        if err != nil {
-               return err
+               return nil, err
        }
        envUrn := jobopts.GetEnvironmentUrn(ctx)
        getEnvCfg := jobopts.GetEnvironmentConfig
@@ -71,7 +71,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                // TODO(BEAM-10610): Allow user configuration of this port, 
rather than kernel selected.
                srv, err := extworker.StartLoopback(ctx, 0)
                if err != nil {
-                       return err
+                       return nil, err
                }
                defer srv.Stop(ctx)
                getEnvCfg = srv.EnvironmentConfig
@@ -79,11 +79,11 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
        enviroment, err := graphx.CreateEnvironment(ctx, envUrn, getEnvCfg)
        if err != nil {
-               return errors.WithContextf(err, "generating model pipeline")
+               return nil, errors.WithContextf(err, "generating model 
pipeline")
        }
        pipeline, err := graphx.Marshal(edges, &graphx.Options{Environment: 
enviroment})
        if err != nil {
-               return errors.WithContextf(err, "generating model pipeline")
+               return nil, errors.WithContextf(err, "generating model 
pipeline")
        }
 
        // Fetch all dependencies for cross-language transforms
@@ -97,6 +97,6 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                Worker:       *jobopts.WorkerBinary,
                RetainDocker: *jobopts.RetainDockerContainers,
        }
-       _, err = runnerlib.Execute(ctx, pipeline, endpoint, opt, *jobopts.Async)
-       return err
+       presult, err := runnerlib.Execute(ctx, pipeline, endpoint, opt, 
*jobopts.Async)
+       return presult, err
 }
diff --git a/sdks/go/pkg/beam/runners/vet/vet.go 
b/sdks/go/pkg/beam/runners/vet/vet.go
index 4ce0e2f..268852a 100644
--- a/sdks/go/pkg/beam/runners/vet/vet.go
+++ b/sdks/go/pkg/beam/runners/vet/vet.go
@@ -58,10 +58,10 @@ func (p disabledResolver) Sym2Addr(name string) (uintptr, 
error) {
 }
 
 // Execute evaluates the pipeline on whether it can run without reflection.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, 
error) {
        e, err := Evaluate(ctx, p)
        if err != nil {
-               return errors.WithContext(err, "validating pipeline with vet 
runner")
+               return nil, errors.WithContext(err, "validating pipeline with 
vet runner")
        }
        if !e.Performant() {
                e.summary()
@@ -69,10 +69,10 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                e.diag("*/\n")
                err := errors.Errorf("pipeline is not performant, see 
diagnostic summary:\n%s\n%s", string(e.d.Bytes()), string(e.Bytes()))
                err = errors.WithContext(err, "validating pipeline with vet 
runner")
-               return errors.SetTopLevelMsg(err, "pipeline is not performant")
+               return nil, errors.SetTopLevelMsg(err, "pipeline is not 
performant")
        }
        // Pipeline has no further tasks.
-       return nil
+       return nil, nil
 }
 
 // Evaluate returns an object that can generate necessary shims and inits.
diff --git a/sdks/go/pkg/beam/testing/ptest/ptest.go 
b/sdks/go/pkg/beam/testing/ptest/ptest.go
index fe23f6c..602303a 100644
--- a/sdks/go/pkg/beam/testing/ptest/ptest.go
+++ b/sdks/go/pkg/beam/testing/ptest/ptest.go
@@ -73,7 +73,8 @@ func Run(p *beam.Pipeline) error {
        if *Runner == "" {
                *Runner = defaultRunner
        }
-       return beam.Run(context.Background(), *Runner, p)
+       _, err := beam.Run(context.Background(), *Runner, p)
+       return err
 }
 
 // Main is an implementation of testing's TestMain to permit testing
diff --git a/sdks/go/pkg/beam/x/beamx/run.go b/sdks/go/pkg/beam/x/beamx/run.go
index 9a1d27a..77d766b 100644
--- a/sdks/go/pkg/beam/x/beamx/run.go
+++ b/sdks/go/pkg/beam/x/beamx/run.go
@@ -40,5 +40,13 @@ var runner = flag.String("runner", "direct", "Pipeline 
runner.")
 // defaults to the direct runner, but all beam-distributed runners and textio
 // filesystems are implicitly registered.
 func Run(ctx context.Context, p *beam.Pipeline) error {
+       _, err := beam.Run(ctx, *runner, p)
+       return err
+}
+
+// RunWithMetrics invokes beam.Run with the runner supplied by the
+// flag "runner". Returns a beam.PipelineResult object, which can be
+// accessed to query the pipeline's metrics.
+func RunWithMetrics(ctx context.Context, p *beam.Pipeline) 
(beam.PipelineResult, error) {
        return beam.Run(ctx, *runner, p)
 }

Reply via email to