[ 
https://issues.apache.org/jira/browse/BEAM-10848?focusedWorklogId=493744&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-493744
 ]

ASF GitHub Bot logged work on BEAM-10848:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Oct/20 00:07
            Start Date: 02/Oct/20 00:07
    Worklog Time Spent: 10m 
      Work Description: codecov[bot] edited a comment on pull request #12992:
URL: https://github.com/apache/beam/pull/12992#issuecomment-702403670


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12992?src=pr&el=h1) Report
   > Merging 
[#12992](https://codecov.io/gh/apache/beam/pull/12992?src=pr&el=desc) into 
[master](https://codecov.io/gh/apache/beam/commit/e6b3cf1da5f17a3a15f2a48986a1b84266f3a64c?el=desc)
 will **increase** coverage by `0.02%`.
   > The diff coverage is `100.00%`.
   
   [![Impacted file tree 
graph](https://codecov.io/gh/apache/beam/pull/12992/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12992?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12992      +/-   ##
   ==========================================
   + Coverage   82.52%   82.55%   +0.02%     
   ==========================================
     Files         453      453              
     Lines       54610    54701      +91     
   ==========================================
   + Hits        45067    45156      +89     
   - Misses       9543     9545       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12992?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/metrics/cells.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWV0cmljcy9jZWxscy5weQ==) | `82.05% <100.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/localfilesystem.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vbG9jYWxmaWxlc3lzdGVtLnB5) | `91.66% <0.00%> (-0.76%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/doctests.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2RvY3Rlc3RzLnB5) | `96.75% <0.00%> (-0.45%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/util.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy91dGlsLnB5) | `95.44% <0.00%> (-0.19%)` | :arrow_down: |
   | [sdks/python/apache\_beam/runners/direct/executor.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvZXhlY3V0b3IucHk=) | `96.82% <0.00%> (+0.52%)` | :arrow_up: |
   | [conftest.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-Y29uZnRlc3QucHk=) | `85.71% <0.00%> (+2.38%)` | :arrow_up: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/12992/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `95.23% <0.00%> (+11.90%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at 
Codecov](https://codecov.io/gh/apache/beam/pull/12992?src=pr&el=continue).
   > **Legend** - [Click here to learn 
more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by 
[Codecov](https://codecov.io/gh/apache/beam/pull/12992?src=pr&el=footer). Last 
update 
[e6b3cf1...5991994](https://codecov.io/gh/apache/beam/pull/12992?src=pr&el=lastupdated).
 Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 493744)
    Time Spent: 50m  (was: 40m)

> Gauge metrics error when setting timers
> ---------------------------------------
>
>                 Key: BEAM-10848
>                 URL: https://issues.apache.org/jira/browse/BEAM-10848
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-harness
>            Reporter: Maximilian Michels
>            Priority: P2
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Gauge metrics are affected by setting timers, which leads to {{None}} values:
> {noformat}
> ERROR:apache_beam.runners.worker.sdk_worker:Error processing instruction 147. Original traceback is
> Traceback (most recent call last):
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>     response = task()
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 516, in process_bundle
>     monitoring_infos = bundle_processor.monitoring_infos()
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1107, in monitoring_infos
>     op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
>   File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.Operation.monitoring_infos
>   File "apache_beam/runners/worker/operations.py", line 347, in apache_beam.runners.worker.operations.Operation.monitoring_infos
>   File "apache_beam/runners/worker/operations.py", line 386, in apache_beam.runners.worker.operations.Operation.user_monitoring_infos
>   File "apache_beam/metrics/execution.py", line 261, in apache_beam.metrics.execution.MetricsContainer.to_runner_api_monitoring_infos
>   File "apache_beam/metrics/cells.py", line 222, in apache_beam.metrics.cells.GaugeCell.to_runner_api_monitoring_info
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 222, in int64_user_gauge
>     payload = _encode_gauge(coder, timestamp, value)
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 397, in _encode_gauge
>     coder.get_impl().encode_to_stream(value, stream, True)
>   File "apache_beam/coders/coder_impl.py", line 690, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 692, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
> TypeError: an integer is required
> {noformat}
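> The failing call in the traceback is {{coder.get_impl().encode_to_stream(value, stream, True)}}, where the gauge value is apparently {{None}}. As a minimal sketch of just that step (my own illustration, not taken from the report; {{VarIntCoder}} and {{create_OutputStream}} are standard Beam coder APIs):
> {code:python}
> # VarIntCoder refuses to encode None, which appears to be the value the gauge
> # cell ends up holding once a timer has been set.
> from apache_beam.coders import VarIntCoder
> from apache_beam.coders.coder_impl import create_OutputStream
>
> coder = VarIntCoder()
> stream = create_OutputStream()
> coder.get_impl().encode_to_stream(42, stream, True)    # integers encode fine
> coder.get_impl().encode_to_stream(None, stream, True)  # raises TypeError, as in the traceback above
> {code}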
> The transform has the following structure; it errors when the lines following the {{TODO}} comments are uncommented:
> {code:python}
> # Imports needed for this snippet.
> import apache_beam as beam
> from apache_beam.coders import StrUtf8Coder
> from apache_beam.metrics.metric import Metrics
> from apache_beam.transforms import userstate
>
> class StatefulOperation(beam.DoFn):
>   def __init__(self, state_size_per_key_bytes, use_processing_timer=False):
>     self.state_size_per_key_bytes = state_size_per_key_bytes
>     self.str_coder = StrUtf8Coder().get_impl()
>     self.bytes_gauge = Metrics.gauge('synthetic', 'state_bytes')
>     self.elements_gauge = Metrics.gauge('synthetic', 'state_elements')
>     self.use_processing_timer = use_processing_timer
>   state_spec = userstate.BagStateSpec('state', StrUtf8Coder())
>   state_spec2 = userstate.CombiningValueStateSpec('state_size_bytes', combine_fn=sum)
>   state_spec3 = userstate.CombiningValueStateSpec('num_state_entries', combine_fn=sum)
>   event_timer_spec = userstate.TimerSpec('event_timer', beam.TimeDomain.WATERMARK)
>   processing_timer_spec = userstate.TimerSpec('proc_timer', beam.TimeDomain.REAL_TIME)
>   def process(self,
>               element,
>               timestamp=beam.DoFn.TimestampParam,
>               state=beam.DoFn.StateParam(state_spec),
>               state_num_bytes=beam.DoFn.StateParam(state_spec2),
>               state_num_entries=beam.DoFn.StateParam(state_spec3),
>               event_timer=beam.DoFn.TimerParam(event_timer_spec),
>               processing_timer=beam.DoFn.TimerParam(processing_timer_spec)):
>     # Append stringified element to state until the threshold has been reached
>     # The cleanup timer will then clean up and the process will repeat.
>     if state_num_bytes.read() <= self.state_size_per_key_bytes:
>       state_element = str(element)
>       state.add(state_element)
>       bytes_added = len(self.str_coder.encode_nested(state_element))
>       state_num_bytes.add(bytes_added)
>       state_num_entries.add(1)
>       timer = processing_timer if self.use_processing_timer else event_timer
>       # Set a timer which will clear the state if it grows too large
>       timer.set(timestamp.micros // 1000000 + 5)
>     # Metrics
>     # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
>     #self.bytes_gauge.set(state_num_bytes.read())
>     #self.elements_gauge.set(state_num_entries.read())
>     yield element
>   @userstate.on_timer(event_timer_spec)
>   def on_event_timer(self,
>                      key=beam.DoFn.KeyParam,
>                      state=beam.DoFn.StateParam(state_spec),
>                      state_num_bytes=beam.DoFn.StateParam(state_spec2),
>                      state_num_entries=beam.DoFn.StateParam(state_spec3)):
>     return self.timer_callback(state, state_num_bytes, state_num_entries)
>   @userstate.on_timer(processing_timer_spec)
>   def on_processing_timer(self,
>                           state=beam.DoFn.StateParam(state_spec),
>                           state_num_bytes=beam.DoFn.StateParam(state_spec2),
>                           state_num_entries=beam.DoFn.StateParam(state_spec3)):
>     return self.timer_callback(state, state_num_bytes, state_num_entries)
>   def timer_callback(self, state, state_num_bytes, state_num_entries):
>     count = 0
>     for _ in state.read():
>       count += 1
>     state_count = state_num_entries.read()
>     if count != state_count:
>       raise Exception("Actual number of entries (%s) did not match expected (%s)" % (count, state_count))
>     # Reset state bags
>     state.clear()
>     state_num_bytes.clear()
>     state_num_entries.clear()
>     # Reset metrics
>     # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
>     #self.bytes_gauge.set(0)
>     #self.elements_gauge.set(0)
> {code}
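> For reference, a hedged sketch of how the transform would be wired up (the keys, element count, state size and timestamps below are made up for illustration, not from this report): stateful DoFns with timers require a keyed PCollection, and the error above only surfaces once the gauge lines marked {{TODO}} are uncommented.
> {code:python}
> import time
>
> import apache_beam as beam
> from apache_beam.transforms import window
>
> with beam.Pipeline() as p:
>   _ = (p
>        | beam.Create([('key-%d' % (i % 10), i) for i in range(100)])
>        # Assign wall-clock timestamps so the event-time timer set in process() is meaningful.
>        | beam.Map(lambda kv: window.TimestampedValue(kv, time.time()))
>        | beam.ParDo(StatefulOperation(state_size_per_key_bytes=1024)))
> {code}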



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
