[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378428&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378428 ]

ASF GitHub Bot logged work on BEAM-9132:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Jan/20 19:37
            Start Date: 28/Jan/20 19:37
    Worklog Time Spent: 10m 
    Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372014284
 
 

 ##########
 File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
 ##########
 @@ -360,6 +360,19 @@ public void closesEnvironmentOnCleanup() throws Exception {
     verify(remoteEnvironment).close();
   }
 
+  @Test
+  public void closesEnvironmentOnCleanupWithPendingRefs() throws Exception {
+    try (DefaultJobBundleFactory bundleFactory =
+        createDefaultJobBundleFactory(envFactoryProviderMap)) {
+      DefaultJobBundleFactory.SimpleStageBundleFactory stageBundleFactory =
+          (DefaultJobBundleFactory.SimpleStageBundleFactory)
+              bundleFactory.forStage(getExecutableStage(environment));
+      // The client is still being used, e.g. when the pipeline fails and is shut down
+      stageBundleFactory.currentClient.wrappedClient.ref();
 
 Review comment:
   I'm assuming here that we hold a `ref()` because the client is in use. Without a ref, the client would be idle. The error message we were seeing is only possible while a bundle is being processed.
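The review comment relies on reference-counting semantics: a held `ref()` means a bundle is using the client, and zero refs means the client is idle. A minimal sketch of those semantics, assuming a plain counter; `RefCountedClientSketch` and its methods are hypothetical and do not reproduce Beam's `wrappedClient` implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical reference-counted SDK harness client, for illustration only.
 * Not Beam's actual client class; all names here are assumptions.
 */
final class RefCountedClientSketch {

  // Number of active users of the client, e.g. bundles currently being processed.
  private final AtomicInteger refCount = new AtomicInteger();
  private volatile boolean closed;

  /** Marks the client as in use; called when a bundle starts. */
  void ref() {
    refCount.incrementAndGet();
  }

  /** Releases one use; called when a bundle finishes. */
  void unref() {
    refCount.decrementAndGet();
  }

  /** With no outstanding refs the client is idle: no bundle is in flight. */
  boolean isIdle() {
    return refCount.get() == 0;
  }

  /**
   * Closes the client even if refs are still pending, as during shutdown after a
   * pipeline failure. Returns true if a bundle was still in flight, i.e. errors
   * raised by that bundle are a side effect of the shutdown.
   */
  boolean forceClose() {
    closed = true;
    return !isIdle();
  }

  boolean isClosed() {
    return closed;
  }

  public static void main(String[] args) {
    RefCountedClientSketch client = new RefCountedClientSketch();
    System.out.println(client.isIdle()); // true: nothing is using the client
    client.ref(); // a bundle starts processing
    System.out.println(client.isIdle()); // false: a bundle is in flight
    boolean hadPendingRefs = client.forceClose(); // shutdown while the bundle is active
    System.out.println(hadPendingRefs + " " + client.isClosed()); // true true
  }
}
```

The pending-ref case corresponds to what the test sets up by calling `ref()` on the client before the bundle factory is closed.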
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 378428)
    Time Spent: 2h 10m  (was: 2h)

> State request handler is removed prematurely when closing ActiveBundle
> ----------------------------------------------------------------------
>
>                 Key: BEAM-9132
>                 URL: https://issues.apache.org/jira/browse/BEAM-9132
>             Project: Beam
>          Issue Type: Bug
>          Components: java-fn-execution
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> We have observed these errors in a state-intensive application:
> {noformat}
> Error processing instruction 107. Original traceback is
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "redacted.py", line 56, in process
>     recent_events_map = load_recent_events_map(recent_events_state)
>   File "redacted.py", line 128, in _load_recent_events_map
>     items_in_recent_events_bag = list(recent_events_state.read())
>   File "apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
>     for elem in self.first:
>   File "apache_beam/runners/worker/bundle_processor.py", line 214, in __iter__
>     self._state_key, self._coder_impl, is_cached=self._is_cached)
>   File "apache_beam/runners/worker/sdk_worker.py", line 692, in blocking_get
>     self._materialize_iter(state_key, coder))
>   File "apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
>     self._underlying.get_raw(state_key, continuation_token)
>   File "apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
>     continuation_token=continuation_token)))
>   File "apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
>     raise RuntimeError(response.error)
> RuntimeError: Unknown process bundle instruction id '107'
> {noformat}
> Note that the error originates on the Runner side: the SDK Harness only re-raises 
> the error returned in the state response. It appears to be caused by the 
> {{ActiveBundle}} de-registering the state request handler too early, while the 
> SDK Harness may still be processing the bundle.
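The failure mode described in the issue can be modeled in miniature: the runner keeps a registry from instruction id to state request handler, and removing the entry while the SDK Harness is still reading state yields an "unknown instruction id" error. This is a hypothetical sketch; `StateHandlerRegistrySketch` and its methods are not Beam APIs, and the error text only mirrors the traceback.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical runner-side registry that routes SDK state requests to a
 * per-bundle handler keyed by instruction id. Illustrative only; this is not
 * Beam's state request handler API.
 */
final class StateHandlerRegistrySketch {

  private final Map<String, Function<String, String>> handlers = new ConcurrentHashMap<>();

  /** Registers the handler when the bundle (instruction) starts. */
  void register(String instructionId, Function<String, String> handler) {
    handlers.put(instructionId, handler);
  }

  /** Removes the handler; unsafe while the SDK Harness may still send requests. */
  void deregister(String instructionId) {
    handlers.remove(instructionId);
  }

  /** Serves a state request, failing if no handler is registered for the id. */
  String handle(String instructionId, String stateKey) {
    Function<String, String> handler = handlers.get(instructionId);
    if (handler == null) {
      // In the real system this text would reach the SDK Harness as response.error.
      throw new IllegalStateException(
          "Unknown process bundle instruction id '" + instructionId + "'");
    }
    return handler.apply(stateKey);
  }

  public static void main(String[] args) {
    StateHandlerRegistrySketch registry = new StateHandlerRegistrySketch();
    registry.register("107", key -> "contents of " + key);

    // While the handler is registered, state reads succeed.
    System.out.println(registry.handle("107", "recent_events"));

    // Premature removal: the handler is gone but the SDK Harness still reads state.
    registry.deregister("107");
    try {
      registry.handle("107", "recent_events");
    } catch (IllegalStateException e) {
      System.out.println("error: " + e.getMessage());
    }
  }
}
```

Under this model, the problem is one of ordering: the handler should only be removed once the SDK Harness has finished the bundle, not as soon as the runner begins closing the `ActiveBundle`.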



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
