This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cd76f28 move MonitoringInfo protos to model/pipeline module
new 7206b9b Merge pull request #7938: [BEAM-4775] move MonitoringInfo
protos to model/pipeline module
cd76f28 is described below
commit cd76f28611670653c917e406306ce8870df0a073
Author: Ryan Williams <[email protected]>
AuthorDate: Sat Feb 23 06:33:34 2019 +0000
move MonitoringInfo protos to model/pipeline module
---
.../fn-execution/src/main/proto/beam_fn_api.proto | 296 +------------------
.../src/main/proto/beam_job_api.proto | 1 -
model/pipeline/src/main/proto/metrics.proto | 324 +++++++++++++++++++++
.../beam/runners/core/metrics/MetricUrns.java | 2 +-
.../runners/core/metrics/MetricsContainerImpl.java | 2 +-
.../core/metrics/MetricsContainerStepMap.java | 2 +-
.../core/metrics/SimpleMonitoringInfoBuilder.java | 22 +-
.../runners/core/metrics/SimpleStateRegistry.java | 2 +-
.../core/metrics/SpecMonitoringInfoValidator.java | 12 +-
.../core/metrics/MetricsContainerImplTest.java | 2 +-
.../core/metrics/MetricsContainerStepMapTest.java | 2 +-
.../core/metrics/MonitoringInfoMatchers.java | 2 +-
.../core/metrics/MonitoringInfoTestUtil.java | 2 +-
.../metrics/SimpleMonitoringInfoBuilderTest.java | 2 +-
.../core/metrics/SimpleStateRegistryTest.java | 2 +-
.../metrics/SpecMonitoringInfoValidatorTest.java | 2 +-
.../flink/metrics/FlinkMetricContainer.java | 12 +-
.../flink/metrics/FlinkMetricContainerTest.java | 20 +-
.../worker/fn/control/BeamFnMapTaskExecutor.java | 2 +-
...piMonitoringInfoToCounterUpdateTransformer.java | 2 +-
...ecMonitoringInfoToCounterUpdateTransformer.java | 2 +-
.../MonitoringInfoToCounterUpdateTransformer.java | 2 +-
.../control/RegisterAndProcessBundleOperation.java | 2 +-
.../dataflow/worker/fn/control/TimerReceiver.java | 8 +-
...erMonitoringInfoToCounterUpdateTransformer.java | 13 +-
.../fn/control/BeamFnMapTaskExecutorTest.java | 95 +++---
...nitoringInfoToCounterUpdateTransformerTest.java | 2 +-
...nitoringInfoToCounterUpdateTransformerTest.java | 2 +-
...nitoringInfoToCounterUpdateTransformerTest.java | 2 +-
.../fnexecution/control/RemoteExecutionTest.java | 2 +-
.../fn/harness/control/ProcessBundleHandler.java | 2 +-
.../harness/data/PCollectionConsumerRegistry.java | 2 +-
.../harness/data/PTransformFunctionRegistry.java | 2 +-
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 2 +-
.../data/ElementCountFnDataReceiverTest.java | 2 +-
sdks/python/apache_beam/metrics/cells.py | 15 +-
.../python/apache_beam/metrics/monitoring_infos.py | 6 +-
.../python/apache_beam/options/pipeline_options.py | 1 +
sdks/python/apache_beam/portability/common_urns.py | 6 +-
39 files changed, 452 insertions(+), 429 deletions(-)
diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto
b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 7ab693f..e96a8cc 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -43,6 +43,7 @@ import "endpoints.proto";
import "google/protobuf/descriptor.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
+import "metrics.proto";
/*
* Constructs that define the pipeline shape.
@@ -254,7 +255,7 @@ message BundleApplication {
// will use consume when providing a UI or for making scaling and performance
// decisions. See https://s.apache.org/beam-bundles-backlog-splitting for
// details about what types of signals may be useful to report.
- repeated MonitoringInfo monitoring_infos = 7;
+ repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos =
7;
}
// An Application should be scheduled for execution after a delay.
@@ -293,7 +294,7 @@ message ProcessBundleResponse {
// (Required) The list of metrics or other MonitoredState
// collected while processing this bundle.
- repeated MonitoringInfo monitoring_infos = 3;
+ repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos =
3;
// (Optional) Specifies that the runner must callback to this worker
// once the output of the bundle is committed. The Runner must send a
@@ -311,295 +312,6 @@ message ProcessBundleProgressRequest {
string instruction_reference = 1;
}
-// A specification containing required set of fields and labels required
-// to be set on a MonitoringInfo for the specific URN for SDK->RunnerHarness
-// ProcessBundleResponse reporting.
-message MonitoringInfoSpec {
- string urn = 1;
- string type_urn = 2;
- // The list of required
- repeated string required_labels = 3;
- // Extra non functional parts of the spec for descriptive purposes.
- // i.e. description, units, etc.
- repeated Annotation annotations = 4;
-}
-
-// The key name and value string of MonitoringInfo annotations.
-message Annotation {
- string key = 1;
- string value = 2;
-}
-
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
-message MonitoringInfoSpecs {
- enum Enum {
- // TODO(ajamato): Add the PTRANSFORM name as a required label after
- // upgrading the python SDK.
- USER_COUNTER = 0 [(monitoring_info_spec) = {
- urn: "beam:metric:user:",
- type_urn: "beam:metrics:sum_int_64",
- }];
-
- ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
- urn: "beam:metric:element_count:v1",
- type_urn: "beam:metrics:sum_int_64",
- required_labels: [ "PCOLLECTION" ],
- annotations: [ {
- key: "description",
- value: "The total elements output to a Pcollection by a PTransform."
- } ]
- }];
-
- START_BUNDLE_MSECS = 2 [(monitoring_info_spec) = {
- urn: "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
- type_urn: "beam:metrics:sum_int_64",
- required_labels: [ "PTRANSFORM" ],
- annotations: [ {
- key: "description",
- value: "The total estimated execution time of the start bundle"
- "function in a pardo"
- } ]
- }];
-
- PROCESS_BUNDLE_MSECS = 3 [(monitoring_info_spec) = {
- urn: "beam:metric:pardo_execution_time:process_bundle_msecs:v1",
- type_urn: "beam:metrics:sum_int_64",
- required_labels: [ "PTRANSFORM" ],
- annotations: [ {
- key: "description",
- value: "The total estimated execution time of the process bundle"
- "function in a pardo"
- } ]
- }];
-
- FINISH_BUNDLE_MSECS = 4 [(monitoring_info_spec) = {
- urn: "beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
- type_urn: "beam:metrics:sum_int_64",
- required_labels: [ "PTRANSFORM" ],
- annotations: [ {
- key: "description",
- value: "The total estimated execution time of the finish bundle "
- "function in a pardo"
- } ]
- }];
-
- TOTAL_MSECS = 5 [(monitoring_info_spec) = {
- urn: "beam:metric:ptransform_execution_time:total_msecs:v1",
- type_urn: "beam:metrics:sum_int_64",
- required_labels: [ "PTRANSFORM" ],
- annotations: [ {
- key: "description",
- value: "The total estimated execution time of the ptransform"
- } ]
- }];
- }
-}
-
-// A set of properties for the MonitoringInfoLabel, this is useful to obtain
-// the proper label string for the MonitoringInfoLabel.
-message MonitoringInfoLabelProps {
- // The label key to use in the MonitoringInfo labels map.
- string name = 1;
-}
-
-// Enum extension to store MonitoringInfo related
-// specifications, constants, etc.
-extend google.protobuf.EnumValueOptions {
- MonitoringInfoLabelProps label_props = 127337796; // From: commit 0x7970544.
-
- // Enum extension to store the MonitoringInfoSpecs.
- MonitoringInfoSpec monitoring_info_spec = 207174266;
-}
-
-message MonitoringInfo {
- // The name defining the metric or monitored state.
- string urn = 1;
-
- // This is specified as a URN that implies:
- // A message class: (Distribution, Counter, Extrema, MonitoringDataTable).
- // Sub types like field formats - int64, double, string.
- // Aggregation methods - SUM, LATEST, TOP-N, BOTTOM-N, DISTRIBUTION
- // valid values are:
- // beam:metrics:[sum_int_64|latest_int_64|top_n_int_64|bottom_n_int_64|
- // sum_double|latest_double|top_n_double|bottom_n_double|
- // distribution_int_64|distribution_double|monitoring_data_table
- string type = 2;
-
- // The Metric or monitored state.
- oneof data {
- MonitoringTableData monitoring_table_data = 3;
- Metric metric = 4;
- }
-
- enum MonitoringInfoLabels {
- // TODO(ajamato): Rename all references to TRANSFORM to PTRANSFORM
- TRANSFORM = 0 [(label_props) = { name: "PTRANSFORM" }];
- PCOLLECTION = 1 [(label_props) = { name: "PCOLLECTION" }];
- WINDOWING_STRATEGY = 2 [(label_props) = { name: "WINDOWING_STRATEGY" }];
- CODER = 3 [(label_props) = { name: "CODER" }];
- ENVIRONMENT = 4 [(label_props) = { name: "ENVIRONMENT" }];
- }
- // A set of key+value labels which define the scope of the metric.
- // Either a well defined entity id for matching the enum names in
- // the MonitoringInfoLabels enum or any arbitrary label
- // set by a custom metric or user metric.
- // A monitoring system is expected to be able to aggregate the metrics
- // together for all updates having the same URN and labels. Some systems such
- // as Stackdriver will be able to aggregate the metrics using a subset of the
- // provided labels
- map<string, string> labels = 5;
-
- // The walltime of the most recent update.
- // Useful for aggregation for latest types such as LatestInt64.
- google.protobuf.Timestamp timestamp = 6;
-}
-
-message MonitoringInfoUrns {
- enum Enum {
- // User counter have this format: 'beam:metric:user:<namespace>:<name>'.
- USER_COUNTER_URN_PREFIX = 0
- [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:metric:user:"];
-
- ELEMENT_COUNT = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
- "beam:metric:element_count:v1"];
-
- START_BUNDLE_MSECS = 2
- [(org.apache.beam.model.pipeline.v1.beam_urn) =
- "beam:metric:pardo_execution_time:start_bundle_msecs:v1"];
-
- PROCESS_BUNDLE_MSECS = 3
- [(org.apache.beam.model.pipeline.v1.beam_urn) =
- "beam:metric:pardo_execution_time:process_bundle_msecs:v1"];
-
- FINISH_BUNDLE_MSECS = 4
- [(org.apache.beam.model.pipeline.v1.beam_urn) =
- "beam:metric:pardo_execution_time:finish_bundle_msecs:v1"];
-
- TOTAL_MSECS = 5
- [(org.apache.beam.model.pipeline.v1.beam_urn) =
- "beam:metric:ptransform_execution_time:total_msecs:v1"];
- }
-}
-
-message MonitoringInfoTypeUrns {
- enum Enum {
- SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
- "beam:metrics:sum_int_64"];
-
- DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
- "beam:metrics:distribution_int_64"];
-
- LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
- "beam:metrics:latest_int_64"];
- }
-}
-
-message Metric {
- // (Required) The data for this metric.
- oneof data {
- CounterData counter_data = 1;
- DistributionData distribution_data = 2;
- ExtremaData extrema_data = 3;
- }
-}
-
-// Data associated with a Counter or Gauge metric.
-// This is designed to be compatible with metric collection
-// systems such as DropWizard.
-message CounterData {
- oneof value {
- int64 int64_value = 1;
- double double_value = 2;
- string string_value = 3;
- }
-}
-
-// Extrema messages are used for calculating
-// Top-N/Bottom-N metrics.
-message ExtremaData {
- oneof extrema {
- IntExtremaData int_extrema_data = 1;
- DoubleExtremaData double_extrema_data = 2;
- }
-}
-
-message IntExtremaData {
- repeated int64 int_values = 1;
-}
-
-message DoubleExtremaData {
- repeated double double_values = 2;
-}
-
-// Data associated with a distribution metric.
-// This is based off of the current DistributionData metric.
-// This is not a stackdriver or dropwizard compatible
-// style of distribution metric.
-message DistributionData {
- oneof distribution {
- IntDistributionData int_distribution_data = 1;
- DoubleDistributionData double_distribution_data = 2;
- }
-}
-
-message IntDistributionData {
- int64 count = 1;
- int64 sum = 2;
- int64 min = 3;
- int64 max = 4;
-}
-
-message DoubleDistributionData {
- int64 count = 1;
- double sum = 2;
- double min = 3;
- double max = 4;
-}
-
-// General MonitoredState information which contains
-// structured information which does not fit into a typical
-// metric format. For example, a table of important files
-// and metadata which an I/O source is reading.
-// Note: Since MonitoredState is designed to be
-// customizable, and allow engines to aggregate these
-// metrics in custom ways.
-// Engines can use custom aggregation functions for specific URNs
-// by inspecting the column values.
-// An SDK should always report its current state, that is all
-// relevant MonitoredState for its PTransform at the current moment
-// and this should be kept small.
-// For example, an SDK can emit the oldest three files which
-// have been waiting for data for over 1 hour.
-// If an engine supports the URN with a custom aggregation then
-// it can filter these and keep only the Top-3 rows based on
-// how long the files have been waiting for data.
-// Otherwise an engine can ignore the MonitoringTableData
-// or union all the rows together into one large table and display
-// them in a UI.
-message MonitoringTableData {
- message MonitoringColumnValue {
- oneof value {
- int64 int64_value = 1;
- double double_value = 2;
- string string_value = 3;
- google.protobuf.Timestamp timestamp = 4;
- }
- }
-
- message MonitoringRow {
- repeated MonitoringColumnValue values = 1;
- }
-
- // The number of column names must match the
- // number of values in each MonitoringRow.
- repeated string column_names = 1;
- repeated MonitoringRow row_data = 2;
-}
-
// DEPRECATED
message Metrics {
// PTransform level metrics.
@@ -723,7 +435,7 @@ message ProcessBundleProgressResponse {
// (Required) The list of metrics or other MonitoredState
// collected while processing this bundle.
- repeated MonitoringInfo monitoring_infos = 3;
+ repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos =
3;
// The list of currently active primary roots that are being
// executed. Required to be populated for PTransforms which can be split.
diff --git a/model/job-management/src/main/proto/beam_job_api.proto
b/model/job-management/src/main/proto/beam_job_api.proto
index cd2ddff..4a0ecea 100644
--- a/model/job-management/src/main/proto/beam_job_api.proto
+++ b/model/job-management/src/main/proto/beam_job_api.proto
@@ -33,7 +33,6 @@ import "beam_runner_api.proto";
import "endpoints.proto";
import "google/protobuf/struct.proto";
-
// Job Service for running RunnerAPI pipelines
service JobService {
// Prepare a job for execution. The job will not be executed until a call is
made to run with the
diff --git a/model/pipeline/src/main/proto/metrics.proto
b/model/pipeline/src/main/proto/metrics.proto
new file mode 100644
index 0000000..822c992
--- /dev/null
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers for metrics classes, used in the Fn API, Job API, and by
SDKs.
+ */
+
+syntax = "proto3";
+
+package org.apache.beam.model.pipeline.v1;
+
+option go_package = "pipeline_v1";
+option java_package = "org.apache.beam.model.pipeline.v1";
+option java_outer_classname = "MetricsApi";
+
+
+import "beam_runner_api.proto";
+import "google/protobuf/descriptor.proto";
+import "google/protobuf/timestamp.proto";
+
+// A specification containing required set of fields and labels required
+// to be set on a MonitoringInfo for the specific URN for SDK->RunnerHarness
+// ProcessBundleResponse reporting.
+message MonitoringInfoSpec {
+ string urn = 1;
+ string type_urn = 2;
+ // The list of required
+ repeated string required_labels = 3;
+ // Extra non functional parts of the spec for descriptive purposes.
+ // i.e. description, units, etc.
+ repeated Annotation annotations = 4;
+}
+
+// The key name and value string of MonitoringInfo annotations.
+message Annotation {
+ string key = 1;
+ string value = 2;
+}
+
+// Populated MonitoringInfoSpecs for specific URNs.
+// Indicating the required fields to be set.
+// SDKs and RunnerHarnesses can load these instances into memory and write a
+// validator or code generator to assist with populating and validating
+// MonitoringInfo protos.
+message MonitoringInfoSpecs {
+ enum Enum {
+ // TODO(ajamato): Add the PTRANSFORM name as a required label after
+ // upgrading the python SDK.
+ USER_COUNTER = 0 [(monitoring_info_spec) = {
+ urn: "beam:metric:user:",
+ type_urn: "beam:metrics:sum_int_64",
+ }];
+
+ ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+ urn: "beam:metric:element_count:v1",
+ type_urn: "beam:metrics:sum_int_64",
+ required_labels: [ "PCOLLECTION" ],
+ annotations: [ {
+ key: "description",
+ value: "The total elements output to a Pcollection by a PTransform."
+ } ]
+ }];
+
+ START_BUNDLE_MSECS = 2 [(monitoring_info_spec) = {
+ urn: "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
+ type_urn: "beam:metrics:sum_int_64",
+ required_labels: [ "PTRANSFORM" ],
+ annotations: [ {
+ key: "description",
+ value: "The total estimated execution time of the start bundle"
+ "function in a pardo"
+ } ]
+ }];
+
+ PROCESS_BUNDLE_MSECS = 3 [(monitoring_info_spec) = {
+ urn: "beam:metric:pardo_execution_time:process_bundle_msecs:v1",
+ type_urn: "beam:metrics:sum_int_64",
+ required_labels: [ "PTRANSFORM" ],
+ annotations: [ {
+ key: "description",
+ value: "The total estimated execution time of the process bundle"
+ "function in a pardo"
+ } ]
+ }];
+
+ FINISH_BUNDLE_MSECS = 4 [(monitoring_info_spec) = {
+ urn: "beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
+ type_urn: "beam:metrics:sum_int_64",
+ required_labels: [ "PTRANSFORM" ],
+ annotations: [ {
+ key: "description",
+ value: "The total estimated execution time of the finish bundle "
+ "function in a pardo"
+ } ]
+ }];
+
+ TOTAL_MSECS = 5 [(monitoring_info_spec) = {
+ urn: "beam:metric:ptransform_execution_time:total_msecs:v1",
+ type_urn: "beam:metrics:sum_int_64",
+ required_labels: [ "PTRANSFORM" ],
+ annotations: [ {
+ key: "description",
+ value: "The total estimated execution time of the ptransform"
+ } ]
+ }];
+ }
+}
+
+// A set of properties for the MonitoringInfoLabel, this is useful to obtain
+// the proper label string for the MonitoringInfoLabel.
+message MonitoringInfoLabelProps {
+ // The label key to use in the MonitoringInfo labels map.
+ string name = 1;
+}
+
+// Enum extension to store MonitoringInfo related
+// specifications, constants, etc.
+extend google.protobuf.EnumValueOptions {
+ MonitoringInfoLabelProps label_props = 127337796; // From: commit 0x7970544.
+
+ // Enum extension to store the MonitoringInfoSpecs.
+ MonitoringInfoSpec monitoring_info_spec = 207174266;
+}
+
+message MonitoringInfo {
+ // The name defining the metric or monitored state.
+ string urn = 1;
+
+ // This is specified as a URN that implies:
+ // A message class: (Distribution, Counter, Extrema, MonitoringDataTable).
+ // Sub types like field formats - int64, double, string.
+ // Aggregation methods - SUM, LATEST, TOP-N, BOTTOM-N, DISTRIBUTION
+ // valid values are:
+ // beam:metrics:[sum_int_64|latest_int_64|top_n_int_64|bottom_n_int_64|
+ // sum_double|latest_double|top_n_double|bottom_n_double|
+ // distribution_int_64|distribution_double|monitoring_data_table
+ string type = 2;
+
+ // The Metric or monitored state.
+ oneof data {
+ MonitoringTableData monitoring_table_data = 3;
+ Metric metric = 4;
+ }
+
+ enum MonitoringInfoLabels {
+ // TODO(ajamato): Rename all references to TRANSFORM to PTRANSFORM
+ TRANSFORM = 0 [(label_props) = { name: "PTRANSFORM" }];
+ PCOLLECTION = 1 [(label_props) = { name: "PCOLLECTION" }];
+ WINDOWING_STRATEGY = 2 [(label_props) = { name: "WINDOWING_STRATEGY" }];
+ CODER = 3 [(label_props) = { name: "CODER" }];
+ ENVIRONMENT = 4 [(label_props) = { name: "ENVIRONMENT" }];
+ }
+ // A set of key+value labels which define the scope of the metric.
+ // Either a well defined entity id for matching the enum names in
+ // the MonitoringInfoLabels enum or any arbitrary label
+ // set by a custom metric or user metric.
+ // A monitoring system is expected to be able to aggregate the metrics
+ // together for all updates having the same URN and labels. Some systems such
+ // as Stackdriver will be able to aggregate the metrics using a subset of the
+ // provided labels
+ map<string, string> labels = 5;
+
+ // The walltime of the most recent update.
+ // Useful for aggregation for latest types such as LatestInt64.
+ google.protobuf.Timestamp timestamp = 6;
+}
+
+message MonitoringInfoUrns {
+ enum Enum {
+ // User counter have this format: 'beam:metric:user:<namespace>:<name>'.
+ USER_COUNTER_URN_PREFIX = 0
+ [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:metric:user:"];
+
+ ELEMENT_COUNT = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:metric:element_count:v1"];
+
+ START_BUNDLE_MSECS = 2
+ [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:metric:pardo_execution_time:start_bundle_msecs:v1"];
+
+ PROCESS_BUNDLE_MSECS = 3
+ [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:metric:pardo_execution_time:process_bundle_msecs:v1"];
+
+ FINISH_BUNDLE_MSECS = 4
+ [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:metric:pardo_execution_time:finish_bundle_msecs:v1"];
+
+ TOTAL_MSECS = 5
+ [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:metric:ptransform_execution_time:total_msecs:v1"];
+ }
+}
+
+message MonitoringInfoTypeUrns {
+ enum Enum {
+ SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:metrics:sum_int_64"];
+
+ DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:metrics:distribution_int_64"];
+
+ LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:metrics:latest_int_64"];
+ }
+}
+
+message Metric {
+ // (Required) The data for this metric.
+ oneof data {
+ CounterData counter_data = 1;
+ DistributionData distribution_data = 2;
+ ExtremaData extrema_data = 3;
+ }
+}
+
+// Data associated with a Counter or Gauge metric.
+// This is designed to be compatible with metric collection
+// systems such as DropWizard.
+message CounterData {
+ oneof value {
+ int64 int64_value = 1;
+ double double_value = 2;
+ string string_value = 3;
+ }
+}
+
+// Extrema messages are used for calculating
+// Top-N/Bottom-N metrics.
+message ExtremaData {
+ oneof extrema {
+ IntExtremaData int_extrema_data = 1;
+ DoubleExtremaData double_extrema_data = 2;
+ }
+}
+
+message IntExtremaData {
+ repeated int64 int_values = 1;
+}
+
+message DoubleExtremaData {
+ repeated double double_values = 2;
+}
+
+// Data associated with a distribution metric.
+// This is based off of the current DistributionData metric.
+// This is not a stackdriver or dropwizard compatible
+// style of distribution metric.
+message DistributionData {
+ oneof distribution {
+ IntDistributionData int_distribution_data = 1;
+ DoubleDistributionData double_distribution_data = 2;
+ }
+}
+
+message IntDistributionData {
+ int64 count = 1;
+ int64 sum = 2;
+ int64 min = 3;
+ int64 max = 4;
+}
+
+message DoubleDistributionData {
+ int64 count = 1;
+ double sum = 2;
+ double min = 3;
+ double max = 4;
+}
+
+// General MonitoredState information which contains
+// structured information which does not fit into a typical
+// metric format. For example, a table of important files
+// and metadata which an I/O source is reading.
+// Note: Since MonitoredState is designed to be
+// customizable, and allow engines to aggregate these
+// metrics in custom ways.
+// Engines can use custom aggregation functions for specific URNs
+// by inspecting the column values.
+// An SDK should always report its current state, that is all
+// relevant MonitoredState for its PTransform at the current moment
+// and this should be kept small.
+// For example, an SDK can emit the oldest three files which
+// have been waiting for data for over 1 hour.
+// If an engine supports the URN with a custom aggregation then
+// it can filter these and keep only the Top-3 rows based on
+// how long the files have been waiting for data.
+// Otherwise an engine can ignore the MonitoringTableData
+// or union all the rows together into one large table and display
+// them in a UI.
+message MonitoringTableData {
+ message MonitoringColumnValue {
+ oneof value {
+ int64 int64_value = 1;
+ double double_value = 2;
+ string string_value = 3;
+ google.protobuf.Timestamp timestamp = 4;
+ }
+ }
+
+ message MonitoringRow {
+ repeated MonitoringColumnValue values = 1;
+ }
+
+ // The number of column names must match the
+ // number of values in each MonitoringRow.
+ repeated string column_names = 1;
+ repeated MonitoringRow row_data = 2;
+}
+
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
index 6b40603..ba47812 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.metrics.MetricName;
public class MetricUrns {
/**
* Parse a {@link MetricName} from a {@link
- * org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns.Enum}.
+ * org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoUrns.Enum}.
*
* <p>Should be consistent with {@code parse_namespace_and_name} in
monitoring_infos.py.
*/
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 3f81827..68fd86d 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
import javax.annotation.Nullable;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.construction.metrics.MetricKey;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.sdk.annotations.Experimental;
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
index 6480a9d..1e46ebb 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.construction.metrics.MetricKey;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.sdk.metrics.MetricResult;
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
index c39f15d..92893d9 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
@@ -17,17 +17,19 @@
*/
package org.apache.beam.runners.core.metrics;
+import static org.apache.beam.model.pipeline.v1.MetricsApi.labelProps;
+import static org.apache.beam.model.pipeline.v1.MetricsApi.monitoringInfoSpec;
+
import java.time.Instant;
import java.util.HashMap;
import javax.annotation.Nullable;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
-import
org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo.MonitoringInfoLabels;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoLabelProps;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpec;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpecs;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoTypeUrns;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import
org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo.MonitoringInfoLabels;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoLabelProps;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpec;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoTypeUrns;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoUrns;
import org.apache.beam.runners.core.construction.BeamUrns;
import
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -91,7 +93,7 @@ public class SimpleMonitoringInfoBuilder {
// the proto files.
if (!val.name().equals("UNRECOGNIZED")) {
MonitoringInfoSpec spec =
-
val.getValueDescriptor().getOptions().getExtension(BeamFnApi.monitoringInfoSpec);
+
val.getValueDescriptor().getOptions().getExtension(monitoringInfoSpec);
SimpleMonitoringInfoBuilder.specs.put(spec.getUrn(), spec);
}
}
@@ -100,7 +102,7 @@ public class SimpleMonitoringInfoBuilder {
/** Returns the label string constant defined in the MonitoringInfoLabel
enum proto. */
private static String getLabelString(MonitoringInfoLabels label) {
MonitoringInfoLabelProps props =
-
label.getValueDescriptor().getOptions().getExtension(BeamFnApi.labelProps);
+ label.getValueDescriptor().getOptions().getExtension(labelProps);
return props.getName();
}
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleStateRegistry.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleStateRegistry.java
index 224d6e2..19f2791 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleStateRegistry.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleStateRegistry.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.core.metrics;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
/**
* A Class for registering SimpleExecutionStates with and extracting execution
time MonitoringInfos.
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java
index b0acbc8..8ac8bd6 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java
@@ -17,14 +17,15 @@
*/
package org.apache.beam.runners.core.metrics;
+import static org.apache.beam.model.pipeline.v1.MetricsApi.monitoringInfoSpec;
+
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpec;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpecs;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpec;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs;
/** Class implements validation of MonitoringInfos against
MonitoringInfoSpecs. */
public class SpecMonitoringInfoValidator {
@@ -35,8 +36,7 @@ public class SpecMonitoringInfoValidator {
Arrays.stream(MonitoringInfoSpecs.Enum.values())
// Filtering default value for "unknown" Enums. Coming from proto
implementation.
.filter(x -> !x.name().equals("UNRECOGNIZED"))
- .map(
- x ->
x.getValueDescriptor().getOptions().getExtension(BeamFnApi.monitoringInfoSpec))
+ .map(x ->
x.getValueDescriptor().getOptions().getExtension(monitoringInfoSpec))
.toArray(size -> new MonitoringInfoSpec[size]);
}
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 0636167..aa74ff2 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.assertThat;
import java.util.ArrayList;
import java.util.HashMap;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.sdk.metrics.MetricName;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
index 901ab25..bec18ae 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
@@ -28,7 +28,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.DistributionResult;
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMatchers.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMatchers.java
index 62fae47..f3455df 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMatchers.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMatchers.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.core.metrics;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
index ed64f76..1b337ee 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.core.metrics;
import java.util.HashMap;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
/**
* Provides convenient one line factories for unit tests that need to generate
test MonitoringInfos.
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
index 14fa703..18e7829 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
@@ -21,7 +21,7 @@ import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
index 3e2671d..ac42caf 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertThat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Test;
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
index 7cff0a0..bffcd8a 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.core.metrics;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.junit.Before;
import org.junit.Test;
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
index 23d1f83..93f4c2b 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
@@ -23,12 +23,12 @@ import static
org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAtt
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.CounterData;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.DistributionData;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.ExtremaData;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.IntDistributionData;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metric;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.DistributionData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.ExtremaData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.Metric;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.metrics.Distribution;
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
index 6b9435e..ade8960 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.flink.metrics;
-import static org.apache.beam.model.fnexecution.v1.BeamFnApi.labelProps;
+import static org.apache.beam.model.pipeline.v1.MetricsApi.labelProps;
import static
org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN;
import static
org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX;
import static org.hamcrest.CoreMatchers.is;
@@ -30,13 +30,13 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.CounterData;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.DoubleDistributionData;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.IntDistributionData;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metric;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
-import
org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo.MonitoringInfoLabels;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.DoubleDistributionData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.Metric;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import
org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo.MonitoringInfoLabels;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.DistributionCell;
import org.apache.beam.runners.core.metrics.DistributionData;
@@ -193,7 +193,7 @@ public class FlinkMetricContainerTest {
.setMetric(
Metric.newBuilder()
.setDistributionData(
- BeamFnApi.DistributionData.newBuilder()
+ MetricsApi.DistributionData.newBuilder()
.setIntDistributionData(
IntDistributionData.newBuilder()
.setSum(30)
@@ -209,7 +209,7 @@ public class FlinkMetricContainerTest {
.setMetric(
Metric.newBuilder()
.setDistributionData(
- BeamFnApi.DistributionData.newBuilder()
+ MetricsApi.DistributionData.newBuilder()
.setDoubleDistributionData(
DoubleDistributionData.newBuilder()
.setSum(30)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
index bff2d21..1da0af8 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
@@ -43,8 +43,8 @@ import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metrics;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
import
org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.construction.metrics.MetricKey;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
index 8cc0ec9..8990e3c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.dataflow.worker.fn.control;
import com.google.api.services.dataflow.model.CounterUpdate;
import java.util.HashMap;
import java.util.Map;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
import
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformer.java
index 45cd65b..2a765b1 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformer.java
@@ -24,7 +24,7 @@ import com.google.api.services.dataflow.model.CounterUpdate;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
import
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import
org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MonitoringInfoToCounterUpdateTransformer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MonitoringInfoToCounterUpdateTransformer.java
index 974fc96..024aa95 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MonitoringInfoToCounterUpdateTransformer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MonitoringInfoToCounterUpdateTransformer.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.dataflow.worker.fn.control;
import com.google.api.services.dataflow.model.CounterUpdate;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
interface MonitoringInfoToCounterUpdateTransformer {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
index ebaa5d8..6518555 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
@@ -32,7 +32,6 @@ import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
import
org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleRequest;
@@ -44,6 +43,7 @@ import
org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateNamespaces;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
index 8d41a6d..7f31c48 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
@@ -78,7 +78,7 @@ public class TimerReceiver {
ProcessBundleDescriptors.ExecutableProcessBundleDescriptor
executableProcessBundleDescriptor =
stageBundleFactory.getProcessBundleDescriptor();
- BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+ ProcessBundleDescriptor processBundleDescriptor =
executableProcessBundleDescriptor.getProcessBundleDescriptor();
// Create and cache lookups so that we don't have to dive into the
ProcessBundleDescriptor
@@ -203,7 +203,7 @@ public class TimerReceiver {
// Retrieves all window coders for all TimerSpecs.
private static Map<String, Coder<BoundedWindow>> createTimerWindowCodersMap(
- BeamFnApi.ProcessBundleDescriptor processBundleDescriptor,
+ ProcessBundleDescriptor processBundleDescriptor,
Map<String, ProcessBundleDescriptors.TimerSpec> timerIdToTimerSpecMap,
RunnerApi.Components components) {
Map<String, Coder<BoundedWindow>> timerWindowCodersMap = new HashMap<>();
@@ -242,7 +242,7 @@ public class TimerReceiver {
private static RunnerApi.Coder getTimerWindowingCoder(
RunnerApi.PTransform pTransform,
String timerId,
- BeamFnApi.ProcessBundleDescriptor processBundleDescriptor) {
+ ProcessBundleDescriptor processBundleDescriptor) {
String timerPCollectionId = pTransform.getInputsMap().get(timerId);
RunnerApi.PCollection timerPCollection =
processBundleDescriptor.getPcollectionsMap().get(timerPCollectionId);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
index b3c9353..cb65456 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
@@ -17,15 +17,16 @@
*/
package org.apache.beam.runners.dataflow.worker.fn.control;
+import static org.apache.beam.model.pipeline.v1.MetricsApi.monitoringInfoSpec;
+
import com.google.api.services.dataflow.model.CounterMetadata;
import com.google.api.services.dataflow.model.CounterStructuredName;
import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
import com.google.api.services.dataflow.model.CounterUpdate;
import java.util.Map;
import java.util.Optional;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpecs.Enum;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs.Enum;
import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
import
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import
org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Origin;
@@ -54,11 +55,7 @@ class UserMonitoringInfoToCounterUpdateTransformer
}
static final String BEAM_METRICS_USER_PREFIX =
- Enum.USER_COUNTER
- .getValueDescriptor()
- .getOptions()
- .getExtension(BeamFnApi.monitoringInfoSpec)
- .getUrn();
+
Enum.USER_COUNTER.getValueDescriptor().getOptions().getExtension(monitoringInfoSpec).getUrn();
private Optional<String> validate(MonitoringInfo monitoringInfo) {
Optional<String> validatorResult = specValidator.validate(monitoringInfo);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
index f38d31f..02ac1ba 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
@@ -36,6 +36,10 @@ import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metrics;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi.Metric;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.TimerInternals;
@@ -92,11 +96,11 @@ public class BeamFnMapTaskExecutorTest {
private static final String FAKE_OUTPUT_NAME = "fake_output_name";
private static final String FAKE_OUTPUT_PCOLLECTION_ID =
"fake_pcollection_id";
- private static final BeamFnApi.Metrics.PTransform FAKE_ELEMENT_COUNT_METRICS
=
- BeamFnApi.Metrics.PTransform.newBuilder()
+ private static final Metrics.PTransform FAKE_ELEMENT_COUNT_METRICS =
+ Metrics.PTransform.newBuilder()
.setProcessedElements(
- BeamFnApi.Metrics.PTransform.ProcessedElements.newBuilder()
-
.setMeasured(BeamFnApi.Metrics.PTransform.Measured.getDefaultInstance()))
+ Metrics.PTransform.ProcessedElements.newBuilder()
+
.setMeasured(Metrics.PTransform.Measured.getDefaultInstance()))
.build();
private static final BeamFnApi.RegisterRequest REGISTER_REQUEST =
@@ -122,11 +126,8 @@ public class BeamFnMapTaskExecutorTest {
final CountDownLatch progressSentLatch = new CountDownLatch(1);
final CountDownLatch processBundleLatch = new CountDownLatch(1);
- final BeamFnApi.Metrics.User.MetricName metricName =
- BeamFnApi.Metrics.User.MetricName.newBuilder()
- .setNamespace(namespace)
- .setName(name)
- .build();
+ final Metrics.User.MetricName metricName =
+
Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
InstructionRequestHandler instructionRequestHandler =
new InstructionRequestHandler() {
@@ -150,17 +151,16 @@ public class BeamFnMapTaskExecutorTest {
.setProcessBundleProgress(
BeamFnApi.ProcessBundleProgressResponse.newBuilder()
.setMetrics(
- BeamFnApi.Metrics.newBuilder()
+ Metrics.newBuilder()
.putPtransforms(GRPC_READ_ID,
FAKE_ELEMENT_COUNT_METRICS)
.putPtransforms(
stepName,
-
BeamFnApi.Metrics.PTransform.newBuilder()
+ Metrics.PTransform.newBuilder()
.addUser(
-
BeamFnApi.Metrics.User.newBuilder()
+ Metrics.User.newBuilder()
.setMetricName(metricName)
.setCounterData(
-
BeamFnApi.Metrics.User.CounterData
- .newBuilder()
+
Metrics.User.CounterData.newBuilder()
.setValue(counterValue)))
.build())))
.build());
@@ -222,11 +222,8 @@ public class BeamFnMapTaskExecutorTest {
final CountDownLatch progressSentTwiceLatch = new CountDownLatch(2);
final CountDownLatch processBundleLatch = new CountDownLatch(1);
- final BeamFnApi.Metrics.User.MetricName metricName =
- BeamFnApi.Metrics.User.MetricName.newBuilder()
- .setNamespace(namespace)
- .setName(name)
- .build();
+ final Metrics.User.MetricName metricName =
+
Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
InstructionRequestHandler instructionRequestHandler =
new InstructionRequestHandler() {
@@ -250,17 +247,16 @@ public class BeamFnMapTaskExecutorTest {
.setProcessBundleProgress(
BeamFnApi.ProcessBundleProgressResponse.newBuilder()
.setMetrics(
- BeamFnApi.Metrics.newBuilder()
+ Metrics.newBuilder()
.putPtransforms(GRPC_READ_ID,
FAKE_ELEMENT_COUNT_METRICS)
.putPtransforms(
stepName,
-
BeamFnApi.Metrics.PTransform.newBuilder()
+ Metrics.PTransform.newBuilder()
.addUser(
-
BeamFnApi.Metrics.User.newBuilder()
+ Metrics.User.newBuilder()
.setMetricName(metricName)
.setCounterData(
-
BeamFnApi.Metrics.User.CounterData
- .newBuilder()
+
Metrics.User.CounterData.newBuilder()
.setValue(
progressSentTwiceLatch
.getCount()
@@ -328,11 +324,8 @@ public class BeamFnMapTaskExecutorTest {
final CountDownLatch progressSentLatch = new CountDownLatch(1);
final CountDownLatch processBundleLatch = new CountDownLatch(1);
- final BeamFnApi.Metrics.User.MetricName metricName =
- BeamFnApi.Metrics.User.MetricName.newBuilder()
- .setNamespace(namespace)
- .setName(name)
- .build();
+ final Metrics.User.MetricName metricName =
+
Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
InstructionRequestHandler instructionRequestHandler =
new InstructionRequestHandler() {
@@ -349,17 +342,16 @@ public class BeamFnMapTaskExecutorTest {
.setProcessBundle(
BeamFnApi.ProcessBundleResponse.newBuilder()
.setMetrics(
- BeamFnApi.Metrics.newBuilder()
+ Metrics.newBuilder()
.putPtransforms(GRPC_READ_ID,
FAKE_ELEMENT_COUNT_METRICS)
.putPtransforms(
stepName,
-
BeamFnApi.Metrics.PTransform.newBuilder()
+ Metrics.PTransform.newBuilder()
.addUser(
-
BeamFnApi.Metrics.User.newBuilder()
+ Metrics.User.newBuilder()
.setMetricName(metricName)
.setCounterData(
-
BeamFnApi.Metrics.User.CounterData
- .newBuilder()
+
Metrics.User.CounterData.newBuilder()
.setValue(finalCounterValue)))
.build())))
.build();
@@ -371,17 +363,16 @@ public class BeamFnMapTaskExecutorTest {
.setProcessBundleProgress(
BeamFnApi.ProcessBundleProgressResponse.newBuilder()
.setMetrics(
- BeamFnApi.Metrics.newBuilder()
+ Metrics.newBuilder()
.putPtransforms(GRPC_READ_ID,
FAKE_ELEMENT_COUNT_METRICS)
.putPtransforms(
stepName,
-
BeamFnApi.Metrics.PTransform.newBuilder()
+ Metrics.PTransform.newBuilder()
.addUser(
-
BeamFnApi.Metrics.User.newBuilder()
+ Metrics.User.newBuilder()
.setMetricName(metricName)
.setCounterData(
-
BeamFnApi.Metrics.User.CounterData
- .newBuilder()
+
Metrics.User.CounterData.newBuilder()
.setValue(counterValue)))
.build())))
.build());
@@ -447,37 +438,33 @@ public class BeamFnMapTaskExecutorTest {
final CountDownLatch progressSentLatch = new CountDownLatch(1);
final CountDownLatch processBundleLatch = new CountDownLatch(1);
- final BeamFnApi.Metrics.User.MetricName metricName =
- BeamFnApi.Metrics.User.MetricName.newBuilder()
- .setNamespace(namespace)
- .setName(name)
- .build();
+ final Metrics.User.MetricName metricName =
+
Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
- final BeamFnApi.Metrics deprecatedMetrics =
- BeamFnApi.Metrics.newBuilder()
+ final Metrics deprecatedMetrics =
+ Metrics.newBuilder()
.putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
.putPtransforms(
stepName,
- BeamFnApi.Metrics.PTransform.newBuilder()
+ Metrics.PTransform.newBuilder()
.addUser(
- BeamFnApi.Metrics.User.newBuilder()
+ Metrics.User.newBuilder()
.setMetricName(metricName)
.setCounterData(
- BeamFnApi.Metrics.User.CounterData.newBuilder()
- .setValue(finalCounterValue)))
+
Metrics.User.CounterData.newBuilder().setValue(finalCounterValue)))
.build())
.build();
final int expectedCounterValue = 5;
- final BeamFnApi.MonitoringInfo expectedMonitoringInfo =
- BeamFnApi.MonitoringInfo.newBuilder()
+ final MonitoringInfo expectedMonitoringInfo =
+ MonitoringInfo.newBuilder()
.setUrn("beam:metric:user:ExpectedCounter")
.setType("beam:metrics:sum_int_64")
.putLabels("PTRANSFORM", "ExpectedPTransform")
.setMetric(
- BeamFnApi.Metric.newBuilder()
+ Metric.newBuilder()
.setCounterData(
- BeamFnApi.CounterData.newBuilder()
+ MetricsApi.CounterData.newBuilder()
.setInt64Value(expectedCounterValue)
.build())
.build())
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
index 44a526a..9d96ae2 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
@@ -25,7 +25,7 @@ import com.google.api.services.dataflow.model.CounterUpdate;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformerTest.java
index fc74c0f..c221cee 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformerTest.java
@@ -27,7 +27,7 @@ import com.google.api.services.dataflow.model.CounterUpdate;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
import
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
index 5e5b10c..a401c90 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
@@ -27,7 +27,7 @@ import com.google.api.services.dataflow.model.CounterUpdate;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
import
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 30c5b90..e32cd79 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -43,10 +43,10 @@ import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import org.apache.beam.fn.harness.FnHarness;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
import
org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index f54b8fd..038c45b 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -39,7 +39,6 @@ import
org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
@@ -47,6 +46,7 @@ import
org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.Builder;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
index 3c6ea15..7bee753 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PTransformFunctionRegistry.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PTransformFunctionRegistry.java
index 9c6a2f2..6b20fe5 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PTransformFunctionRegistry.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PTransformFunctionRegistry.java
@@ -21,7 +21,7 @@ import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index c5c5097..d10e993 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -37,8 +37,8 @@ import java.util.ServiceLoader;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.state.FakeBeamFnStateClient;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.runners.core.construction.PTransformTranslation;
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
index d88405a..5490495 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
@@ -24,7 +24,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.withSettings;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
diff --git a/sdks/python/apache_beam/metrics/cells.py
b/sdks/python/apache_beam/metrics/cells.py
index 65ca185..6dbc1af 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -37,6 +37,7 @@ from apache_beam.metrics.metricbase import Counter
from apache_beam.metrics.metricbase import Distribution
from apache_beam.metrics.metricbase import Gauge
from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import metrics_pb2
__all__ = ['DistributionResult', 'GaugeResult']
@@ -171,8 +172,8 @@ class CounterCell(Counter, MetricCell):
# and Distributions. Since there is no CounterData class this method
# was added to CounterCell. Consider adding a CounterData class or
# removing the GaugeData and DistributionData classes.
- return beam_fn_api_pb2.Metric(
- counter_data=beam_fn_api_pb2.CounterData(
+ return metrics_pb2.Metric(
+ counter_data=metrics_pb2.CounterData(
int64_value=self.get_cumulative()
)
)
@@ -404,8 +405,8 @@ class GaugeData(object):
def to_runner_api_monitoring_info(self):
"""Returns a Metric with this value for use in a MonitoringInfo."""
- return beam_fn_api_pb2.Metric(
- counter_data=beam_fn_api_pb2.CounterData(
+ return metrics_pb2.Metric(
+ counter_data=metrics_pb2.CounterData(
int64_value=self.value
)
)
@@ -478,9 +479,9 @@ class DistributionData(object):
def to_runner_api_monitoring_info(self):
"""Returns a Metric with this value for use in a MonitoringInfo."""
- return beam_fn_api_pb2.Metric(
- distribution_data=beam_fn_api_pb2.DistributionData(
- int_distribution_data=beam_fn_api_pb2.IntDistributionData(
+ return metrics_pb2.Metric(
+ distribution_data=metrics_pb2.DistributionData(
+ int_distribution_data=metrics_pb2.IntDistributionData(
count=self.count, sum=self.sum, min=self.min, max=self.max)))
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py
b/sdks/python/apache_beam/metrics/monitoring_infos.py
index 94106c6..2e9059f 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -31,9 +31,9 @@ from apache_beam.metrics.cells import DistributionResult
from apache_beam.metrics.cells import GaugeData
from apache_beam.metrics.cells import GaugeResult
from apache_beam.portability import common_urns
-from apache_beam.portability.api.beam_fn_api_pb2 import CounterData
-from apache_beam.portability.api.beam_fn_api_pb2 import Metric
-from apache_beam.portability.api.beam_fn_api_pb2 import MonitoringInfo
+from apache_beam.portability.api.metrics_pb2 import CounterData
+from apache_beam.portability.api.metrics_pb2 import Metric
+from apache_beam.portability.api.metrics_pb2 import MonitoringInfo
ELEMENT_COUNT_URN = common_urns.monitoring_infos.ELEMENT_COUNT.urn
START_BUNDLE_MSECS_URN = common_urns.monitoring_infos.START_BUNDLE_MSECS.urn
diff --git a/sdks/python/apache_beam/options/pipeline_options.py
b/sdks/python/apache_beam/options/pipeline_options.py
index 1cee4a8..8c60b6b 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -591,6 +591,7 @@ class DebugOptions(PipelineOptions):
'before enabling any experiments.'))
def add_experiment(self, experiment):
+ # pylint: disable=access-member-before-definition
if self.experiments is None:
self.experiments = []
if experiment not in self.experiments:
diff --git a/sdks/python/apache_beam/portability/common_urns.py
b/sdks/python/apache_beam/portability/common_urns.py
index a000f81..edbb681 100644
--- a/sdks/python/apache_beam/portability/common_urns.py
+++ b/sdks/python/apache_beam/portability/common_urns.py
@@ -21,8 +21,8 @@ from __future__ import absolute_import
from builtins import object
-from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.portability.api import metrics_pb2
from apache_beam.portability.api import standard_window_fns_pb2
@@ -78,6 +78,6 @@ session_windows = PropertiesFromPayloadType(
standard_window_fns_pb2.SessionsPayload)
monitoring_infos = PropertiesFromEnumType(
- beam_fn_api_pb2.MonitoringInfoUrns.Enum)
+ metrics_pb2.MonitoringInfoUrns.Enum)
monitoring_info_types = PropertiesFromEnumType(
- beam_fn_api_pb2.MonitoringInfoTypeUrns.Enum)
+ metrics_pb2.MonitoringInfoTypeUrns.Enum)