damccorm commented on code in PR #37639:
URL: https://github.com/apache/beam/pull/37639#discussion_r2983600811
##########
sdks/python/apache_beam/runners/dataflow/internal/apiclient.py:
##########
@@ -1248,84 +1226,56 @@ def _verify_interpreter_version_is_supported(pipeline_options):
# is used.
structured_counter_translations = {
cy_combiners.CountCombineFn: (
- dataflow.CounterMetadata.KindValueValuesEnum.SUM,
- MetricUpdateTranslators.translate_scalar_counter_int),
+ 'Sum', MetricUpdateTranslators.translate_scalar_counter_int),
Review Comment:
Should we define our own enum for these types
(`Sum/Min/Distribution/etc...`) to avoid hardcoded strings?
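If we go that route, a minimal sketch of what I mean (names are hypothetical, not from this PR; the `str` mixin keeps the wire values identical to the current hardcoded strings):
```python
from enum import Enum


class MetricKind(str, Enum):
  # Hypothetical enum; members compare equal to the raw strings,
  # e.g. MetricKind.SUM == 'Sum' is True.
  SUM = 'Sum'
  MIN = 'Min'
  MAX = 'Max'
  MEAN = 'Mean'
  DISTRIBUTION = 'Distribution'


# The translation table entry would then read, e.g.:
# cy_combiners.CountCombineFn: (
#     MetricKind.SUM, MetricUpdateTranslators.translate_scalar_counter_int),
```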
##########
sdks/python/apache_beam/runners/dataflow/internal/apiclient.py:
##########
@@ -1130,38 +1114,32 @@ class DataflowJobAlreadyExistsError(retry.PermanentException):
pass
-def to_split_int(n):
- res = dataflow.SplitInt64()
- res.lowBits = n & 0xffffffff
- res.highBits = n >> 32
- return res
-
-
# TODO: Used in legacy batch worker. Move under MetricUpdateTranslators
# after Runner V2 transition.
-def translate_distribution(distribution_update, metric_update_proto):
+def translate_distribution(
+ distribution_update, metric_update_proto: dataflow.MetricUpdate):
"""Translate metrics DistributionUpdate to dataflow distribution update.
Args:
distribution_update: Instance of DistributionData,
DistributionInt64Accumulator or DataflowDistributionCounter.
metric_update_proto: Used for report metrics.
"""
- dist_update_proto = dataflow.DistributionUpdate()
- dist_update_proto.min = to_split_int(distribution_update.min)
- dist_update_proto.max = to_split_int(distribution_update.max)
- dist_update_proto.count = to_split_int(distribution_update.count)
- dist_update_proto.sum = to_split_int(distribution_update.sum)
+ dist_update_proto = Struct()
+ dist_update_proto.update({"min": distribution_update.min})
+ dist_update_proto.update({"max": distribution_update.max})
+ dist_update_proto.update({"count": distribution_update.count})
+ dist_update_proto.update({"sum": distribution_update.sum})
# DataflowDistributionCounter needs to translate histogram
- if isinstance(distribution_update, DataflowDistributionCounter):
- dist_update_proto.histogram = dataflow.Histogram()
- distribution_update.translate_to_histogram(dist_update_proto.histogram)
+ # if isinstance(distribution_update, DataflowDistributionCounter):
+ # dist_update_proto.histogram = dataflow.Histogram()
+ # distribution_update.translate_to_histogram(dist_update_proto.histogram)
Review Comment:
I'm a bit confused by this chunk and have two questions:
1) Why don't we need the `to_split_int` call anymore?
2) Why is this piece now commented out?
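For context on question 1: my understanding is that the old proto needed 64-bit ints split into two 32-bit halves, while `Struct` (assuming this is `google.protobuf.Struct`) stores numbers as doubles. If that's right, counts above 2**53 would silently lose precision, e.g.:
```python
# Illustrative only, assuming google.protobuf.Struct; not from the PR.
from google.protobuf.struct_pb2 import Struct

s = Struct()
n = (1 << 60) + 1  # a count larger than 2**53
s.update({'count': n})
# Struct stores number values as doubles, so the round-trip is lossy:
assert int(s['count']) != n
```
Is that acceptable for these counters, or is it handled elsewhere?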
##########
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py:
##########
@@ -753,16 +759,14 @@ def api_jobstate_to_pipeline_state(api_jobstate):
values_enum.JOB_STATE_CANCELLING: PipelineState.CANCELLING,
values_enum.JOB_STATE_RESOURCE_CLEANING_UP: PipelineState.RESOURCE_CLEANING_UP,
- values_enum.JOB_STATE_PAUSING: PipelineState.PAUSING,
- values_enum.JOB_STATE_PAUSED: PipelineState.PAUSED,
Review Comment:
Did you mean to remove these?
##########
sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py:
##########
@@ -135,24 +134,22 @@ def _get_metric_key(self, metric):
# step name (only happens for unstructured-named metrics).
# 2. Unable to unpack [step] or [namespace]; which should only happen
# for unstructured names.
- step = _get_match(
- metric.name.context.additionalProperties,
- lambda x: x.key == STEP_LABEL).value
+ step = metric.name.context['step']
step = self._translate_step_name(step)
except ValueError:
pass
namespace = "dataflow/v1b3" # Try to extract namespace or add a default.
try:
- namespace = _get_match(
- metric.name.context.additionalProperties,
- lambda x: x.key == 'namespace').value
+ carried_namespace = metric.name.context['namespace']
Review Comment:
Does this still need to be in a try/except? If so, can we just check for the
property's existence first instead?
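Something like this is what I had in mind (untested sketch; assumes `metric.name.context` supports `in`, which both `dict` and `google.protobuf.Struct` do):
```python
# Sketch: explicit existence check instead of catching an exception.
context = metric.name.context
if 'namespace' in context:
  namespace = context['namespace']
```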
##########
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py:
##########
@@ -149,29 +155,28 @@ def rank_error(msg):
while True:
response = runner.dataflow_client.get_job(job_id)
# If get() is called very soon after Create() the response may not contain
- # an initialized 'currentState' field.
- if response.currentState is not None:
- if response.currentState != last_job_state:
+ # an initialized 'current_state' field.
+ if response.current_state is not None:
+ current_state = response.current_state.name
+ if current_state != last_job_state:
if state_update_callback:
- state_update_callback(response.currentState)
- _LOGGER.info('Job %s is in state %s', job_id, response.currentState)
- last_job_state = response.currentState
- if str(response.currentState) not in ('JOB_STATE_RUNNING',
- 'JOB_STATE_PAUSED',
- 'JOB_STATE_PAUSING'):
+ state_update_callback(current_state)
+ _LOGGER.info('Job %s is in state %s', job_id, current_state)
+ last_job_state = current_state
+ if str(current_state) not in ('JOB_STATE_RUNNING'):
Review Comment:
Same here - I think we need to keep the paused/pausing logic (this is being
actively built out)
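A minimal sketch of what keeping that logic might look like (illustrative only, body elided):
```python
# Keep PAUSED/PAUSING alongside RUNNING; multiple entries also make this
# a real tuple, restoring a membership test instead of a substring check.
if current_state not in ('JOB_STATE_RUNNING',
                         'JOB_STATE_PAUSED',
                         'JOB_STATE_PAUSING'):
  ...  # existing terminal-state handling
```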
##########
sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py:
##########
@@ -469,101 +468,88 @@ def test_default_job_name(self):
regexp = 'beamapp-.*-[0-9]{10}-[0-9]{6}-[a-z0-9]{8}$'
self.assertRegex(job_name, regexp)
- def test_split_int(self):
- number = 12345
- split_number = apiclient.to_split_int(number)
- self.assertEqual((split_number.lowBits, split_number.highBits), (number, 0))
- shift_number = number << 32
- split_number = apiclient.to_split_int(shift_number)
- self.assertEqual((split_number.lowBits, split_number.highBits), (0, number))
-
def test_translate_distribution_using_accumulator(self):
- metric_update = dataflow.CounterUpdate()
+ metric_update = dataflow.MetricUpdate()
accumulator = mock.Mock()
accumulator.min = 1
accumulator.max = 15
accumulator.sum = 16
accumulator.count = 2
apiclient.translate_distribution(accumulator, metric_update)
- self.assertEqual(metric_update.distribution.min.lowBits, accumulator.min)
- self.assertEqual(metric_update.distribution.max.lowBits, accumulator.max)
- self.assertEqual(metric_update.distribution.sum.lowBits, accumulator.sum)
- self.assertEqual(
- metric_update.distribution.count.lowBits, accumulator.count)
+ self.assertEqual(metric_update.distribution['min'], accumulator.min)
+ self.assertEqual(metric_update.distribution['max'], accumulator.max)
+ self.assertEqual(metric_update.distribution['sum'], accumulator.sum)
+ self.assertEqual(metric_update.distribution['count'], accumulator.count)
def test_translate_distribution_using_distribution_data(self):
- metric_update = dataflow.CounterUpdate()
+ metric_update = dataflow.MetricUpdate()
distribution_update = DistributionData(16, 2, 1, 15)
apiclient.translate_distribution(distribution_update, metric_update)
+ self.assertEqual(metric_update.distribution['min'], distribution_update.min)
+ self.assertEqual(metric_update.distribution['max'], distribution_update.max)
+ self.assertEqual(metric_update.distribution['sum'], distribution_update.sum)
self.assertEqual(
- metric_update.distribution.min.lowBits, distribution_update.min)
- self.assertEqual(
- metric_update.distribution.max.lowBits, distribution_update.max)
- self.assertEqual(
- metric_update.distribution.sum.lowBits, distribution_update.sum)
- self.assertEqual(
- metric_update.distribution.count.lowBits, distribution_update.count)
-
- def test_translate_distribution_using_dataflow_distribution_counter(self):
- counter_update = DataflowDistributionCounter()
- counter_update.add_input(1)
- counter_update.add_input(3)
- metric_proto = dataflow.CounterUpdate()
- apiclient.translate_distribution(counter_update, metric_proto)
- histogram = mock.Mock(firstBucketOffset=None, bucketCounts=None)
- counter_update.translate_to_histogram(histogram)
- self.assertEqual(metric_proto.distribution.min.lowBits, counter_update.min)
- self.assertEqual(metric_proto.distribution.max.lowBits, counter_update.max)
- self.assertEqual(metric_proto.distribution.sum.lowBits, counter_update.sum)
- self.assertEqual(
- metric_proto.distribution.count.lowBits, counter_update.count)
- self.assertEqual(
- metric_proto.distribution.histogram.bucketCounts,
- histogram.bucketCounts)
- self.assertEqual(
- metric_proto.distribution.histogram.firstBucketOffset,
- histogram.firstBucketOffset)
+ metric_update.distribution['count'], distribution_update.count)
+
+
+# def test_translate_distribution_using_dataflow_distribution_counter(self):
Review Comment:
Related to above, is there a reason to keep this commented out?
##########
sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py:
##########
@@ -195,22 +197,6 @@ def get_job_side_effect(*args, **kwargs):
result = duration_timedout_result.wait_until_finish(5000)
self.assertEqual(result, PipelineState.RUNNING)
- with mock.patch('time.time', mock.MagicMock(side_effect=[1, 9, 9, 20, 20])):
- duration_timedout_runner = MockDataflowRunner(
- [values_enum.JOB_STATE_PAUSING])
- duration_timedout_result = DataflowPipelineResult(
- duration_timedout_runner.job, duration_timedout_runner, options)
- result = duration_timedout_result.wait_until_finish(5000)
- self.assertEqual(result, PipelineState.PAUSING)
-
- with mock.patch('time.time', mock.MagicMock(side_effect=[1, 9, 9, 20, 20])):
- duration_timedout_runner = MockDataflowRunner(
- [values_enum.JOB_STATE_PAUSED])
- duration_timedout_result = DataflowPipelineResult(
- duration_timedout_runner.job, duration_timedout_runner, options)
- result = duration_timedout_result.wait_until_finish(5000)
- self.assertEqual(result, PipelineState.PAUSED)
Review Comment:
Same as above, I think we want paused/pausing tests (here and below)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]