[ 
https://issues.apache.org/jira/browse/BEAM-9801?focusedWorklogId=429749&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-429749
 ]

ASF GitHub Bot logged work on BEAM-9801:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/May/20 01:24
            Start Date: 02/May/20 01:24
    Worklog Time Spent: 10m 
      Work Description: ibzib commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r418813369



##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
##########
@@ -247,25 +247,27 @@ public void reduce(Iterable<WindowedValue<InputT>> 
iterable, Collector<RawUnionV
     
timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     // Now we fire the timers and process elements generated by timers (which 
may be timers itself)
-    try (RemoteBundle bundle =
-        stageBundleFactory.getBundle(
-            receiverFactory, timerReceiverFactory, stateRequestHandler, 
progressHandler)) {
-
-      PipelineTranslatorUtils.fireEligibleTimers(
-          timerInternals,
-          (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
-            FnDataReceiver<Timer> fnTimerReceiver =
-                bundle.getTimerReceivers().get(transformAndTimerId);
-            Preconditions.checkNotNull(
-                fnTimerReceiver, "No FnDataReceiver found for %s", 
transformAndTimerId);
-            try {
-              fnTimerReceiver.accept(timerValue);
-            } catch (Exception e) {
-              throw new RuntimeException(
-                  String.format(Locale.ENGLISH, "Failed to process timer: %s", 
timerValue));
-            }
-          },
-          currentTimerKey);
+    while (timerInternals.hasPendingTimers()) {

Review comment:
       Made #11595 to update Spark.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 429749)
    Time Spent: 9.5h  (was: 9h 20m)

> Setting a timer from a timer callback fails
> -------------------------------------------
>
>                 Key: BEAM-9801
>                 URL: https://issues.apache.org/jira/browse/BEAM-9801
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-harness
>            Reporter: Maximilian Michels
>            Priority: Critical
>             Fix For: 2.21.0
>
>          Time Spent: 9.5h
>  Remaining Estimate: 0h
>
> Hi,
> I'm trying to set a timer from a timer callback in the Python SDK:
> {code:Python}
> class GenerateLoad(beam.DoFn):
>   timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
>   def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
>     self.key = element[0]
>     timer.set(0)
>   @userstate.on_timer(timer_spec)
>   def process_timer(self, timer=beam.DoFn.TimerParam(timer_spec)):
>     timer.set(0)
> {code}
> This yields the following Python stack trace:
> {noformat}
> INFO:apache_beam.utils.subprocess_server:Caused by: 
> java.lang.RuntimeException: Error received from SDK harness for instruction 
> 4: Traceback (most recent call last):
> INFO:apache_beam.utils.subprocess_server: File 
> "apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
> INFO:apache_beam.utils.subprocess_server: response = task()
> INFO:apache_beam.utils.subprocess_server: File 
> "apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
> INFO:apache_beam.utils.subprocess_server: lambda: 
> self.create_worker().do_instruction(request), request)
> INFO:apache_beam.utils.subprocess_server: File 
> "apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
> INFO:apache_beam.utils.subprocess_server: getattr(request, request_type), 
> request.instruction_id)
> INFO:apache_beam.utils.subprocess_server: File 
> "apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
> INFO:apache_beam.utils.subprocess_server: 
> bundle_processor.process_bundle(instruction_id))
> INFO:apache_beam.utils.subprocess_server: File 
> "apache_beam/runners/worker/bundle_processor.py", line 910, in process_bundle
> INFO:apache_beam.utils.subprocess_server: element.timer_family_id, timer_data)
> INFO:apache_beam.utils.subprocess_server: File 
> "apache_beam/runners/worker/operations.py", line 688, in process_timer
> INFO:apache_beam.utils.subprocess_server: timer_data.fire_timestamp)
> INFO:apache_beam.utils.subprocess_server: File 
> "apache_beam/runners/common.py", line 990, in process_user_timer
> INFO:apache_beam.utils.subprocess_server: self._reraise_augmented(exn)
> INFO:apache_beam.utils.subprocess_server: File 
> "apache_beam/runners/common.py", line 1043, in _reraise_augmented
> INFO:apache_beam.utils.subprocess_server: raise_with_traceback(new_exn)
> INFO:apache_beam.utils.subprocess_server: File 
> "apache_beam/runners/common.py", line 988, in process_user_timer
> INFO:apache_beam.utils.subprocess_server: 
> self.do_fn_invoker.invoke_user_timer(timer_spec, key, window, timestamp)
> INFO:apache_beam.utils.subprocess_server: File 
> "apache_beam/runners/common.py", line 517, in invoke_user_timer
> INFO:apache_beam.utils.subprocess_server: self.user_state_context, key, 
> window, timestamp))
> INFO:apache_beam.utils.subprocess_server: File 
> "apache_beam/runners/common.py", line 1093, in process_outputs
> INFO:apache_beam.utils.subprocess_server: for result in results:
> INFO:apache_beam.utils.subprocess_server: File 
> "/Users/max/Dev/beam/sdks/python/apache_beam/testing/load_tests/pardo_test.py",
>  line 185, in process_timer
> INFO:apache_beam.utils.subprocess_server: timer.set(0)
> INFO:apache_beam.utils.subprocess_server: File 
> "apache_beam/runners/worker/bundle_processor.py", line 589, in set
> INFO:apache_beam.utils.subprocess_server: 
> self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
> INFO:apache_beam.utils.subprocess_server: File 
> "apache_beam/coders/coder_impl.py", line 651, in encode_to_stream
> INFO:apache_beam.utils.subprocess_server: value.hold_timestamp, out, True)
> INFO:apache_beam.utils.subprocess_server: File 
> "apache_beam/coders/coder_impl.py", line 608, in encode_to_stream
> INFO:apache_beam.utils.subprocess_server: millis = value.micros // 1000
> INFO:apache_beam.utils.subprocess_server:AttributeError: 'NoneType' object 
> has no attribute 'micros' [while running 'GenerateLoad']
> {noformat}
> Looking at the code base, I'm not sure we have tests for timer output 
> timestamps. Am I missing something?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to