[ https://issues.apache.org/jira/browse/BEAM-10848?focusedWorklogId=493744&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-493744 ]
ASF GitHub Bot logged work on BEAM-10848:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Oct/20 00:07
            Start Date: 02/Oct/20 00:07
    Worklog Time Spent: 10m
      Work Description: codecov[bot] edited a comment on pull request #12992:
URL: https://github.com/apache/beam/pull/12992#issuecomment-702403670

# [Codecov](https://codecov.io/gh/apache/beam/pull/12992?src=pr&el=h1) Report
> Merging [#12992](https://codecov.io/gh/apache/beam/pull/12992?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/e6b3cf1da5f17a3a15f2a48986a1b84266f3a64c?el=desc) will **increase** coverage by `0.02%`.
> The diff coverage is `100.00%`.

```diff
@@            Coverage Diff             @@
##           master   #12992      +/-   ##
==========================================
+ Coverage   82.52%   82.55%   +0.02%
==========================================
  Files         453      453
  Lines       54610    54701      +91
==========================================
+ Hits        45067    45156      +89
- Misses       9543     9545       +2
```

| [Impacted Files](https://codecov.io/gh/apache/beam/pull/12992?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [sdks/python/apache\_beam/metrics/cells.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWV0cmljcy9jZWxscy5weQ==) | `82.05% <100.00%> (ø)` | |
| [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
| [sdks/python/apache\_beam/io/localfilesystem.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vbG9jYWxmaWxlc3lzdGVtLnB5) | `91.66% <0.00%> (-0.76%)` | :arrow_down: |
| [sdks/python/apache\_beam/dataframe/doctests.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2RvY3Rlc3RzLnB5) | `96.75% <0.00%> (-0.45%)` | :arrow_down: |
| [sdks/python/apache\_beam/transforms/util.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy91dGlsLnB5) | `95.44% <0.00%> (-0.19%)` | :arrow_down: |
| [sdks/python/apache\_beam/runners/direct/executor.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvZXhlY3V0b3IucHk=) | `96.82% <0.00%> (+0.52%)` | :arrow_up: |
| [conftest.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-Y29uZnRlc3QucHk=) | `85.71% <0.00%> (+2.38%)` | :arrow_up: |
| [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `95.23% <0.00%> (+11.90%)` | :arrow_up: |

------

[Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12992?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12992?src=pr&el=footer). Last update [e6b3cf1...5991994](https://codecov.io/gh/apache/beam/pull/12992?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 493744)
    Time Spent: 50m  (was: 40m)

> Gauge metrics error when setting timers
> ---------------------------------------
>
>                 Key: BEAM-10848
>                 URL: https://issues.apache.org/jira/browse/BEAM-10848
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-harness
>            Reporter: Maximilian Michels
>            Priority: P2
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Gauges are affected by setting timers, which leads to {{None}} values:
> {noformat}
> ERROR:apache_beam.runners.worker.sdk_worker:Error processing instruction 147. Original traceback is
> Traceback (most recent call last):
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>     response = task()
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 516, in process_bundle
>     monitoring_infos = bundle_processor.monitoring_infos()
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1107, in monitoring_infos
>     op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
>   File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.Operation.monitoring_infos
>   File "apache_beam/runners/worker/operations.py", line 347, in apache_beam.runners.worker.operations.Operation.monitoring_infos
>   File "apache_beam/runners/worker/operations.py", line 386, in apache_beam.runners.worker.operations.Operation.user_monitoring_infos
>   File "apache_beam/metrics/execution.py", line 261, in apache_beam.metrics.execution.MetricsContainer.to_runner_api_monitoring_infos
>   File "apache_beam/metrics/cells.py", line 222, in apache_beam.metrics.cells.GaugeCell.to_runner_api_monitoring_info
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 222, in int64_user_gauge
>     payload = _encode_gauge(coder, timestamp, value)
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 397, in _encode_gauge
>     coder.get_impl().encode_to_stream(value, stream, True)
>   File "apache_beam/coders/coder_impl.py", line 690, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 692, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
> TypeError: an integer is required
> {noformat}
> The transform has the following structure and fails when the lines following each {{TODO}} are uncommented:
> {code:python}
> import apache_beam as beam
> from apache_beam.coders import StrUtf8Coder
> from apache_beam.metrics import Metrics
> from apache_beam.transforms import userstate
>
>
> class StatefulOperation(beam.DoFn):
>   def __init__(self, state_size_per_key_bytes, use_processing_timer=False):
>     self.state_size_per_key_bytes = state_size_per_key_bytes
>     self.str_coder = StrUtf8Coder().get_impl()
>     self.bytes_gauge = Metrics.gauge('synthetic', 'state_bytes')
>     self.elements_gauge = Metrics.gauge('synthetic', 'state_elements')
>     self.use_processing_timer = use_processing_timer
>
>   # Specs are class attributes so they can be used as parameter defaults below.
>   state_spec = userstate.BagStateSpec('state', StrUtf8Coder())
>   state_spec2 = userstate.CombiningValueStateSpec('state_size_bytes',
>                                                   combine_fn=sum)
>   state_spec3 = userstate.CombiningValueStateSpec('num_state_entries',
>                                                   combine_fn=sum)
>   event_timer_spec = userstate.TimerSpec('event_timer',
>                                          beam.TimeDomain.WATERMARK)
>   processing_timer_spec = userstate.TimerSpec('proc_timer',
>                                               beam.TimeDomain.REAL_TIME)
>
>   def process(self,
>               element,
>               timestamp=beam.DoFn.TimestampParam,
>               state=beam.DoFn.StateParam(state_spec),
>               state_num_bytes=beam.DoFn.StateParam(state_spec2),
>               state_num_entries=beam.DoFn.StateParam(state_spec3),
>               event_timer=beam.DoFn.TimerParam(event_timer_spec),
>               processing_timer=beam.DoFn.TimerParam(processing_timer_spec)):
>     # Append stringified element to state until the threshold has been reached
>     # The cleanup timer will then clean up and the process will repeat.
>     if state_num_bytes.read() <= self.state_size_per_key_bytes:
>       state_element = str(element)
>       state.add(state_element)
>       bytes_added = len(self.str_coder.encode_nested(state_element))
>       state_num_bytes.add(bytes_added)
>       state_num_entries.add(1)
>     timer = processing_timer if self.use_processing_timer else event_timer
>     # Set a timer which will clear the state if it grows too large
>     timer.set(timestamp.micros // 1000000 + 5)
>     # Metrics
>     # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
>     #self.bytes_gauge.set(state_num_bytes.read())
>     #self.elements_gauge.set(state_num_entries.read())
>     yield element
>
>   @userstate.on_timer(event_timer_spec)
>   def on_event_timer(self,
>                      key=beam.DoFn.KeyParam,
>                      state=beam.DoFn.StateParam(state_spec),
>                      state_num_bytes=beam.DoFn.StateParam(state_spec2),
>                      state_num_entries=beam.DoFn.StateParam(state_spec3)):
>     return self.timer_callback(state, state_num_bytes, state_num_entries)
>
>   @userstate.on_timer(processing_timer_spec)
>   def on_processing_timer(self,
>                           state=beam.DoFn.StateParam(state_spec),
>                           state_num_bytes=beam.DoFn.StateParam(state_spec2),
>                           state_num_entries=beam.DoFn.StateParam(state_spec3)):
>     return self.timer_callback(state, state_num_bytes, state_num_entries)
>
>   def timer_callback(self, state, state_num_bytes, state_num_entries):
>     # Count the entries actually stored in the bag state.
>     count = 0
>     for _ in state.read():
>       count += 1
>     state_count = state_num_entries.read()
>     if count != state_count:
>       raise Exception(
>           "Actual number of entries (%s) did not match expected (%s)" %
>           (count, state_count))
>     # Reset state bags
>     state.clear()
>     state_num_bytes.clear()
>     state_num_entries.clear()
>     # Reset metrics
>     # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
>     #self.bytes_gauge.set(0)
>     #self.elements_gauge.set(0)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
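
The traceback in the issue ends in an integer encoder rejecting the gauge value. The following is a minimal, self-contained sketch of that failure mode in plain Python; it is not Beam's implementation and not the fix from the pull request. It assumes only what the issue states, namely that a `None` gauge value reaches the encoder. `encode_varint` and `encode_gauge_or_skip` are hypothetical helpers introduced for illustration: the first stands in for a varint-style integer encoder, the second shows one possible guard that skips a gauge whose value is unset.

```python
def encode_varint(value):
    """Encode a non-negative int as a base-128 varint (hypothetical helper)."""
    # Reject anything that is not a plain integer, mirroring the error text
    # seen in the issue's traceback.
    if not isinstance(value, int) or isinstance(value, bool):
        raise TypeError("an integer is required")
    if value < 0:
        raise ValueError("this sketch only handles non-negative values")
    out = bytearray()
    while True:
        bits = value & 0x7F
        value >>= 7
        if value:
            # More bytes follow: set the continuation bit.
            out.append(bits | 0x80)
        else:
            out.append(bits)
            return bytes(out)


def encode_gauge_or_skip(value):
    """Return the encoded value, or None when the gauge value is unset.

    Assumption: an unset gauge is represented by None, as in the issue.
    """
    if value is None:
        return None
    return encode_varint(value)
```

Calling `encode_varint(None)` raises `TypeError`, which is the symptom reported above, while `encode_gauge_or_skip(None)` returns `None` so the caller can omit that gauge from its report.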