This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new baae15a  [BEAM-11207] Metric Extraction via proto RPC API (#13272)
baae15a is described below

commit baae15a6e85033b7d32a38daf12bb03a7cc7fe74
Author: Kamil Wasilewski <kamil.wasilew...@polidea.com>
AuthorDate: Mon Nov 16 23:27:45 2020 +0100

    [BEAM-11207] Metric Extraction via proto RPC API (#13272)

    For the Go SDK.
---
 sdks/go/pkg/beam/core/metrics/metrics.go           | 118 +++++++++++++
 .../go/pkg/beam/core/runtime/harness/monitoring.go | 187 +++------------------
 .../beam/core/runtime/harness/monitoring_test.go   |  33 ++--
 sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go | 172 +++++++++++++++++++
 .../beam/core/runtime/metricsx/metricsx_test.go    | 166 ++++++++++++++++++
 sdks/go/pkg/beam/core/runtime/metricsx/urns.go     | 170 +++++++++++++++++++
 sdks/go/pkg/beam/doc_test.go                       |   2 +-
 sdks/go/pkg/beam/io/textio/sdf_test.go             |   2 +-
 sdks/go/pkg/beam/pipeline.go                       |   6 +
 sdks/go/pkg/beam/runner.go                         |   6 +-
 sdks/go/pkg/beam/runners/dataflow/dataflow.go      |  24 +--
 sdks/go/pkg/beam/runners/direct/direct.go          |  16 +-
 sdks/go/pkg/beam/runners/dot/dot.go                |  10 +-
 sdks/go/pkg/beam/runners/flink/flink.go            |   2 +-
 sdks/go/pkg/beam/runners/spark/spark.go            |   2 +-
 .../beam/runners/universal/runnerlib/execute.go    |  48 +++++-
 sdks/go/pkg/beam/runners/universal/universal.go    |  20 +--
 sdks/go/pkg/beam/runners/vet/vet.go                |   8 +-
 sdks/go/pkg/beam/testing/ptest/ptest.go            |   3 +-
 sdks/go/pkg/beam/x/beamx/run.go                    |   8 +
 20 files changed, 770 insertions(+), 233 deletions(-)

diff --git a/sdks/go/pkg/beam/core/metrics/metrics.go b/sdks/go/pkg/beam/core/metrics/metrics.go
index 5b14c4a..33b8ef2 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics.go
@@ -381,6 +381,11 @@ func (m *distribution) get() (count, sum, min, max int64) {
 	return m.count, m.sum, m.min, m.max
 }
 
+// DistributionValue is the value of a Distribution metric.
+type DistributionValue struct {
+	Count, Sum, Min, Max int64
+}
+
 // Gauge is a time, value pair metric.
 type Gauge struct {
 	name name
@@ -448,3 +453,116 @@ func (m *gauge) get() (int64, time.Time) {
 	defer m.mu.Unlock()
 	return m.v, m.t
 }
+
+// GaugeValue is the value of a Gauge metric.
+type GaugeValue struct {
+	Value     int64
+	Timestamp time.Time
+}
+
+// Results represents all metrics gathered during the job's execution.
+// It allows for querying metrics using a provided filter.
+type Results struct {
+	counters      []CounterResult
+	distributions []DistributionResult
+	gauges        []GaugeResult
+}
+
+// NewResults creates a new Results.
+func NewResults(
+	counters []CounterResult,
+	distributions []DistributionResult,
+	gauges []GaugeResult) *Results {
+	return &Results{counters, distributions, gauges}
+}
+
+// AllMetrics returns all metrics from a Results instance.
+func (mr Results) AllMetrics() QueryResults {
+	return QueryResults{mr.counters, mr.distributions, mr.gauges}
+}
+
+// TODO(BEAM-11217): Implement Query(Filter) and metrics filtering
+
+// QueryResults is the result of a query. Allows accessing all of the
+// metrics that matched the filter.
+type QueryResults struct {
+	counters      []CounterResult
+	distributions []DistributionResult
+	gauges        []GaugeResult
+}
+
+// Counters returns a slice of counter metrics.
+func (qr QueryResults) Counters() []CounterResult {
+	out := make([]CounterResult, len(qr.counters))
+	copy(out, qr.counters)
+	return out
+}
+
+// Distributions returns a slice of distribution metrics.
+func (qr QueryResults) Distributions() []DistributionResult {
+	out := make([]DistributionResult, len(qr.distributions))
+	copy(out, qr.distributions)
+	return out
+}
+
+// Gauges returns a slice of gauge metrics.
+func (qr QueryResults) Gauges() []GaugeResult {
+	out := make([]GaugeResult, len(qr.gauges))
+	copy(out, qr.gauges)
+	return out
+}
+
+// CounterResult is an attempted and a committed value of a counter metric plus
+// key.
+type CounterResult struct {
+	Attempted, Committed int64
+	Key                  StepKey
+}
+
+// Result returns committed metrics. Falls back to attempted metrics if committed
+// are not populated (e.g. due to not being supported on a given runner).
+func (r CounterResult) Result() int64 {
+	if r.Committed != 0 {
+		return r.Committed
+	}
+	return r.Attempted
+}
+
+// DistributionResult is an attempted and a committed value of a distribution
+// metric plus key.
+type DistributionResult struct {
+	Attempted, Committed DistributionValue
+	Key                  StepKey
+}
+
+// Result returns committed metrics. Falls back to attempted metrics if committed
+// are not populated (e.g. due to not being supported on a given runner).
+func (r DistributionResult) Result() DistributionValue {
+	empty := DistributionValue{}
+	if r.Committed != empty {
+		return r.Committed
+	}
+	return r.Attempted
+}
+
+// GaugeResult is an attempted and a committed value of a gauge metric plus
+// key.
+type GaugeResult struct {
+	Attempted, Committed GaugeValue
+	Key                  StepKey
+}
+
+// Result returns committed metrics. Falls back to attempted metrics if committed
+// are not populated (e.g. due to not being supported on a given runner).
+func (r GaugeResult) Result() GaugeValue {
+	empty := GaugeValue{}
+	if r.Committed != empty {
+		return r.Committed
+	}
+	return r.Attempted
+}
+
+// StepKey uniquely identifies a metric within a pipeline graph.
+type StepKey struct {
+	Step, Name, Namespace string
+}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
index 53419ce..c9ddb80 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
@@ -16,121 +16,20 @@
 package harness
 
 import (
-	"bytes"
 	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
 
-	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
-	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/metricsx"
 	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
-type mUrn uint32
-
-// TODO: Pull these from the protos.
-var sUrns = [...]string{
-	"beam:metric:user:sum_int64:v1",
-	"beam:metric:user:sum_double:v1",
-	"beam:metric:user:distribution_int64:v1",
-	"beam:metric:user:distribution_double:v1",
-	"beam:metric:user:latest_int64:v1",
-	"beam:metric:user:latest_double:v1",
-	"beam:metric:user:top_n_int64:v1",
-	"beam:metric:user:top_n_double:v1",
-	"beam:metric:user:bottom_n_int64:v1",
-	"beam:metric:user:bottom_n_double:v1",
-
-	"beam:metric:element_count:v1",
-	"beam:metric:sampled_byte_size:v1",
-
-	"beam:metric:pardo_execution_time:start_bundle_msecs:v1",
-	"beam:metric:pardo_execution_time:process_bundle_msecs:v1",
-	"beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
-	"beam:metric:ptransform_execution_time:total_msecs:v1",
-
-	"beam:metric:ptransform_progress:remaining:v1",
-	"beam:metric:ptransform_progress:completed:v1",
-	"beam:metric:data_channel:read_index:v1",
-
-	"TestingSentinelUrn", // Must remain last.
-}
-
-const (
-	urnUserSumInt64 mUrn = iota
-	urnUserSumFloat64
-	urnUserDistInt64
-	urnUserDistFloat64
-	urnUserLatestMsInt64
-	urnUserLatestMsFloat64
-	urnUserTopNInt64
-	urnUserTopNFloat64
-	urnUserBottomNInt64
-	urnUserBottomNFloat64
-
-	urnElementCount
-	urnSampledByteSize
-
-	urnStartBundle
-	urnProcessBundle
-	urnFinishBundle
-	urnTransformTotalTime
-
-	urnProgressRemaining
-	urnProgressCompleted
-	urnDataChannelReadIndex
-
-	urnTestSentinel // Must remain last.
-)
-
-// urnToType maps the urn to it's encoding type.
-// This function is written to be inlinable by the compiler.
-func urnToType(u mUrn) string {
-	switch u {
-	case urnUserSumInt64, urnElementCount, urnStartBundle, urnProcessBundle, urnFinishBundle, urnTransformTotalTime:
-		return "beam:metrics:sum_int64:v1"
-	case urnUserSumFloat64:
-		return "beam:metrics:sum_double:v1"
-	case urnUserDistInt64, urnSampledByteSize:
-		return "beam:metrics:distribution_int64:v1"
-	case urnUserDistFloat64:
-		return "beam:metrics:distribution_double:v1"
-	case urnUserLatestMsInt64:
-		return "beam:metrics:latest_int64:v1"
-	case urnUserLatestMsFloat64:
-		return "beam:metrics:latest_double:v1"
-	case urnUserTopNInt64:
-		return "beam:metrics:top_n_int64:v1"
-	case urnUserTopNFloat64:
-		return "beam:metrics:top_n_double:v1"
-	case urnUserBottomNInt64:
-		return "beam:metrics:bottom_n_int64:v1"
-	case urnUserBottomNFloat64:
-		return "beam:metrics:bottom_n_double:v1"
-
-	case urnProgressRemaining, urnProgressCompleted:
-		return "beam:metrics:progress:v1"
-	case urnDataChannelReadIndex:
-		return "beam:metrics:sum_int64:v1"
-
-	// Monitoring Table isn't currently in the protos.
-	// case ???:
-	// 	return "beam:metrics:monitoring_table:v1"
-
-	case urnTestSentinel:
-		return "TestingSentinelType"
-
-	default:
-		panic("metric urn without specified type" + sUrns[u])
-	}
-}
-
 type shortKey struct {
 	metrics.Labels
-	Urn mUrn // Urns fully specify their type.
+	Urn metricsx.Urn // Urns fully specify their type.
 }
 
 // shortIDCache retains lookup caches for short ids to the full monitoring
@@ -162,7 +61,7 @@ func (c *shortIDCache) getNextShortID() string {
 // getShortID returns the short id for the given metric, and if
 // it doesn't exist yet, stores the metadata.
 // Assumes c.mu lock is held.
-func (c *shortIDCache) getShortID(l metrics.Labels, urn mUrn) string {
+func (c *shortIDCache) getShortID(l metrics.Labels, urn metricsx.Urn) string {
 	k := shortKey{l, urn}
 	s, ok := c.labels2ShortIds[k]
 	if ok {
@@ -171,8 +70,8 @@ func (c *shortIDCache) getShortID(l metrics.Labels, urn mUrn) string {
 	s = c.getNextShortID()
 	c.labels2ShortIds[k] = s
 	c.shortIds2Infos[s] = &pipepb.MonitoringInfo{
-		Urn:    sUrns[urn],
-		Type:   urnToType(urn),
+		Urn:    metricsx.UrnToString(urn),
+		Type:   metricsx.UrnToType(urn),
 		Labels: userLabels(l),
 	}
 	return s
@@ -195,7 +94,7 @@ func init() {
 	defaultShortIDCache = newShortIDCache()
 }
 
-func getShortID(l metrics.Labels, urn mUrn) string {
+func getShortID(l metrics.Labels, urn metricsx.Urn) string {
 	return defaultShortIDCache.getShortID(l, urn)
 }
 
@@ -216,46 +115,46 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {
 	payloads := make(map[string][]byte)
 	metrics.Extractor{
 		SumInt64: func(l metrics.Labels, v int64) {
-			payload, err := int64Counter(v)
+			payload, err := metricsx.Int64Counter(v)
 			if err != nil {
 				panic(err)
 			}
-			payloads[getShortID(l, urnUserSumInt64)] = payload
+			payloads[getShortID(l, metricsx.UrnUserSumInt64)] = payload
 			monitoringInfo = append(monitoringInfo,
 				&pipepb.MonitoringInfo{
-					Urn:     sUrns[urnUserSumInt64],
-					Type:    urnToType(urnUserSumInt64),
+					Urn:     metricsx.UrnToString(metricsx.UrnUserSumInt64),
+					Type:    metricsx.UrnToType(metricsx.UrnUserSumInt64),
 					Labels:  userLabels(l),
 					Payload: payload,
 				})
 		},
 		DistributionInt64: func(l metrics.Labels, count, sum, min, max int64) {
-			payload, err := int64Distribution(count, sum, min, max)
+			payload, err := metricsx.Int64Distribution(count, sum, min, max)
 			if err != nil {
 				panic(err)
 			}
-			payloads[getShortID(l, urnUserDistInt64)] = payload
+			payloads[getShortID(l, metricsx.UrnUserDistInt64)] = payload
 			monitoringInfo = append(monitoringInfo,
 				&pipepb.MonitoringInfo{
-					Urn:     sUrns[urnUserDistInt64],
-					Type:    urnToType(urnUserDistInt64),
+					Urn:     metricsx.UrnToString(metricsx.UrnUserDistInt64),
+					Type:    metricsx.UrnToType(metricsx.UrnUserDistInt64),
 					Labels:  userLabels(l),
 					Payload: payload,
 				})
 		},
 		GaugeInt64: func(l metrics.Labels, v int64, t time.Time) {
-			payload, err := int64Latest(t, v)
+			payload, err := metricsx.Int64Latest(t, v)
 			if err != nil {
 				panic(err)
 			}
-			payloads[getShortID(l, urnUserLatestMsInt64)] = payload
+			payloads[getShortID(l, metricsx.UrnUserLatestMsInt64)] = payload
 			monitoringInfo = append(monitoringInfo,
 				&pipepb.MonitoringInfo{
-					Urn:     sUrns[urnUserLatestMsInt64],
-					Type:    urnToType(urnUserLatestMsInt64),
+					Urn:     metricsx.UrnToString(metricsx.UrnUserLatestMsInt64),
+					Type:    metricsx.UrnToType(metricsx.UrnUserLatestMsInt64),
 					Labels:  userLabels(l),
 					Payload: payload,
 				})
@@ -265,28 +164,28 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {
 
 	// Get the execution monitoring information from the bundle plan.
 	if snapshot, ok := p.Progress(); ok {
-		payload, err := int64Counter(snapshot.Count)
+		payload, err := metricsx.Int64Counter(snapshot.Count)
 		if err != nil {
 			panic(err)
 		}
 		// TODO(BEAM-9934): This metric should account for elements in multiple windows.
-		payloads[getShortID(metrics.PCollectionLabels(snapshot.PID), urnElementCount)] = payload
+		payloads[getShortID(metrics.PCollectionLabels(snapshot.PID), metricsx.UrnElementCount)] = payload
 		monitoringInfo = append(monitoringInfo,
 			&pipepb.MonitoringInfo{
-				Urn:  sUrns[urnElementCount],
-				Type: urnToType(urnElementCount),
+				Urn:  metricsx.UrnToString(metricsx.UrnElementCount),
+				Type: metricsx.UrnToType(metricsx.UrnElementCount),
 				Labels: map[string]string{
 					"PCOLLECTION": snapshot.PID,
 				},
 				Payload: payload,
 			})
 
-		payloads[getShortID(metrics.PTransformLabels(snapshot.ID), urnDataChannelReadIndex)] = payload
+		payloads[getShortID(metrics.PTransformLabels(snapshot.ID), metricsx.UrnDataChannelReadIndex)] = payload
 		monitoringInfo = append(monitoringInfo,
 			&pipepb.MonitoringInfo{
-				Urn:  sUrns[urnDataChannelReadIndex],
-				Type: urnToType(urnDataChannelReadIndex),
+				Urn:  metricsx.UrnToString(metricsx.UrnDataChannelReadIndex),
+				Type: metricsx.UrnToType(metricsx.UrnDataChannelReadIndex),
 				Labels: map[string]string{
 					"PTRANSFORM": snapshot.ID,
 				},
 				Payload: payload,
@@ -305,39 +204,3 @@ func userLabels(l metrics.Labels) map[string]string {
 		"NAME":      l.Name(),
 	}
 }
-
-func int64Counter(v int64) ([]byte, error) {
-	var buf bytes.Buffer
-	if err := coder.EncodeVarInt(v, &buf); err != nil {
-		return nil, err
-	}
-	return buf.Bytes(), nil
-}
-
-func int64Latest(t time.Time, v int64) ([]byte, error) {
-	var buf bytes.Buffer
-	if err := coder.EncodeVarInt(mtime.FromTime(t).Milliseconds(), &buf); err != nil {
-		return nil, err
-	}
-	if err := coder.EncodeVarInt(v, &buf); err != nil {
-		return nil, err
-	}
-	return buf.Bytes(), nil
-}
-
-func int64Distribution(count, sum, min, max int64) ([]byte, error) {
-	var buf bytes.Buffer
-	if err := coder.EncodeVarInt(count, &buf); err != nil {
-		return nil, err
-	}
-	if err := coder.EncodeVarInt(sum, &buf); err != nil {
-		return nil, err
-	}
-	if err := coder.EncodeVarInt(min, &buf); err != nil {
-		return nil, err
-	}
-	if err := coder.EncodeVarInt(max, &buf); err != nil {
-		return nil, err
-	}
-	return buf.Bytes(), nil
-}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring_test.go b/sdks/go/pkg/beam/core/runtime/harness/monitoring_test.go
index 456fce4..ed792dd 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring_test.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring_test.go
@@ -20,34 +20,35 @@ import (
 	"testing"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/metricsx"
 )
 
 func TestGetShortID(t *testing.T) {
 	tests := []struct {
 		id           string
-		urn          mUrn
+		urn          metricsx.Urn
 		labels       metrics.Labels
 		expectedUrn  string
 		expectedType string
 	}{
 		{
 			id:           "1",
-			urn:          urnUserDistInt64,
+			urn:          metricsx.UrnUserDistInt64,
 			expectedUrn:  "beam:metric:user:distribution_int64:v1",
 			expectedType: "beam:metrics:distribution_int64:v1",
 		}, {
 			id:           "2",
-			urn:          urnElementCount,
+			urn:          metricsx.UrnElementCount,
 			expectedUrn:  "beam:metric:element_count:v1",
 			expectedType: "beam:metrics:sum_int64:v1",
 		}, {
 			id:           "3",
-			urn:          urnProgressCompleted,
+			urn:          metricsx.UrnProgressCompleted,
 			expectedUrn:  "beam:metric:ptransform_progress:completed:v1",
 			expectedType: "beam:metrics:progress:v1",
 		}, {
 			id:           "4",
-			urn:          urnUserDistFloat64,
+			urn:          metricsx.UrnUserDistFloat64,
 			expectedUrn:  "beam:metric:user:distribution_double:v1",
 			expectedType: "beam:metrics:distribution_double:v1",
 		}, {
@@ -56,25 +57,25 @@ func TestGetShortID(t *testing.T) {
 			// in the list, or vice versa, this should fail with either
 			// an index out of range panic or a mismatch.
id: "5", - urn: urnTestSentinel, + urn: metricsx.UrnTestSentinel, expectedUrn: "TestingSentinelUrn", expectedType: "TestingSentinelType", }, { id: "6", - urn: urnFinishBundle, + urn: metricsx.UrnFinishBundle, expectedUrn: "beam:metric:pardo_execution_time:finish_bundle_msecs:v1", expectedType: "beam:metrics:sum_int64:v1", }, { // This case and the next one validates that different labels // with the same urn are in fact assigned different short ids. id: "7", - urn: urnUserSumInt64, + urn: metricsx.UrnUserSumInt64, labels: metrics.UserLabels("myT", "harness", "metricNumber7"), expectedUrn: "beam:metric:user:sum_int64:v1", expectedType: "beam:metrics:sum_int64:v1", }, { id: "8", - urn: urnUserSumInt64, + urn: metricsx.UrnUserSumInt64, labels: metrics.UserLabels("myT", "harness", "metricNumber8"), expectedUrn: "beam:metric:user:sum_int64:v1", expectedType: "beam:metrics:sum_int64:v1", @@ -84,13 +85,13 @@ func TestGetShortID(t *testing.T) { // user metrics are unique per label set, but this isn't the layer // to validate that condition. id: "9", - urn: urnUserTopNFloat64, + urn: metricsx.UrnUserTopNFloat64, labels: metrics.UserLabels("myT", "harness", "metricNumber7"), expectedUrn: "beam:metric:user:top_n_double:v1", expectedType: "beam:metrics:top_n_double:v1", }, { id: "a", - urn: urnElementCount, + urn: metricsx.UrnElementCount, labels: metrics.PCollectionLabels("myPCol"), expectedUrn: "beam:metric:element_count:v1", expectedType: "beam:metrics:sum_int64:v1", @@ -133,7 +134,7 @@ func TestGetShortID(t *testing.T) { // is initialized properly. func TestShortIdCache_Default(t *testing.T) { defaultShortIDCache.mu.Lock() - s := getShortID(metrics.UserLabels("this", "doesn't", "matter"), urnTestSentinel) + s := getShortID(metrics.UserLabels("this", "doesn't", "matter"), metricsx.UrnTestSentinel) defaultShortIDCache.mu.Unlock() info := shortIdsToInfos([]string{s})[s] @@ -148,11 +149,11 @@ func TestShortIdCache_Default(t *testing.T) { func BenchmarkGetShortID(b *testing.B) { b.Run("new", func(b *testing.B) { l := metrics.UserLabels("this", "doesn't", strconv.FormatInt(-1, 36)) - last := getShortID(l, urnTestSentinel) + last := getShortID(l, metricsx.UrnTestSentinel) for i := int64(0); i < int64(b.N); i++ { // Ensure it's allocated to the stack. l = metrics.UserLabels("this", "doesn't", strconv.FormatInt(i, 36)) - got := getShortID(l, urnTestSentinel) + got := getShortID(l, metricsx.UrnTestSentinel) if got == last { b.Fatalf("short collision: at %s", got) } @@ -162,9 +163,9 @@ func BenchmarkGetShortID(b *testing.B) { b.Run("amortized", func(b *testing.B) { l := metrics.UserLabels("this", "doesn't", "matter") c := newShortIDCache() - want := c.getShortID(l, urnTestSentinel) + want := c.getShortID(l, metricsx.UrnTestSentinel) for i := 0; i < b.N; i++ { - got := c.getShortID(l, urnTestSentinel) + got := c.getShortID(l, metricsx.UrnTestSentinel) if got != want { b.Fatalf("different short ids: got %s, want %s", got, want) } diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go new file mode 100644 index 0000000..6cd10b4 --- /dev/null +++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metricsx
+
+import (
+	"bytes"
+	"fmt"
+	"log"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+// FromMonitoringInfos extracts metrics from monitored states and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed []*pipepb.MonitoringInfo) *metrics.Results {
+	ac, ad, ag := groupByType(attempted)
+	cc, cd, cg := groupByType(committed)
+
+	return metrics.NewResults(mergeCounters(ac, cc), mergeDistributions(ad, cd), mergeGauges(ag, cg))
+}
+
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+	map[metrics.StepKey]int64,
+	map[metrics.StepKey]metrics.DistributionValue,
+	map[metrics.StepKey]metrics.GaugeValue) {
+	counters := make(map[metrics.StepKey]int64)
+	distributions := make(map[metrics.StepKey]metrics.DistributionValue)
+	gauges := make(map[metrics.StepKey]metrics.GaugeValue)
+
+	for _, minfo := range minfos {
+		key, err := extractKey(minfo)
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		r := bytes.NewReader(minfo.GetPayload())
+
+		switch minfo.GetType() {
+		case "beam:metrics:sum_int64:v1":
+			value, err := extractCounterValue(r)
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+			counters[key] = value
+		case "beam:metrics:distribution_int64:v1":
+			value, err := extractDistributionValue(r)
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+			distributions[key] = value
+		case
+			"beam:metrics:latest_int64:v1",
+			"beam:metrics:top_n_int64:v1",
+			"beam:metrics:bottom_n_int64:v1":
+			value, err := extractGaugeValue(r)
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+			gauges[key] = value
+		default:
+			log.Println("unknown metric type")
+		}
+	}
+	return counters, distributions, gauges
+}
+
+func mergeCounters(
+	attempted map[metrics.StepKey]int64,
+	committed map[metrics.StepKey]int64) []metrics.CounterResult {
+	res := make([]metrics.CounterResult, 0)
+
+	for k := range attempted {
+		v := committed[k]
+		res = append(res, metrics.CounterResult{Attempted: attempted[k], Committed: v, Key: k})
+	}
+	return res
+}
+
+func mergeDistributions(
+	attempted map[metrics.StepKey]metrics.DistributionValue,
+	committed map[metrics.StepKey]metrics.DistributionValue) []metrics.DistributionResult {
+	res := make([]metrics.DistributionResult, 0)
+
+	for k := range attempted {
+		v := committed[k]
+		res = append(res, metrics.DistributionResult{Attempted: attempted[k], Committed: v, Key: k})
+	}
+	return res
+}
+
+func mergeGauges(
+	attempted map[metrics.StepKey]metrics.GaugeValue,
+	committed map[metrics.StepKey]metrics.GaugeValue) []metrics.GaugeResult {
+	res := make([]metrics.GaugeResult, 0)
+
+	for k := range attempted {
+		v := committed[k]
+		res = append(res, metrics.GaugeResult{Attempted: attempted[k], Committed: v, Key: k})
+	}
+	return res
+}
+
+func extractKey(mi *pipepb.MonitoringInfo) (metrics.StepKey, error) {
+	labels := newLabels(mi.GetLabels())
+	stepName := labels.Transform()
+	if stepName == "" {
+		return metrics.StepKey{}, fmt.Errorf("Failed to deduce Step from MonitoringInfo: %v", mi)
+	}
+	return metrics.StepKey{Step: stepName, Name: labels.Name(), Namespace: labels.Namespace()}, nil
+}
+
+func extractCounterValue(reader *bytes.Reader) (int64, error) {
+	value, err := coder.DecodeVarInt(reader)
+	if err != nil {
+		return -1, err
+	}
+	return value, nil
+}
+
+func extractDistributionValue(reader *bytes.Reader) (metrics.DistributionValue, error) {
+	values, err := decodeMany(reader, 4)
+	if err != nil {
+		return metrics.DistributionValue{}, err
+	}
+	return metrics.DistributionValue{Count: values[0], Sum: values[1], Min: values[2], Max: values[3]}, nil
+}
+
+func extractGaugeValue(reader *bytes.Reader) (metrics.GaugeValue, error) {
+	values, err := decodeMany(reader, 2)
+	if err != nil {
+		return metrics.GaugeValue{}, err
+	}
+	return metrics.GaugeValue{Timestamp: time.Unix(0, values[0]*int64(time.Millisecond)), Value: values[1]}, nil
+}
+
+func newLabels(miLabels map[string]string) *metrics.Labels {
+	labels := metrics.UserLabels(miLabels["PTRANSFORM"], miLabels["NAMESPACE"], miLabels["NAME"])
+	return &labels
+}
+
+func decodeMany(reader *bytes.Reader, size int) ([]int64, error) {
+	var err error
+	values := make([]int64, size)
+
+	for i := 0; i < size; i++ {
+		values[i], err = coder.DecodeVarInt(reader)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return values, err
+}
diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
new file mode 100644
index 0000000..83add87
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metricsx
+
+import (
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"github.com/google/go-cmp/cmp"
+)
+
+func TestFromMonitoringInfos_Counters(t *testing.T) {
+	var value int64 = 15
+	want := metrics.CounterResult{
+		Attempted: 15,
+		Committed: 0,
+		Key: metrics.StepKey{
+			Step:      "main.customDoFn",
+			Name:      "customCounter",
+			Namespace: "customDoFn",
+		}}
+
+	payload, err := Int64Counter(value)
+	if err != nil {
+		t.Fatalf("Failed to encode Int64Counter: %v", err)
+	}
+
+	labels := map[string]string{
+		"PTRANSFORM": "main.customDoFn",
+		"NAMESPACE":  "customDoFn",
+		"NAME":       "customCounter",
+	}
+
+	mInfo := &pipepb.MonitoringInfo{
+		Urn:     UrnToString(UrnUserSumInt64),
+		Type:    UrnToType(UrnUserSumInt64),
+		Labels:  labels,
+		Payload: payload,
+	}
+
+	attempted := []*pipepb.MonitoringInfo{mInfo}
+	committed := []*pipepb.MonitoringInfo{}
+
+	got := FromMonitoringInfos(attempted, committed).AllMetrics().Counters()
+	size := len(got)
+	if size < 1 {
+		t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
+	}
+	if d := cmp.Diff(want, got[0]); d != "" {
+		t.Fatalf("Invalid counter: got: %v, want: %v, diff(-want,+got):\n %v",
+			got[0], want, d)
+	}
+}
+
+func TestFromMonitoringInfos_Distributions(t *testing.T) {
+	var count, sum, min, max int64 = 100, 5, -12, 30
+
+	want := metrics.DistributionResult{
+		Attempted: metrics.DistributionValue{
+			Count: 100,
+			Sum:   5,
+			Min:   -12,
+			Max:   30,
+		},
+		Committed: metrics.DistributionValue{},
+		Key: metrics.StepKey{
+			Step:      "main.customDoFn",
+			Name:      "customDist",
+			Namespace: "customDoFn",
+		}}
+
+	payload, err := Int64Distribution(count, sum, min, max)
+	if err != nil {
+		t.Fatalf("Failed to encode Int64Distribution: %v", err)
+	}
+
+	labels := map[string]string{
+		"PTRANSFORM": "main.customDoFn",
+		"NAMESPACE":  "customDoFn",
+		"NAME":       "customDist",
+	}
+
+	mInfo := &pipepb.MonitoringInfo{
+		Urn:     UrnToString(UrnUserDistInt64),
+		Type:    UrnToType(UrnUserDistInt64),
+		Labels:  labels,
+		Payload: payload,
+	}
+
+	attempted := []*pipepb.MonitoringInfo{mInfo}
+	committed := []*pipepb.MonitoringInfo{}
+
+	got := FromMonitoringInfos(attempted, committed).AllMetrics().Distributions()
+	size := len(got)
+	if size < 1 {
+		t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
+	}
+	if d := cmp.Diff(want, got[0]); d != "" {
+		t.Fatalf("Invalid distribution: got: %v, want: %v, diff(-want,+got):\n %v",
+			got[0], want, d)
+	}
+}
+
+func TestFromMonitoringInfos_Gauges(t *testing.T) {
+	var value int64 = 100
+	loc, _ := time.LoadLocation("Local")
+	tm := time.Date(2020, 11, 9, 17, 52, 28, 462*int(time.Millisecond), loc)
+
+	want := metrics.GaugeResult{
+		Attempted: metrics.GaugeValue{
+			Value:     100,
+			Timestamp: tm,
+		},
+		Committed: metrics.GaugeValue{},
+		Key: metrics.StepKey{
+			Step:      "main.customDoFn",
+			Name:      "customGauge",
+			Namespace: "customDoFn",
+		}}
+
+	payload, err := Int64Latest(tm, value)
+	if err != nil {
+		t.Fatalf("Failed to encode Int64Latest: %v", err)
+	}
+
+	labels := map[string]string{
+		"PTRANSFORM": "main.customDoFn",
+		"NAMESPACE":  "customDoFn",
+		"NAME":       "customGauge",
+	}
+
+	mInfo := &pipepb.MonitoringInfo{
+		Urn:     UrnToString(UrnUserLatestMsInt64),
+		Type:    UrnToType(UrnUserLatestMsInt64),
+		Labels:  labels,
+		Payload: payload,
+	}
+
+	attempted := []*pipepb.MonitoringInfo{mInfo}
+	committed := []*pipepb.MonitoringInfo{}
+
+	got := FromMonitoringInfos(attempted, committed).AllMetrics().Gauges()
+	size := len(got)
+	if size < 1 {
+ t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1) + } + if d := cmp.Diff(want, got[0]); d != "" { + t.Fatalf("Invalid gauge: got: %v, want: %v, diff(-want,+got):\n %v", + got[0], want, d) + } +} diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/urns.go b/sdks/go/pkg/beam/core/runtime/metricsx/urns.go new file mode 100644 index 0000000..67aed96 --- /dev/null +++ b/sdks/go/pkg/beam/core/runtime/metricsx/urns.go @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricsx + +import ( + "bytes" + "time" + + "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime" +) + +// Urn is an enum type for representing urns of metrics and monitored states. +type Urn uint32 + +// TODO: Pull these from the protos. +var sUrns = [...]string{ + "beam:metric:user:sum_int64:v1", + "beam:metric:user:sum_double:v1", + "beam:metric:user:distribution_int64:v1", + "beam:metric:user:distribution_double:v1", + "beam:metric:user:latest_int64:v1", + "beam:metric:user:latest_double:v1", + "beam:metric:user:top_n_int64:v1", + "beam:metric:user:top_n_double:v1", + "beam:metric:user:bottom_n_int64:v1", + "beam:metric:user:bottom_n_double:v1", + + "beam:metric:element_count:v1", + "beam:metric:sampled_byte_size:v1", + + "beam:metric:pardo_execution_time:start_bundle_msecs:v1", + "beam:metric:pardo_execution_time:process_bundle_msecs:v1", + "beam:metric:pardo_execution_time:finish_bundle_msecs:v1", + "beam:metric:ptransform_execution_time:total_msecs:v1", + + "beam:metric:ptransform_progress:remaining:v1", + "beam:metric:ptransform_progress:completed:v1", + "beam:metric:data_channel:read_index:v1", + + "TestingSentinelUrn", // Must remain last. +} + +// The supported urns of metrics and monitored states. +const ( + UrnUserSumInt64 Urn = iota + UrnUserSumFloat64 + UrnUserDistInt64 + UrnUserDistFloat64 + UrnUserLatestMsInt64 + UrnUserLatestMsFloat64 + UrnUserTopNInt64 + UrnUserTopNFloat64 + UrnUserBottomNInt64 + UrnUserBottomNFloat64 + + UrnElementCount + UrnSampledByteSize + + UrnStartBundle + UrnProcessBundle + UrnFinishBundle + UrnTransformTotalTime + + UrnProgressRemaining + UrnProgressCompleted + UrnDataChannelReadIndex + + UrnTestSentinel // Must remain last. +) + +// UrnToString returns a string representation of the urn. +func UrnToString(u Urn) string { + return sUrns[u] +} + +// UrnToType maps the urn to it's encoding type. +// This function is written to be inlinable by the compiler. 
+func UrnToType(u Urn) string {
+	switch u {
+	case UrnUserSumInt64, UrnElementCount, UrnStartBundle, UrnProcessBundle, UrnFinishBundle, UrnTransformTotalTime:
+		return "beam:metrics:sum_int64:v1"
+	case UrnUserSumFloat64:
+		return "beam:metrics:sum_double:v1"
+	case UrnUserDistInt64, UrnSampledByteSize:
+		return "beam:metrics:distribution_int64:v1"
+	case UrnUserDistFloat64:
+		return "beam:metrics:distribution_double:v1"
+	case UrnUserLatestMsInt64:
+		return "beam:metrics:latest_int64:v1"
+	case UrnUserLatestMsFloat64:
+		return "beam:metrics:latest_double:v1"
+	case UrnUserTopNInt64:
+		return "beam:metrics:top_n_int64:v1"
+	case UrnUserTopNFloat64:
+		return "beam:metrics:top_n_double:v1"
+	case UrnUserBottomNInt64:
+		return "beam:metrics:bottom_n_int64:v1"
+	case UrnUserBottomNFloat64:
+		return "beam:metrics:bottom_n_double:v1"
+
+	case UrnProgressRemaining, UrnProgressCompleted:
+		return "beam:metrics:progress:v1"
+	case UrnDataChannelReadIndex:
+		return "beam:metrics:sum_int64:v1"
+
+	// Monitoring Table isn't currently in the protos.
+	// case ???:
+	// 	return "beam:metrics:monitoring_table:v1"
+
+	case UrnTestSentinel:
+		return "TestingSentinelType"
+
+	default:
+		panic("metric urn without specified type" + sUrns[u])
+	}
+}
+
+// Int64Counter returns an encoded payload of the integer counter.
+func Int64Counter(v int64) ([]byte, error) {
+	var buf bytes.Buffer
+	if err := coder.EncodeVarInt(v, &buf); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+// Int64Latest returns an encoded payload of the latest seen integer value.
+func Int64Latest(t time.Time, v int64) ([]byte, error) {
+	var buf bytes.Buffer
+	if err := coder.EncodeVarInt(mtime.FromTime(t).Milliseconds(), &buf); err != nil {
+		return nil, err
+	}
+	if err := coder.EncodeVarInt(v, &buf); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+// Int64Distribution returns an encoded payload of the distribution of an
+// integer value.
+func Int64Distribution(count, sum, min, max int64) ([]byte, error) {
+	var buf bytes.Buffer
+	if err := coder.EncodeVarInt(count, &buf); err != nil {
+		return nil, err
+	}
+	if err := coder.EncodeVarInt(sum, &buf); err != nil {
+		return nil, err
+	}
+	if err := coder.EncodeVarInt(min, &buf); err != nil {
+		return nil, err
+	}
+	if err := coder.EncodeVarInt(max, &buf); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
diff --git a/sdks/go/pkg/beam/doc_test.go b/sdks/go/pkg/beam/doc_test.go
index 92a2b03..6c27980 100644
--- a/sdks/go/pkg/beam/doc_test.go
+++ b/sdks/go/pkg/beam/doc_test.go
@@ -74,7 +74,7 @@ func Example_gettingStarted() {
 	// pipeline can be executed by a PipelineRunner. The direct runner executes the
 	// transforms directly, sequentially, in this one process, which is useful for
 	// unit tests and simple experiments:
-	if err := direct.Execute(context.Background(), p); err != nil {
+	if _, err := direct.Execute(context.Background(), p); err != nil {
 		fmt.Printf("Pipeline failed: %v", err)
 	}
 }
diff --git a/sdks/go/pkg/beam/io/textio/sdf_test.go b/sdks/go/pkg/beam/io/textio/sdf_test.go
index 05a26fa..733d79a 100644
--- a/sdks/go/pkg/beam/io/textio/sdf_test.go
+++ b/sdks/go/pkg/beam/io/textio/sdf_test.go
@@ -33,7 +33,7 @@ func TestReadSdf(t *testing.T) {
 	lines := ReadSdf(s, f)
 	passert.Count(s, lines, "NumLines", 1)
 
-	if err := beam.Run(context.Background(), "direct", p); err != nil {
+	if _, err := beam.Run(context.Background(), "direct", p); err != nil {
 		t.Fatalf("Failed to execute job: %v", err)
 	}
 }
diff --git a/sdks/go/pkg/beam/pipeline.go b/sdks/go/pkg/beam/pipeline.go
index 0c70463..26087d4 100644
--- a/sdks/go/pkg/beam/pipeline.go
+++ b/sdks/go/pkg/beam/pipeline.go
@@ -17,6 +17,7 @@ package beam
 
 import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 )
 
 // Scope is a hierarchical grouping for composite transforms. Scopes can be
@@ -85,3 +86,8 @@ func (p *Pipeline) Build() ([]*graph.MultiEdge, []*graph.Node, error) {
 func (p *Pipeline) String() string {
 	return p.real.String()
 }
+
+// PipelineResult is the result of beamx.RunWithMetrics.
+type PipelineResult interface {
+	Metrics() metrics.Results
+}
diff --git a/sdks/go/pkg/beam/runner.go b/sdks/go/pkg/beam/runner.go
index 28563bd..1b6b41f 100644
--- a/sdks/go/pkg/beam/runner.go
+++ b/sdks/go/pkg/beam/runner.go
@@ -27,12 +27,12 @@ import (
 // verification, but require that it is stored in Init and used for Run.
 
 var (
-	runners = make(map[string]func(ctx context.Context, p *Pipeline) error)
+	runners = make(map[string]func(ctx context.Context, p *Pipeline) (PipelineResult, error))
 )
 
 // RegisterRunner associates the name with the supplied runner, making it available
 // to execute a pipeline via Run.
-func RegisterRunner(name string, fn func(ctx context.Context, p *Pipeline) error) {
+func RegisterRunner(name string, fn func(ctx context.Context, p *Pipeline) (PipelineResult, error)) {
 	if _, ok := runners[name]; ok {
 		panic(fmt.Sprintf("runner %v already defined", name))
 	}
@@ -42,7 +42,7 @@ func RegisterRunner(name string, fn func(ctx context.Context, p *Pipeline) error
 // Run executes the pipeline using the selected registered runner. It is customary
 // to define a "runner" with no default as a flag to let users control runner
 // selection.
-func Run(ctx context.Context, runner string, p *Pipeline) error {
+func Run(ctx context.Context, runner string, p *Pipeline) (PipelineResult, error) {
 	fn, ok := runners[runner]
 	if !ok {
 		log.Exitf(ctx, "Runner %v not registered. Forgot to _ import it?", runner)
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index 09b62c6..96ecde0 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -84,24 +84,24 @@ var unique int32
 
 // Execute runs the given pipeline on Google Cloud Dataflow. It uses the
 // default application credentials to submit the job.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) {
 	// (1) Gather job options
 	project := *gcpopts.Project
 	if project == "" {
-		return errors.New("no Google Cloud project specified. Use --project=<project>")
+		return nil, errors.New("no Google Cloud project specified. Use --project=<project>")
 	}
 	region := gcpopts.GetRegion(ctx)
 	if region == "" {
-		return errors.New("No Google Cloud region specified. Use --region=<region>. See https://cloud.google.com/dataflow/docs/concepts/regional-endpoints")
+		return nil, errors.New("No Google Cloud region specified. Use --region=<region>. See https://cloud.google.com/dataflow/docs/concepts/regional-endpoints")
 	}
 	if *stagingLocation == "" {
-		return errors.New("no GCS staging location specified. Use --staging_location=gs://<bucket>/<path>")
+		return nil, errors.New("no GCS staging location specified. Use --staging_location=gs://<bucket>/<path>")
 	}
 	var jobLabels map[string]string
 	if *labels != "" {
 		if err := json.Unmarshal([]byte(*labels), &jobLabels); err != nil {
-			return errors.Wrapf(err, "error reading --label flag as JSON")
+			return nil, errors.Wrapf(err, "error reading --label flag as JSON")
 		}
 	}
@@ -120,7 +120,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 	}
 	if *autoscalingAlgorithm != "" {
 		if *autoscalingAlgorithm != "NONE" && *autoscalingAlgorithm != "THROUGHPUT_BASED" {
-			return errors.New("invalid autoscaling algorithm. Use --autoscaling_algorithm=(NONE|THROUGHPUT_BASED)")
+			return nil, errors.New("invalid autoscaling algorithm. Use --autoscaling_algorithm=(NONE|THROUGHPUT_BASED)")
 		}
 	}
@@ -173,15 +173,15 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
 	edges, _, err := p.Build()
 	if err != nil {
-		return err
+		return nil, err
 	}
 	enviroment, err := graphx.CreateEnvironment(ctx, jobopts.GetEnvironmentUrn(ctx), getContainerImage)
 	if err != nil {
-		return errors.WithContext(err, "generating model pipeline")
+		return nil, errors.WithContext(err, "generating model pipeline")
 	}
 	model, err := graphx.Marshal(edges, &graphx.Options{Environment: enviroment})
 	if err != nil {
-		return errors.WithContext(err, "generating model pipeline")
+		return nil, errors.WithContext(err, "generating model pipeline")
 	}
 
 	// NOTE(herohde) 10/8/2018: the last segment of the names must be "worker" and "dataflow-worker.jar".
@@ -197,14 +197,14 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 		log.Info(ctx, proto.MarshalTextString(model))
 		job, err := dataflowlib.Translate(ctx, model, opts, workerURL, jarURL, modelURL)
 		if err != nil {
-			return err
+			return nil, err
 		}
 		dataflowlib.PrintJob(ctx, job)
-		return nil
+		return nil, nil
 	}
 
 	_, err = dataflowlib.Execute(ctx, model, opts, workerURL, jarURL, modelURL, *endpoint, *executeAsync)
-	return err
+	return nil, err
 }
 
 func gcsRecorderHook(opts []string) perf.CaptureHook {
 	bucket, prefix, err := gcsx.ParseObject(opts[0])
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go b/sdks/go/pkg/beam/runners/direct/direct.go
index a2c3807..5f07135 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -37,7 +37,7 @@ func init() {
 }
 
 // Execute runs the pipeline in-process.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) {
 	log.Info(ctx, "Executing pipeline with the direct runner.")
 
 	if !beam.Initialized() {
@@ -49,33 +49,33 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
 	if *jobopts.Strict {
 		log.Info(ctx, "Strict mode enabled, applying additional validation.")
-		if err := vet.Execute(ctx, p); err != nil {
-			return errors.Wrap(err, "strictness check failed")
+		if _, err := vet.Execute(ctx, p); err != nil {
+			return nil, errors.Wrap(err, "strictness check failed")
 		}
 		log.Info(ctx, "Strict mode validation passed.")
 	}
 
 	edges, _, err := p.Build()
 	if err != nil {
-		return errors.Wrap(err, "invalid pipeline")
+		return nil, errors.Wrap(err, "invalid pipeline")
 	}
 	plan, err := Compile(edges)
 	if err != nil {
-		return errors.Wrap(err, "translation failed")
+		return nil, errors.Wrap(err, "translation failed")
 	}
 	log.Info(ctx, plan)
 
 	if err = plan.Execute(ctx, "", exec.DataContext{}); err != nil {
 		plan.Down(ctx) // ignore any teardown errors
-		return err
+		return nil, err
 	}
 	if err = plan.Down(ctx); err != nil {
-		return err
+		return nil, err
 	}
 	// TODO(lostluck) 2020/01/24: What's the right way to expose the
 	// metrics store for the direct runner?
 	metrics.DumpToLogFromStore(ctx, plan.Store())
-	return nil
+	return nil, nil
 }
 
 // Compile translates a pipeline to a multi-bundle execution plan.
diff --git a/sdks/go/pkg/beam/runners/dot/dot.go b/sdks/go/pkg/beam/runners/dot/dot.go
index 547aa10..9a95ece 100644
--- a/sdks/go/pkg/beam/runners/dot/dot.go
+++ b/sdks/go/pkg/beam/runners/dot/dot.go
@@ -37,19 +37,19 @@ func init() {
 var dotFile = flag.String("dot_file", "", "DOT output file to create")
 
 // Execute produces a DOT representation of the pipeline.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) {
 	if *dotFile == "" {
-		return errors.New("must supply dot_file argument")
+		return nil, errors.New("must supply dot_file argument")
 	}
 
 	edges, nodes, err := p.Build()
 	if err != nil {
-		return errors.New("can't get data to render")
+		return nil, errors.New("can't get data to render")
 	}
 
 	var buf bytes.Buffer
 	if err := dotlib.Render(edges, nodes, &buf); err != nil {
-		return err
+		return nil, err
 	}
-	return ioutil.WriteFile(*dotFile, buf.Bytes(), 0644)
+	return nil, ioutil.WriteFile(*dotFile, buf.Bytes(), 0644)
 }
diff --git a/sdks/go/pkg/beam/runners/flink/flink.go b/sdks/go/pkg/beam/runners/flink/flink.go
index 0384b98..8dd0157 100644
--- a/sdks/go/pkg/beam/runners/flink/flink.go
+++ b/sdks/go/pkg/beam/runners/flink/flink.go
@@ -29,6 +29,6 @@ func init() {
 
 // Execute runs the given pipeline on Flink. Convenience wrapper over the
 // universal runner.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) {
 	return universal.Execute(ctx, p)
 }
diff --git a/sdks/go/pkg/beam/runners/spark/spark.go b/sdks/go/pkg/beam/runners/spark/spark.go
index c216c59..5d67200 100644
--- a/sdks/go/pkg/beam/runners/spark/spark.go
+++ b/sdks/go/pkg/beam/runners/spark/spark.go
@@ -29,6 +29,6 @@ func init() {
 
 // Execute runs the given pipeline on Spark. Convenience wrapper over the
 // universal runner.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) {
 	return universal.Execute(ctx, p)
 }
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
index dbb3387..38d7a75 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
@@ -20,6 +20,8 @@ import (
 	"os"
 	"time"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/metricsx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
@@ -29,19 +31,20 @@ import (
 
 // Execute executes a pipeline on the universal runner serving the given endpoint.
 // Convenience function.
-func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobOptions, async bool) (string, error) {
+func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobOptions, async bool) (*universalPipelineResult, error) {
 	// (1) Prepare job to obtain artifact staging instructions.
+	presult := &universalPipelineResult{JobID: ""}
+
 	cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
 	if err != nil {
-		return "", errors.WithContextf(err, "connecting to job service")
+		return presult, errors.WithContextf(err, "connecting to job service")
 	}
 	defer cc.Close()
 	client := jobpb.NewJobServiceClient(cc)
 
 	prepID, artifactEndpoint, st, err := Prepare(ctx, client, p, opt)
 	if err != nil {
-		return "", err
+		return presult, err
 	}
 
 	log.Infof(ctx, "Prepared job with id: %v and staging token: %v", prepID, st)
@@ -58,7 +61,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO
 		worker, err := BuildTempWorkerBinary(ctx)
 		if err != nil {
-			return "", err
+			return presult, err
 		}
 		defer os.Remove(worker)
@@ -70,7 +73,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO
 
 	token, err := Stage(ctx, prepID, artifactEndpoint, bin, st)
 	if err != nil {
-		return "", err
+		return presult, err
 	}
 
 	log.Infof(ctx, "Staged binary artifact with token: %v", token)
@@ -79,7 +82,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO
 
 	jobID, err := Submit(ctx, client, prepID, token)
 	if err != nil {
-		return "", err
+		return presult, err
 	}
 
 	log.Infof(ctx, "Submitted job: %v", jobID)
@@ -87,7 +90,36 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO
 	// (4) Wait for completion.
 	if async {
-		return jobID, nil
+		return presult, nil
 	}
 
-	return jobID, WaitForCompletion(ctx, client, jobID)
+	err = WaitForCompletion(ctx, client, jobID)
+
+	res, err := newUniversalPipelineResult(ctx, jobID, client)
+	if err != nil {
+		return presult, err
+	}
+	presult = res
+
+	return presult, err
+}
+
+type universalPipelineResult struct {
+	JobID   string
+	metrics *metrics.Results
+}
+
+func newUniversalPipelineResult(ctx context.Context, jobID string, client jobpb.JobServiceClient) (*universalPipelineResult, error) {
+	request := &jobpb.GetJobMetricsRequest{JobId: jobID}
+	response, err := client.GetJobMetrics(ctx, request)
+	if err != nil {
+		return &universalPipelineResult{jobID, nil}, errors.Wrap(err, "failed to get metrics")
+	}
+
+	monitoredStates := response.GetMetrics()
+	metrics := metricsx.FromMonitoringInfos(monitoredStates.Attempted, monitoredStates.Committed)
+	return &universalPipelineResult{jobID, metrics}, nil
+}
+
+func (pr universalPipelineResult) Metrics() metrics.Results {
+	return *pr.metrics
+}
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go
index de5378c..3ad1a18 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -42,27 +42,27 @@ func init() {
 }
 
 // Execute executes the pipeline on a universal beam runner.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) {
 	if !beam.Initialized() {
 		panic(fmt.Sprint("Beam has not been initialized. Call beam.Init() before pipeline construction."))
 	}
 
 	if *jobopts.Strict {
 		log.Info(ctx, "Strict mode enabled, applying additional validation.")
-		if err := vet.Execute(ctx, p); err != nil {
-			return errors.Wrap(err, "strictness check failed")
+		if _, err := vet.Execute(ctx, p); err != nil {
+			return nil, errors.Wrap(err, "strictness check failed")
 		}
 		log.Info(ctx, "Strict mode validation passed.")
 	}
 
 	endpoint, err := jobopts.GetEndpoint()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	edges, _, err := p.Build()
 	if err != nil {
-		return err
+		return nil, err
 	}
 	envUrn := jobopts.GetEnvironmentUrn(ctx)
 	getEnvCfg := jobopts.GetEnvironmentConfig
 
@@ -71,7 +71,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 		// TODO(BEAM-10610): Allow user configuration of this port, rather than kernel selected.
 		srv, err := extworker.StartLoopback(ctx, 0)
 		if err != nil {
-			return err
+			return nil, err
 		}
 		defer srv.Stop(ctx)
 		getEnvCfg = srv.EnvironmentConfig
@@ -79,11 +79,11 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
 	enviroment, err := graphx.CreateEnvironment(ctx, envUrn, getEnvCfg)
 	if err != nil {
-		return errors.WithContextf(err, "generating model pipeline")
+		return nil, errors.WithContextf(err, "generating model pipeline")
 	}
 	pipeline, err := graphx.Marshal(edges, &graphx.Options{Environment: enviroment})
 	if err != nil {
-		return errors.WithContextf(err, "generating model pipeline")
+		return nil, errors.WithContextf(err, "generating model pipeline")
 	}
 
 	// Fetch all dependencies for cross-language transforms
@@ -97,6 +97,6 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 		Worker:       *jobopts.WorkerBinary,
 		RetainDocker: *jobopts.RetainDockerContainers,
 	}
-	_, err = runnerlib.Execute(ctx, pipeline, endpoint, opt, *jobopts.Async)
-	return err
+	presult, err := runnerlib.Execute(ctx, pipeline, endpoint, opt, *jobopts.Async)
+	return presult, err
 }
diff --git a/sdks/go/pkg/beam/runners/vet/vet.go b/sdks/go/pkg/beam/runners/vet/vet.go
index 4ce0e2f..268852a 100644
--- a/sdks/go/pkg/beam/runners/vet/vet.go
+++ b/sdks/go/pkg/beam/runners/vet/vet.go
@@ -58,10 +58,10 @@ func (p disabledResolver) Sym2Addr(name string) (uintptr, error) {
 }
 
 // Execute evaluates the pipeline on whether it can run without reflection.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) {
 	e, err := Evaluate(ctx, p)
 	if err != nil {
-		return errors.WithContext(err, "validating pipeline with vet runner")
+		return nil, errors.WithContext(err, "validating pipeline with vet runner")
 	}
 	if !e.Performant() {
 		e.summary()
@@ -69,10 +69,10 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 		e.diag("*/\n")
 		err := errors.Errorf("pipeline is not performant, see diagnostic summary:\n%s\n%s", string(e.d.Bytes()), string(e.Bytes()))
 		err = errors.WithContext(err, "validating pipeline with vet runner")
-		return errors.SetTopLevelMsg(err, "pipeline is not performant")
+		return nil, errors.SetTopLevelMsg(err, "pipeline is not performant")
 	}
 	// Pipeline has no further tasks.
-	return nil
+	return nil, nil
 }
 
 // Evaluate returns an object that can generate necessary shims and inits.
diff --git a/sdks/go/pkg/beam/testing/ptest/ptest.go b/sdks/go/pkg/beam/testing/ptest/ptest.go
index fe23f6c..602303a 100644
--- a/sdks/go/pkg/beam/testing/ptest/ptest.go
+++ b/sdks/go/pkg/beam/testing/ptest/ptest.go
@@ -73,7 +73,8 @@ func Run(p *beam.Pipeline) error {
 	if *Runner == "" {
 		*Runner = defaultRunner
 	}
-	return beam.Run(context.Background(), *Runner, p)
+	_, err := beam.Run(context.Background(), *Runner, p)
+	return err
 }
 
 // Main is an implementation of testing's TestMain to permit testing
diff --git a/sdks/go/pkg/beam/x/beamx/run.go b/sdks/go/pkg/beam/x/beamx/run.go
index 9a1d27a..77d766b 100644
--- a/sdks/go/pkg/beam/x/beamx/run.go
+++ b/sdks/go/pkg/beam/x/beamx/run.go
@@ -40,5 +40,13 @@ var runner = flag.String("runner", "direct", "Pipeline runner.")
 // defaults to the direct runner, but all beam-distributed runners and textio
 // filesystems are implicitly registered.
 func Run(ctx context.Context, p *beam.Pipeline) error {
+	_, err := beam.Run(ctx, *runner, p)
+	return err
+}
+
+// RunWithMetrics invokes beam.Run with the runner supplied by the
+// flag "runner". Returns a beam.PipelineResult object, which can be
+// accessed to query the pipeline's metrics.
+func RunWithMetrics(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) {
 	return beam.Run(ctx, *runner, p)
 }
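
For readers following this change, here is a minimal sketch (not part of the commit) of how the new user-facing API fits together. It assumes a pipeline p that increments a user counter somewhere in a DoFn; note that only the universal runner path above populates a beam.PipelineResult, while the direct and dataflow runners currently return a nil result, so the result is guarded against nil:

    package main

    import (
    	"context"
    	"fmt"

    	"github.com/apache/beam/sdks/go/pkg/beam"
    	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
    )

    // runAndReport runs p and prints any user counters the runner reported.
    func runAndReport(ctx context.Context, p *beam.Pipeline) error {
    	// RunWithMetrics is the new beamx entry point; beamx.Run keeps its old signature.
    	res, err := beamx.RunWithMetrics(ctx, p)
    	if err != nil {
    		return err
    	}
    	// Runners without metrics support return a nil PipelineResult.
    	if res == nil {
    		return nil
    	}
    	for _, c := range res.Metrics().AllMetrics().Counters() {
    		// Result() prefers the committed value and falls back to attempted.
    		fmt.Printf("%s/%s in step %s: %d\n", c.Key.Namespace, c.Key.Name, c.Key.Step, c.Result())
    	}
    	return nil
    }

    func main() {
    	beam.Init()
    	p := beam.NewPipeline()
    	// ... construct the pipeline here, e.g. a ParDo that increments a counter ...
    	if err := runAndReport(context.Background(), p); err != nil {
    		fmt.Println(err)
    	}
    }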
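The payload encoding/decoding round trip added in metricsx can also be exercised directly, mirroring the new unit tests above; the PTRANSFORM/NAMESPACE/NAME label values here are placeholders:

    package main

    import (
    	"fmt"

    	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/metricsx"
    	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
    )

    func main() {
    	// Encode a distribution payload (count, sum, min, max) as four varints.
    	payload, err := metricsx.Int64Distribution(2, 7, 3, 4)
    	if err != nil {
    		panic(err)
    	}
    	mi := &pipepb.MonitoringInfo{
    		Urn:  metricsx.UrnToString(metricsx.UrnUserDistInt64),
    		Type: metricsx.UrnToType(metricsx.UrnUserDistInt64),
    		Labels: map[string]string{
    			"PTRANSFORM": "main.myDoFn",
    			"NAMESPACE":  "myDoFn",
    			"NAME":       "myDist",
    		},
    		Payload: payload,
    	}
    	// Attempted metrics only; committed is left empty, as in the tests above.
    	res := metricsx.FromMonitoringInfos([]*pipepb.MonitoringInfo{mi}, nil)
    	for _, d := range res.AllMetrics().Distributions() {
    		fmt.Printf("%+v -> %+v\n", d.Key, d.Result())
    	}
    }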
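Finally, since this commit changes the registered-runner contract, any out-of-tree runner must adopt the new signature. A hypothetical skeleton (the package and runner name "myrunner" are invented for illustration):

    package myrunner

    import (
    	"context"

    	"github.com/apache/beam/sdks/go/pkg/beam"
    )

    func init() {
    	// Runners now register a func returning (beam.PipelineResult, error).
    	beam.RegisterRunner("myrunner", Execute)
    }

    // Execute satisfies the new contract. Returning a nil PipelineResult is
    // valid for runners that do not surface metrics, as direct and dot do above.
    func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) {
    	edges, _, err := p.Build()
    	if err != nil {
    		return nil, err
    	}
    	_ = edges // translate the edges and execute them here
    	return nil, nil
    }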