[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=380088&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380088 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 31/Jan/20 16:55
Start Date: 31/Jan/20 16:55
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r37358

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java ##
@@ -342,22 +343,14 @@ public RemoteBundle getBundle(
       // TODO: Consider having BundleProcessor#newBundle take in an OutputReceiverFactory rather
       // than constructing the receiver map here. Every bundle factory will need this.
-      if (environmentExpirationMillis == 0 && !loadBalanceBundles) {
-        return currentClient.processor.newBundle(
-            getOutputReceivers(currentClient.processBundleDescriptor, outputReceiverFactory)
-                .build(),
-            stateRequestHandler,
-            progressHandler);
-      }

Review comment:
During the initial review of the bundle load balancing, the idea came up to have two implementations: one with reference counting and one without. I think that would be the best improvement moving forward.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 380088)
Time Spent: 3h 20m (was: 3h 10m)

> State request handler is removed prematurely when closing ActiveBundle
> --
>
> Key: BEAM-9132
> URL: https://issues.apache.org/jira/browse/BEAM-9132
> Project: Beam
> Issue Type: Bug
> Components: java-fn-execution
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Major
> Fix For: 2.20.0
>
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> We have observed these errors in a state-intense application:
> {noformat}
> Error processing instruction 107. Original traceback is
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "redacted.py", line 56, in process
>     recent_events_map = load_recent_events_map(recent_events_state)
>   File "redacted.py", line 128, in _load_recent_events_map
>     items_in_recent_events_bag = list(recent_events_state.read())
>   File "apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
>     for elem in self.first:
>   File "apache_beam/runners/worker/bundle_processor.py", line 214, in __iter__
>     self._state_key, self._coder_impl, is_cached=self._is_cached)
>   File "apache_beam/runners/worker/sdk_worker.py", line 692, in blocking_get
>     self._materialize_iter(state_key, coder))
>   File "apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
>     self._underlying.get_raw(state_key, continuation_token)
>   File "apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
>     continuation_token=continuation_token)))
>   File "apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
>     raise RuntimeError(response.error)
> RuntimeError: Unknown process bundle instruction id '107'
> {noformat}
> Notice that the error is thrown on the Runner side. It seems to originate
> from the {{ActiveBundle}} de-registering the state request handler too early
> when the processing may still be going on in the SDK Harness.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
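The failure mode suspected in the issue description can be illustrated with a minimal Java sketch. Everything here is hypothetical: `StateHandlerRegistry` and its methods are invented for illustration and are not Beam classes. The sketch only shows how a state request that arrives after its handler has been de-registered would produce an "Unknown process bundle instruction id" style error, assuming handlers are looked up by instruction id.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical registry that maps instruction ids to state request handlers. */
final class StateHandlerRegistry {
  private final Map<String, Function<String, String>> handlers = new ConcurrentHashMap<>();

  void register(String instructionId, Function<String, String> handler) {
    handlers.put(instructionId, handler);
  }

  void deregister(String instructionId) {
    handlers.remove(instructionId);
  }

  /** Answers a state request, or returns an error string if no handler is registered. */
  String handle(String instructionId, String stateKey) {
    Function<String, String> handler = handlers.get(instructionId);
    if (handler == null) {
      return "ERROR: Unknown process bundle instruction id '" + instructionId + "'";
    }
    return handler.apply(stateKey);
  }
}

final class PrematureDeregistrationDemo {
  public static void main(String[] args) {
    StateHandlerRegistry registry = new StateHandlerRegistry();
    registry.register("107", key -> "value-for-" + key);

    // While the handler is registered, state requests are answered normally.
    System.out.println(registry.handle("107", "recent_events"));

    // The runner de-registers the handler while the SDK harness is still processing.
    registry.deregister("107");

    // A late state request for the same instruction now fails.
    System.out.println(registry.handle("107", "recent_events"));
  }
}
```

Under this assumption, the fix direction is to keep the handler registered until the SDK harness has confirmed that processing for the instruction is complete.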
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=380084&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380084 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 31/Jan/20 16:53
Start Date: 31/Jan/20 16:53
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r373580208

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java ##
@@ -342,22 +343,14 @@ public RemoteBundle getBundle(
       // TODO: Consider having BundleProcessor#newBundle take in an OutputReceiverFactory rather
       // than constructing the receiver map here. Every bundle factory will need this.
-      if (environmentExpirationMillis == 0 && !loadBalanceBundles) {
-        return currentClient.processor.newBundle(
-            getOutputReceivers(currentClient.processBundleDescriptor, outputReceiverFactory)
-                .build(),
-            stateRequestHandler,
-            progressHandler);
-      }

Review comment:
I suppose it was working correctly due to the initial ref() when creating the environment. Still, it doesn't hurt to remove this bit because it just adds a specialized execution path to the generalized version. The ref business is already complex enough. If you think differently, feel free to add it back, but IMHO this is easier to understand.
Issue Time Tracking
---
Worklog Id: (was: 380084)
Time Spent: 3h 10m (was: 3h)
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=380073&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380073 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 31/Jan/20 16:19
Start Date: 31/Jan/20 16:19
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #10694: [BEAM-9132] Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r373563654

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java ##
@@ -342,22 +343,14 @@ public RemoteBundle getBundle(
       // TODO: Consider having BundleProcessor#newBundle take in an OutputReceiverFactory rather
       // than constructing the receiver map here. Every bundle factory will need this.
-      if (environmentExpirationMillis == 0 && !loadBalanceBundles) {
-        return currentClient.processor.newBundle(
-            getOutputReceivers(currentClient.processBundleDescriptor, outputReceiverFactory)
-                .build(),
-            stateRequestHandler,
-            progressHandler);
-      }

Review comment:
This is the original code path that was used when no reference counting is required. What was the problem with it?
Issue Time Tracking
---
Worklog Id: (was: 380073)
Time Spent: 3h (was: 2h 50m)
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=380003&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380003 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 31/Jan/20 13:50
Start Date: 31/Jan/20 13:50
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694

Issue Time Tracking
---
Worklog Id: (was: 380003)
Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=379936&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379936 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 31/Jan/20 12:22
Start Date: 31/Jan/20 12:22
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r373452096

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java ##
@@ -342,22 +343,14 @@ public RemoteBundle getBundle(
       // TODO: Consider having BundleProcessor#newBundle take in an OutputReceiverFactory rather
       // than constructing the receiver map here. Every bundle factory will need this.
-      if (environmentExpirationMillis == 0 && !loadBalanceBundles) {
-        return currentClient.processor.newBundle(
-            getOutputReceivers(currentClient.processBundleDescriptor, outputReceiverFactory)
-                .build(),
-            stateRequestHandler,
-            progressHandler);
-      }

Review comment:
I discovered another issue in this code path. `ref()` is not called on the client in this case. Also, `unref()` will never be called because the code below does not insert the call to the bundle close. `close()` is only ever called on `unref()`.
Issue Time Tracking
---
Worklog Id: (was: 379936)
Time Spent: 2h 40m (was: 2.5h)
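The `ref()`/`unref()` discussion in this thread relies on a reference-counting scheme in which the client is closed only when the last reference is released. A minimal sketch of that idea follows, assuming the creator holds one initial reference (as one review comment in this thread suggests). `RefCountedClient` is a hypothetical stand-in, not the actual `WrappedSdkHarnessClient` implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a reference-counted SDK harness client. */
final class RefCountedClient implements AutoCloseable {
  // Assumption: the creator of the client holds one initial reference.
  private final AtomicInteger refCount = new AtomicInteger(1);
  private volatile boolean closed = false;

  /** Registers one more user of this client and returns the new count. */
  int ref() {
    return refCount.incrementAndGet();
  }

  /** Releases one reference; closes the client when the count reaches zero. */
  int unref() {
    int remaining = refCount.decrementAndGet();
    if (remaining == 0) {
      close();
    }
    return remaining;
  }

  @Override
  public void close() {
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }
}

final class RefCountDemo {
  public static void main(String[] args) {
    RefCountedClient client = new RefCountedClient();
    client.ref(); // a bundle starts using the client
    client.unref(); // the bundle is closed
    System.out.println(client.isClosed()); // false: the creator's reference remains
    client.unref(); // the creator releases its reference
    System.out.println(client.isClosed()); // true
  }
}
```

In a scheme like this, every code path that hands out the client needs a matching `ref()` and `unref()` pair. A path that skips both can still appear to work as long as the initial reference keeps the client alive, which may be why a special-cased path adds complexity without an obvious benefit.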
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378991&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378991 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 29/Jan/20 19:52
Start Date: 29/Jan/20 19:52
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372585551

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java ##
@@ -406,11 +407,14 @@ public void split(double fractionOfRemainder) {
       @Override
       public void close() throws Exception {
-        bundle.close();
-        currentClient.wrappedClient.unref();
-        if (loadBalanceBundles) {
-          availableCaches.offer(currentCache);
-          availableCachesSemaphore.release();
+        try {
+          bundle.close();
+        } finally {
+          currentClient.wrappedClient.unref();
+          if (loadBalanceBundles) {
+            availableCaches.offer(currentCache);
+            availableCachesSemaphore.release();
+          }

Review comment:
Looks like this was responsible for the cleanup failing. `bundle.close()` may throw, leaving the environment still referenced. With this change, my tests no longer yield errors like the one in the description.
Issue Time Tracking
---
Worklog Id: (was: 378991)
Time Spent: 2.5h (was: 2h 20m)
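The effect of moving the release into a `finally` block, as in the diff for `close()` in this message, can be shown with a small self-contained sketch. The names `BundleCloseDemo`, `Counter`, `closeWithoutFinally`, and `closeWithFinally` are hypothetical and only mimic the shape of the change; they are not taken from the Beam code base.

```java
final class BundleCloseDemo {
  /** Hypothetical reference holder; starts with one outstanding reference. */
  static final class Counter {
    int refs = 1;

    void unref() {
      refs--;
    }
  }

  /** Shape of the old code: the release is skipped if close() throws. */
  static void closeWithoutFinally(AutoCloseable bundle, Counter client) throws Exception {
    bundle.close();
    client.unref();
  }

  /** Shape of the new code: the release always runs, even if close() throws. */
  static void closeWithFinally(AutoCloseable bundle, Counter client) throws Exception {
    try {
      bundle.close();
    } finally {
      client.unref();
    }
  }

  public static void main(String[] args) {
    // A bundle whose close() fails, e.g. because the pipeline is shutting down.
    AutoCloseable failingBundle = () -> {
      throw new IllegalStateException("close failed");
    };

    Counter leaked = new Counter();
    try {
      closeWithoutFinally(failingBundle, leaked);
    } catch (Exception e) {
      // Expected: the exception propagates and the reference is never released.
    }
    System.out.println("without finally, refs = " + leaked.refs); // 1

    Counter released = new Counter();
    try {
      closeWithFinally(failingBundle, released);
    } catch (Exception e) {
      // Expected: the exception still propagates, but the reference was released.
    }
    System.out.println("with finally, refs = " + released.refs); // 0
  }
}
```

The exception still reaches the caller in both variants; the only difference is whether the cleanup runs first.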
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378950&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378950 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 29/Jan/20 17:59
Start Date: 29/Jan/20 17:59
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372540351

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java ##
@@ -255,6 +255,14 @@ public void close() throws Exception {
     // Clear the cache. This closes all active environments.
     // note this may cause open calls to be cancelled by the peer
     for (LoadingCache environmentCache : environmentCaches) {
+      for (WrappedSdkHarnessClient client : environmentCache.asMap().values()) {
+        try {
+          client.close();

Review comment:
I've removed this in favor of using `unref()`. I think I found the case where `unref()` would not be called: https://github.com/apache/beam/pull/10694/files#diff-e80c769f0011537cc2b60d3e7898cf5aR413

Issue Time Tracking
---
Worklog Id: (was: 378950)
Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378428&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378428 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 28/Jan/20 19:37
Start Date: 28/Jan/20 19:37
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372014284

## File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java ##
@@ -360,6 +360,19 @@ public void closesEnvironmentOnCleanup() throws Exception {
     verify(remoteEnvironment).close();
   }
+  @Test
+  public void closesEnvironmentOnCleanupWithPendingRefs() throws Exception {
+    try (DefaultJobBundleFactory bundleFactory =
+        createDefaultJobBundleFactory(envFactoryProviderMap)) {
+      DefaultJobBundleFactory.SimpleStageBundleFactory stageBundleFactory =
+          (DefaultJobBundleFactory.SimpleStageBundleFactory)
+              bundleFactory.forStage(getExecutableStage(environment));
+      // The client is still being used, e.g. when the pipeline fails and is shut down
+      stageBundleFactory.currentClient.wrappedClient.ref();

Review comment:
I'm assuming here we have a `ref()` due to the client being used. If we had no ref, we would be idling. The error message we were seeing is only possible if we are currently processing a bundle.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378428) Time Spent: 2h 10m (was: 2h) > State request handler is removed prematurely when closing ActiveBundle > -- > > Key: BEAM-9132 > URL: https://issues.apache.org/jira/browse/BEAM-9132 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > We have observed these errors in a state-intense application: > {noformat} > Error processing instruction 107. Original traceback is > Traceback (most recent call last): > File "apache_beam/runners/common.py", line 780, in > apache_beam.runners.common.DoFnRunner.process > File "apache_beam/runners/common.py", line 587, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > File "apache_beam/runners/common.py", line 659, in > apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window > File "apache_beam/runners/common.py", line 880, in > apache_beam.runners.common._OutputProcessor.process_outputs > File "apache_beam/runners/common.py", line 895, in > apache_beam.runners.common._OutputProcessor.process_outputs > File "redacted.py", line 56, in process > recent_events_map = load_recent_events_map(recent_events_state) > File "redacted.py", line 128, in _load_recent_events_map > items_in_recent_events_bag = list(recent_events_state.read()) > File "apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__ > for elem in self.first: > File "apache_beam/runners/worker/bundle_processor.py", line 214, in __iter__ > self._state_key, self._coder_impl, is_cached=self._is_cached) > File "apache_beam/runners/worker/sdk_worker.py", line 692, in blocking_get > self._materialize_iter(state_key, coder)) > File "apache_beam/runners/worker/sdk_worker.py", line 723, in > _materialize_iter > self._underlying.get_raw(state_key, continuation_token) > 
File "apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw > continuation_token=continuation_token))) > File "apache_beam/runners/worker/sdk_worker.py", line 637, in > _blocking_request > raise RuntimeError(response.error) > RuntimeError: Unknown process bundle instruction id '107' > {noformat} > Notice that the error is thrown on the Runner side. It seems to originate > from the {{ActiveBundle}} de-registering the state request handler too early > when the processing may still be going on in the SDK Harness. -- This message was sent by Atlassian Jira (v8.3.4#803005)
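The assumption in the review comment, that an outstanding `ref()` means a bundle is still in flight, can be illustrated with a small reference-counting sketch. `RefCountedClient` and its methods are hypothetical stand-ins, not Beam's actual client class, and the rule that the owning cache holds one initial reference is an assumption made for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a ref-counted SDK harness client; not Beam's actual class. */
class RefCountedClient implements AutoCloseable {
  // Assumption for this sketch: the owning cache holds the first reference.
  private final AtomicInteger refs = new AtomicInteger(1);
  private boolean closed = false;

  /** Called when a bundle starts using the client. */
  int ref() {
    return refs.incrementAndGet();
  }

  /** Called when a holder is done; closes the client once the count reaches zero. */
  int unref() {
    int remaining = refs.decrementAndGet();
    if (remaining == 0) {
      close();
    }
    return remaining;
  }

  /** Unconditional close, e.g. during shutdown, regardless of outstanding refs. */
  @Override
  public synchronized void close() {
    closed = true;
  }

  synchronized boolean isClosed() {
    return closed;
  }
}
```

In this sketch, a client with a pending bundle reference survives the owner's `unref()` and is only released by a direct `close()`, which is the situation the new test appears to exercise.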
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378417&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378417 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 28/Jan/20 19:18
Start Date: 28/Jan/20 19:18
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #10694: [BEAM-9132] Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372004299

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java ##
@@ -255,6 +255,14 @@ public void close() throws Exception {
     // Clear the cache. This closes all active environments.
     // note this may cause open calls to be cancelled by the peer
     for (LoadingCache environmentCache : environmentCaches) {
+      for (WrappedSdkHarnessClient client : environmentCache.asMap().values()) {
+        try {
+          client.close();

Review comment:
Worth mentioning that this is added to close the environments irrespective of open bundles, since this will occur only during shutdown?

Issue Time Tracking
---
Worklog Id: (was: 378417)
Time Spent: 2h (was: 1h 50m)
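The shutdown behaviour raised in this review comment, closing every client regardless of open bundles, could look roughly like the sketch below. The diff excerpt stops at `client.close();`, so what the real `catch` block does is not visible here; collecting the failures and continuing is an assumption of this example, and `ShutdownSketch` and `closeAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical helper illustrating a best-effort close during shutdown. */
class ShutdownSketch {
  /** Closes every client and returns the failures instead of stopping at the first one. */
  static List<Exception> closeAll(Collection<? extends AutoCloseable> clients) {
    List<Exception> failures = new ArrayList<>();
    for (AutoCloseable client : clients) {
      try {
        client.close();
      } catch (Exception e) {
        // Assumption: one failing client should not keep the others open.
        failures.add(e);
      }
    }
    return failures;
  }
}
```

Continuing past a failed `close()` keeps one broken environment from leaking the rest, which matters most on the shutdown path where nothing will retry.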
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378412&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378412 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 28/Jan/20 19:13
Start Date: 28/Jan/20 19:13
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #10694: [BEAM-9132] Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372001730

## File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java ##
@@ -360,6 +360,19 @@ public void closesEnvironmentOnCleanup() throws Exception {
     verify(remoteEnvironment).close();
   }
 
+  @Test
+  public void closesEnvironmentOnCleanupWithPendingRefs() throws Exception {
+    try (DefaultJobBundleFactory bundleFactory =
+        createDefaultJobBundleFactory(envFactoryProviderMap)) {
+      DefaultJobBundleFactory.SimpleStageBundleFactory stageBundleFactory =
+          (DefaultJobBundleFactory.SimpleStageBundleFactory)
+              bundleFactory.forStage(getExecutableStage(environment));
+      // The client is still being used, e.g. when the pipeline fails and is shut down
+      stageBundleFactory.currentClient.wrappedClient.ref();

Review comment:
What would cause this extra ref in the actual operator lifecycle?

Issue Time Tracking
---
Worklog Id: (was: 378412)
Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378411&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378411 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 28/Jan/20 19:12
Start Date: 28/Jan/20 19:12
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372001232

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java ##
@@ -464,6 +474,8 @@ ServerInfo getServerInfo() {
     }
 
     public void close() {

Review comment:
It's now also called from here: https://github.com/apache/beam/pull/10694/files#diff-e80c769f0011537cc2b60d3e7898cf5aR260

Issue Time Tracking
---
Worklog Id: (was: 378411)
Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378410&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378410 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 28/Jan/20 19:10
Start Date: 28/Jan/20 19:10
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #10694: [BEAM-9132] Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372000633

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java ##
@@ -464,6 +474,8 @@ ServerInfo getServerInfo() {
     }
 
     public void close() {

Review comment:
Isn't close only called from unref? If so, how does this change the behavior? (Possibly some more explanation needs to be added.)

Issue Time Tracking
---
Worklog Id: (was: 378410)
Time Spent: 1.5h (was: 1h 20m)
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378292&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378292 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 28/Jan/20 15:19
Start Date: 28/Jan/20 15:19
Worklog Time Spent: 10m
Work Description: mxm commented on issue #10694: [BEAM-9132] Use a unique bundle id across all SDK workers bound to the same environment
URL: https://github.com/apache/beam/pull/10694#issuecomment-579296135

This does not fix the issue because the id generation scheme is effectively the same. I'm still seeing the same errors, but I have a new trace.

Issue Time Tracking
---
Worklog Id: (was: 378292)
Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=377753&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377753 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 27/Jan/20 17:31
Start Date: 27/Jan/20 17:31
Worklog Time Spent: 10m
Work Description: mxm commented on issue #10694: [BEAM-9132] Use a unique bundle id across all SDK workers bound to the same environment
URL: https://github.com/apache/beam/pull/10694#issuecomment-578861479

This is a follow-up to #10611 to fix the root cause of the state handler de-registration. I would appreciate it if you could have a look, @lukecwik.

Issue Time Tracking
---
Worklog Id: (was: 377753)
Time Spent: 1h 10m (was: 1h)
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=377737&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377737 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 27/Jan/20 17:01
Start Date: 27/Jan/20 17:01
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10694: [BEAM-9132] Use a unique bundle id across all SDK workers bound to the same environment
URL: https://github.com/apache/beam/pull/10694

We have observed these errors in a state-intense application:

```
Error processing instruction 107. Original traceback is
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "redacted.py", line 56, in process
    recent_events_map = load_recent_events_map(recent_events_state)
  File "redacted.py", line 128, in _load_recent_events_map
    items_in_recent_events_bag = list(recent_events_state.read())
  File "apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
    for elem in self.first:
  File "apache_beam/runners/worker/bundle_processor.py", line 214, in __iter__
    self._state_key, self._coder_impl, is_cached=self._is_cached)
  File "apache_beam/runners/worker/sdk_worker.py", line 692, in blocking_get
    self._materialize_iter(state_key, coder))
  File "apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
    self._underlying.get_raw(state_key, continuation_token)
  File "apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
    continuation_token=continuation_token)))
  File "apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: Unknown process bundle instruction id '107'
```

Notice that the error is thrown on the Runner side in the GrpcStateService.

Explanation: The error occurs when multiple ExecutableStages are scheduled on the same TaskManager. Depending on how many SDK workers are available, they may end up sharing the same environment, and therefore the same GrpcStateService. The state service uses the bundle id to look up the StateRequestHandler of the currently active bundle. The problem is that the code used a separate id generator for each stage, counting up from zero, even when stages shared the same environment. Because every generator starts at 0, the state requests of transforms that share an environment race with each other: their ids overlap, so state handlers belonging to other stages are wrongly removed in GrpcStateService.

Solution: Use one id generator per environment, shared by all the stages within that environment.
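The collision described in the explanation above can be reproduced in miniature. The sketch below is only an illustration of that hypothesis: `IdGenerator` and `HandlerRegistry` are hypothetical stand-ins for the id source and for the state service's handler map, not Beam classes. Note that a later comment on the same PR reports that this change did not resolve the errors, so the sketch shows the reasoning, not a confirmed diagnosis.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

class BundleIdSketch {
  /** Hypothetical id source that counts up from zero. */
  static final class IdGenerator {
    private final AtomicLong next = new AtomicLong();

    String getId() {
      return Long.toString(next.getAndIncrement());
    }
  }

  /** Hypothetical stand-in for a handler registry keyed by bundle id. */
  static final class HandlerRegistry {
    private final Map<String, String> handlers = new HashMap<>();

    void register(String bundleId, String stage) {
      handlers.put(bundleId, stage);
    }

    void deregister(String bundleId) {
      handlers.remove(bundleId);
    }

    boolean isKnown(String bundleId) {
      return handlers.containsKey(bundleId);
    }
  }

  /** Two stages with their own generators share one registry. */
  static boolean perStageGeneratorsCollide() {
    HandlerRegistry registry = new HandlerRegistry();
    String idA = new IdGenerator().getId(); // "0"
    String idB = new IdGenerator().getId(); // also "0"
    registry.register(idA, "stageA");
    registry.register(idB, "stageB"); // overwrites stage A's entry
    registry.deregister(idB); // stage B finishes first
    return !registry.isKnown(idA); // stage A's bundle id is now unknown
  }

  /** Both stages draw ids from one generator shared per environment. */
  static boolean sharedGeneratorCollides() {
    HandlerRegistry registry = new HandlerRegistry();
    IdGenerator shared = new IdGenerator();
    String idA = shared.getId();
    String idB = shared.getId();
    registry.register(idA, "stageA");
    registry.register(idB, "stageB");
    registry.deregister(idB);
    return !registry.isKnown(idA);
  }
}
```

With per-stage generators, stage A's id disappears from the registry as soon as stage B deregisters, which would surface as an "Unknown process bundle instruction id" style error; with a shared generator the ids are distinct and stage A's entry survives.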
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=377733&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377733 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 27/Jan/20 16:57
Start Date: 27/Jan/20 16:57
Worklog Time Spent: 10m
Work Description: mxm commented on issue #10611: [BEAM-9132] De-register state request handler last when closing ActiveBundle
URL: https://github.com/apache/beam/pull/10611#issuecomment-578845147

Closing and opening a new PR as I've now identified the root cause.

Issue Time Tracking
---
Worklog Id: (was: 377733)
Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=377734&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377734 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 27/Jan/20 16:57
Start Date: 27/Jan/20 16:57
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10611: [BEAM-9132] De-register state request handler last when closing ActiveBundle
URL: https://github.com/apache/beam/pull/10611

Issue Time Tracking
---
Worklog Id: (was: 377734)
Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=373214&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-373214 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 16/Jan/20 19:04
Start Date: 16/Jan/20 19:04
Worklog Time Spent: 10m
Work Description: mxm commented on issue #10611: [BEAM-9132] De-register state request handler last when closing ActiveBundle
URL: https://github.com/apache/beam/pull/10611#issuecomment-575297289

I thought we assume state requests to be finished only when the input receivers are closed. However, as you pointed out, we shouldn't be receiving state requests after the `ProcessBundleResponse` has arrived.

Interesting optimization to send over the last pending state request id. I wonder how much this would improve performance because the bundle response would still be mostly blocked on the state requests before the last. For what it's worth, we have async writes already in the Python SDK.

I'm trying to gather some more information on the matter. As far as I can see, the Python SDK should be working correctly in terms of finishing the bundle. I'm kind of suspicious of the bundle processor caching logic which may be selecting the wrong state request handler when the environment expires for any reason.

Issue Time Tracking
---

Worklog Id: (was: 373214)
Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=373015&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-373015 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 16/Jan/20 13:45
Start Date: 16/Jan/20 13:45
Worklog Time Spent: 10m
Work Description: mxm commented on issue #10611: [BEAM-9132] De-register state request handler last when closing ActiveBundle
URL: https://github.com/apache/beam/pull/10611#issuecomment-575157704

Run Java PreCommit

Issue Time Tracking
---

Worklog Id: (was: 373015)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle
[ https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=372990&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-372990 ]

ASF GitHub Bot logged work on BEAM-9132:

Author: ASF GitHub Bot
Created on: 16/Jan/20 12:52
Start Date: 16/Jan/20 12:52
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10611: [BEAM-9132] De-register state request handler last when closing ActiveBundle
URL: https://github.com/apache/beam/pull/10611

We have observed these errors in a state-intense application:

```
Error processing instruction 107. Original traceback is
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "redacted.py", line 56, in process
    recent_events_map = load_recent_events_map(recent_events_state)
  File "redacted.py", line 128, in _load_recent_events_map
    items_in_recent_events_bag = list(recent_events_state.read())
  File "apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
    for elem in self.first:
  File "apache_beam/runners/worker/bundle_processor.py", line 214, in __iter__
    self._state_key, self._coder_impl, is_cached=self._is_cached)
  File "apache_beam/runners/worker/sdk_worker.py", line 692, in blocking_get
    self._materialize_iter(state_key, coder))
  File "apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
    self._underlying.get_raw(state_key, continuation_token)
  File "apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
    continuation_token=continuation_token)))
  File "apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: Unknown process bundle instruction id '107'
```

Notice that the error is thrown on the Runner side.

The solution is to de-register the state request handler only after ensuring no more elements are in-flight, i.e. the input receivers are closed.

Post-Commit Tests Status (on master branch)

Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
--- | --- | --- | --- | --- | --- | --- | ---
Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCom