[ https://issues.apache.org/jira/browse/BEAM-11207?focusedWorklogId=509743&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-509743 ]

ASF GitHub Bot logged work on BEAM-11207:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Nov/20 14:53
            Start Date: 10/Nov/20 14:53
    Worklog Time Spent: 10m 
      Work Description: kamilwu commented on a change in pull request #13272:
URL: https://github.com/apache/beam/pull/13272#discussion_r520623472



##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -341,3 +343,189 @@ func int64Distribution(count, sum, min, max int64) ([]byte, error) {
        }
        return buf.Bytes(), nil
 }
+
+// FromMonitoringInfos extracts metrics from GetJobMetrics's response and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed []*pipepb.MonitoringInfo) (
+       []metrics.CounterResult,
+       []metrics.DistributionResult,
+       []metrics.GaugeResult) {
+       ac, ad, ag := groupByType(attempted)
+       cc, cd, cg := groupByType(committed)
+
+       c := mergeCounters(ac, cc)
+       d := mergeDistributions(ad, cd)
+       g := mergeGauges(ag, cg)
+
+       return c, d, g
+}
+
+// IsCounter returns true if the monitoring info is a counter metric.
+func IsCounter(mi *pipepb.MonitoringInfo) bool {
+       return mi.GetType() == "beam:metrics:sum_int64:v1"
+}
+
+// IsDistribution returns true if the monitoring info is a distribution metric.
+func IsDistribution(mi *pipepb.MonitoringInfo) bool {
+       return mi.GetType() == "beam:metrics:distribution_int64:v1"
+}
+
+// IsGauge returns true if the monitoring info is a gauge metric.
+func IsGauge(mi *pipepb.MonitoringInfo) bool {
+       switch mi.GetType() {
+       case
+               "beam:metrics:latest_int64:v1",
+               "beam:metrics:top_n_int64:v1",
+               "beam:metrics:bottom_n_int64:v1":
+               return true
+       }
+       return false
+}
+
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+       map[metrics.MetricKey]int64,
+       map[metrics.MetricKey]metrics.DistributionValue,
+       map[metrics.MetricKey]metrics.GaugeValue) {
+       counters := make(map[metrics.MetricKey]int64)
+       distributions := make(map[metrics.MetricKey]metrics.DistributionValue)
+       gauges := make(map[metrics.MetricKey]metrics.GaugeValue)
+
+       for _, minfo := range minfos {
+               key, err := extractKey(minfo)
+               if err != nil {
+                       log.Println(err)
+                       continue
+               }
+
+               r := bytes.NewReader(minfo.GetPayload())
+
+               if IsCounter(minfo) {
+                       value, err := extractCounterValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       counters[key] = value
+               } else if IsDistribution(minfo) {
+                       value, err := extractDistributionValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       distributions[key] = value
+               } else if IsGauge(minfo) {
+                       value, err := extractGaugeValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       gauges[key] = value
+               } else {
+                       log.Println("unknown metric type")
+               }
+       }
+       return counters, distributions, gauges
+}
+
+func mergeCounters(
+       attempted map[metrics.MetricKey]int64,
+       committed map[metrics.MetricKey]int64) []metrics.CounterResult {
+       res := make([]metrics.CounterResult, 0)
+
+       for k := range attempted {
+               v, ok := committed[k]
+               if !ok {
+                       v = -1
+               }
+               res = append(res, metrics.CounterResult{Attempted: attempted[k], Committed: v, Key: k})
+       }
+       return res
+}
+
+func mergeDistributions(
+       attempted map[metrics.MetricKey]metrics.DistributionValue,
+       committed map[metrics.MetricKey]metrics.DistributionValue) []metrics.DistributionResult {
+       res := make([]metrics.DistributionResult, 0)
+
+       for k := range attempted {
+               v, ok := committed[k]
+               if !ok {
+                       v = metrics.DistributionValue{}
+               }
+               res = append(res, metrics.DistributionResult{Attempted: attempted[k], Committed: v, Key: k})
+       }
+       return res
+}
+
+func mergeGauges(
+       attempted map[metrics.MetricKey]metrics.GaugeValue,
+       committed map[metrics.MetricKey]metrics.GaugeValue) []metrics.GaugeResult {
+       res := make([]metrics.GaugeResult, 0)
+
+       for k := range attempted {
+               v, ok := committed[k]
+               if !ok {
+                       v = metrics.GaugeValue{}
+               }
+               res = append(res, metrics.GaugeResult{Attempted: attempted[k], Committed: v, Key: k})
+       }
+       return res
+}
+
+func extractKey(mi *pipepb.MonitoringInfo) (metrics.MetricKey, error) {
+       labels := newLabels(mi.GetLabels())
+       stepName := getStepName(labels)
+       if stepName == "" {
+               return metrics.MetricKey{}, fmt.Errorf("Failed to deduce Step from MonitoringInfo: %v", mi)
+       }
+       return metrics.MetricKey{Step: stepName, Name: labels.Name(), Namespace: labels.Namespace()}, nil
+}
+
+func extractCounterValue(reader *bytes.Reader) (int64, error) {
+       value, err := coder.DecodeVarInt(reader)
+       if err != nil {
+               return -1, err
+       }
+       return value, nil
+}
+
+func extractDistributionValue(reader *bytes.Reader) (metrics.DistributionValue, error) {
+       values, err := decodeMany(reader, 4)
+       if err != nil {
+               return metrics.DistributionValue{}, err
+       }
+       return metrics.DistributionValue{Count: values[0], Sum: values[1], Min: values[2], Max: values[3]}, nil
+}
+
+func extractGaugeValue(reader *bytes.Reader) (metrics.GaugeValue, error) {
+       values, err := decodeMany(reader, 2)
+       if err != nil {
+               return metrics.GaugeValue{}, err
+       }
+       return metrics.GaugeValue{Timestamp: time.Unix(0, values[0]*int64(time.Millisecond)), Value: values[1]}, nil
+}
+
+func newLabels(miLabels map[string]string) *metrics.Labels {
+       labels := metrics.UserLabels(
+               miLabels["PTRANSFORM"],
+               miLabels[pipepb.MonitoringInfo_NAMESPACE.String()],

Review comment:
       `pipepb.MonitoringInfo_PTRANSFORM.String()` keeps returning TRANSFORM, without the "P" prefix. I'm not sure why that is; TRANSFORM is invalid in this context, and we should use PTRANSFORM. I changed it so that only the constants are used.
   
   Just out of curiosity, why are the generated .pb.go files committed into the git repository? For example: https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/model/pipeline_v1. Is there some problem with generating those files locally?
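
For context, a minimal, hypothetical sketch of the label-key mismatch described above; this is not code from the PR, and the exact generated constant name (`MonitoringInfo_TRANSFORM`) is an assumption about the pipeline_v1 package:

```go
// Hypothetical repro of the mismatch (not part of the PR). Assumption: the
// generated enum constant stringifies to "TRANSFORM", while the MonitoringInfo
// labels map is keyed by "PTRANSFORM".
package main

import (
	"fmt"

	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
)

func main() {
	labels := map[string]string{"PTRANSFORM": "my_step"}

	key := pipepb.MonitoringInfo_TRANSFORM.String() // "TRANSFORM" (assumed)
	fmt.Printf("lookup via enum String(): %q\n", labels[key])          // "" -- misses the entry
	fmt.Printf("lookup via literal key:   %q\n", labels["PTRANSFORM"]) // "my_step"
}
```

If that behavior holds, looking the label up through the enum's String() silently returns an empty step name, which is why the raw "PTRANSFORM" key (or a single named constant for it) is needed here.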




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 509743)
    Time Spent: 1h 50m  (was: 1h 40m)

> [Go SDK] Support metrics querying
> ---------------------------------
>
>                 Key: BEAM-11207
>                 URL: https://issues.apache.org/jira/browse/BEAM-11207
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-go
>            Reporter: Kamil Wasilewski
>            Assignee: Kamil Wasilewski
>            Priority: P2
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Design doc: [https://s.apache.org/get-metrics-api]
> Go SDK does not offer a way to query a pipeline's metrics. The Go SDK needs to implement code that calls the GetJobMetrics RPC and lets users query the result through an API similar to the existing APIs in the Python and Java SDKs.
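
As a rough sketch of what that flow could look like end to end, the snippet below wires a GetJobMetrics call into the FromMonitoringInfos helper added in the PR above; the jobmanagement_v1 client usage, the endpoint, and the monitoring import path are assumptions for illustration, not part of the reviewed change:

```go
// Sketch only: query a job service for metrics and group them with
// FromMonitoringInfos. Import paths and the endpoint are assumed.
package main

import (
	"context"
	"fmt"
	"log"

	"google.golang.org/grpc"

	"github.com/apache/beam/sdks/go/pkg/beam/core/monitoring"
	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
)

func main() {
	conn, err := grpc.Dial("localhost:8099", grpc.WithInsecure()) // assumed job service endpoint
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := jobpb.NewJobServiceClient(conn)
	resp, err := client.GetJobMetrics(context.Background(),
		&jobpb.GetJobMetricsRequest{JobId: "some-job-id"}) // hypothetical job id
	if err != nil {
		log.Fatal(err)
	}

	// Split the attempted/committed MonitoringInfos into typed results.
	counters, distributions, gauges := monitoring.FromMonitoringInfos(
		resp.GetMetrics().GetAttempted(), resp.GetMetrics().GetCommitted())
	fmt.Println(len(counters), len(distributions), len(gauges))
}
```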



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
