[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-31 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=380088&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380088
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 31/Jan/20 16:55
Start Date: 31/Jan/20 16:55
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid 
logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r37358
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##
 @@ -342,22 +343,14 @@ public RemoteBundle getBundle(
       // TODO: Consider having BundleProcessor#newBundle take in an OutputReceiverFactory rather
       // than constructing the receiver map here. Every bundle factory will need this.
 
-      if (environmentExpirationMillis == 0 && !loadBalanceBundles) {
-        return currentClient.processor.newBundle(
-            getOutputReceivers(currentClient.processBundleDescriptor, outputReceiverFactory)
-                .build(),
-            stateRequestHandler,
-            progressHandler);
-      }
 
 Review comment:
   During the initial review of bundle load balancing, the idea came up to have
two implementations: one with ref counting and one without. I think that would
be the best improvement going forward.
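To make the two-implementation idea concrete, here is a minimal Java sketch. It assumes the choice could be made once per factory using the same condition as the removed fast path in the diff above (`environmentExpirationMillis == 0 && !loadBalanceBundles`). `BundleLifecycle`, `FixedLifecycle`, `RefCountingLifecycle` and `BundleLifecycles` are hypothetical names that do not exist in Beam; this only illustrates the split, not how `DefaultJobBundleFactory` is actually structured.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical hooks a bundle factory could call around each bundle (not a Beam API). */
interface BundleLifecycle {
  void onBundleStart();

  void onBundleClose();

  int activeBundles();
}

/** Without ref counting: environments live as long as the job, so the hooks do nothing. */
final class FixedLifecycle implements BundleLifecycle {
  @Override
  public void onBundleStart() {}

  @Override
  public void onBundleClose() {}

  @Override
  public int activeBundles() {
    return 0; // not tracked in this variant
  }
}

/** With ref counting: needed when environments can expire or bundles are load balanced. */
final class RefCountingLifecycle implements BundleLifecycle {
  private final AtomicInteger refs = new AtomicInteger();

  @Override
  public void onBundleStart() {
    refs.incrementAndGet();
  }

  @Override
  public void onBundleClose() {
    refs.decrementAndGet();
  }

  @Override
  public int activeBundles() {
    return refs.get();
  }
}

final class BundleLifecycles {
  /** Picks a variant using the condition of the removed fast path. */
  static BundleLifecycle choose(long environmentExpirationMillis, boolean loadBalanceBundles) {
    return environmentExpirationMillis == 0 && !loadBalanceBundles
        ? new FixedLifecycle()
        : new RefCountingLifecycle();
  }
}
```

With such a split, the no-op variant never has to reason about references, and all of the bookkeeping lives in one place.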
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 380088)
Time Spent: 3h 20m  (was: 3h 10m)

> State request handler is removed prematurely when closing ActiveBundle
> --
>
> Key: BEAM-9132
> URL: https://issues.apache.org/jira/browse/BEAM-9132
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> We have observed these errors in a state-intensive application:
> {noformat}
> Error processing instruction 107. Original traceback is
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "redacted.py", line 56, in process
>     recent_events_map = load_recent_events_map(recent_events_state)
>   File "redacted.py", line 128, in _load_recent_events_map
>     items_in_recent_events_bag = list(recent_events_state.read())
>   File "apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
>     for elem in self.first:
>   File "apache_beam/runners/worker/bundle_processor.py", line 214, in __iter__
>     self._state_key, self._coder_impl, is_cached=self._is_cached)
>   File "apache_beam/runners/worker/sdk_worker.py", line 692, in blocking_get
>     self._materialize_iter(state_key, coder))
>   File "apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
>     self._underlying.get_raw(state_key, continuation_token)
>   File "apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
>     continuation_token=continuation_token)))
>   File "apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
>     raise RuntimeError(response.error)
> RuntimeError: Unknown process bundle instruction id '107'
> {noformat}
> Notice that the error is produced on the Runner side; the SDK Harness only
> re-raises the error it receives in the state response. It seems to originate
> from the {{ActiveBundle}} de-registering the state request handler too early,
> while processing may still be going on in the SDK Harness.
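The failure mode described above can be modeled with a small Java sketch. It assumes, for illustration only, that the Runner keeps state request handlers in a map keyed by instruction id; `StateHandlerRegistry` and its methods are hypothetical and are not Beam's API. If the handler for instruction `107` is removed while the SDK Harness is still reading state, the next lookup fails with the same message as in the traceback.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical model of Runner-side routing of state requests; not Beam's API. */
final class StateHandlerRegistry {
  // Maps a process bundle instruction id to the handler serving its state requests.
  private final Map<String, Function<String, String>> handlers = new ConcurrentHashMap<>();

  void register(String instructionId, Function<String, String> handler) {
    handlers.put(instructionId, handler);
  }

  void deregister(String instructionId) {
    handlers.remove(instructionId);
  }

  /** Serves a state read, failing like the traceback when no handler is registered. */
  String handle(String instructionId, String stateKey) {
    Function<String, String> handler = handlers.get(instructionId);
    if (handler == null) {
      throw new IllegalStateException(
          "Unknown process bundle instruction id '" + instructionId + "'");
    }
    return handler.apply(stateKey);
  }
}
```

In this model the fix is an ordering constraint: call `deregister` only after the SDK Harness has reported that the bundle finished.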



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-31 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=380084&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380084
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 31/Jan/20 16:53
Start Date: 31/Jan/20 16:53
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid 
logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r373580208
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##
 @@ -342,22 +343,14 @@ public RemoteBundle getBundle(
       // TODO: Consider having BundleProcessor#newBundle take in an OutputReceiverFactory rather
       // than constructing the receiver map here. Every bundle factory will need this.
 
-      if (environmentExpirationMillis == 0 && !loadBalanceBundles) {
-        return currentClient.processor.newBundle(
-            getOutputReceivers(currentClient.processBundleDescriptor, outputReceiverFactory)
-                .build(),
-            stateRequestHandler,
-            progressHandler);
-      }
 
 Review comment:
   I suppose it was working correctly due to the initial ref() when creating
the environment. Still, it doesn't hurt to remove this bit, because it just adds
a specialized execution path alongside the generalized one. The reference
counting is already complex enough. If you think differently, feel free to add
it back, but IMHO this is easier to understand.
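A minimal Java sketch of why the removed fast path could still work, assuming a client whose reference count starts at one for the reference taken when the environment is created. `RefCountedClient` is a hypothetical stand-in, not Beam's `WrappedSdkHarnessClient`: as long as that initial reference is held, bundles that skip `ref()`/`unref()` entirely cannot drive the count to zero and close the client.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a ref-counted SDK harness client (not Beam's class). */
final class RefCountedClient {
  // Starts at 1: the reference taken when the environment is created and cached.
  private final AtomicInteger refCount = new AtomicInteger(1);
  private boolean closed;

  void ref() {
    refCount.incrementAndGet();
  }

  void unref() {
    // The client is closed only when the last reference is released.
    if (refCount.decrementAndGet() == 0) {
      close();
    }
  }

  private synchronized void close() {
    closed = true; // a real client would shut down the environment here
  }

  synchronized boolean isClosed() {
    return closed;
  }

  int refCount() {
    return refCount.get();
  }
}
```

A bundle processed without any `ref()`/`unref()` leaves the client open; only dropping the initial reference (e.g. on eviction) closes it.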
 



Issue Time Tracking
---

Worklog Id: (was: 380084)
Time Spent: 3h 10m  (was: 3h)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-31 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=380073&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380073
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 31/Jan/20 16:19
Start Date: 31/Jan/20 16:19
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #10694: [BEAM-9132] 
Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r373563654
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##
 @@ -342,22 +343,14 @@ public RemoteBundle getBundle(
       // TODO: Consider having BundleProcessor#newBundle take in an OutputReceiverFactory rather
       // than constructing the receiver map here. Every bundle factory will need this.
 
-      if (environmentExpirationMillis == 0 && !loadBalanceBundles) {
-        return currentClient.processor.newBundle(
-            getOutputReceivers(currentClient.processBundleDescriptor, outputReceiverFactory)
-                .build(),
-            stateRequestHandler,
-            progressHandler);
-      }
 
 Review comment:
   This is the original code path, used when no reference counting is
required. What was the problem with it?
 



Issue Time Tracking
---

Worklog Id: (was: 380073)
Time Spent: 3h  (was: 2h 50m)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-31 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=380003&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380003
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 31/Jan/20 13:50
Start Date: 31/Jan/20 13:50
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid 
logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 380003)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-31 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=379936&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379936
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 31/Jan/20 12:22
Start Date: 31/Jan/20 12:22
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid 
logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r373452096
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##
 @@ -342,22 +343,14 @@ public RemoteBundle getBundle(
       // TODO: Consider having BundleProcessor#newBundle take in an OutputReceiverFactory rather
       // than constructing the receiver map here. Every bundle factory will need this.
 
-      if (environmentExpirationMillis == 0 && !loadBalanceBundles) {
-        return currentClient.processor.newBundle(
-            getOutputReceivers(currentClient.processBundleDescriptor, outputReceiverFactory)
-                .build(),
-            stateRequestHandler,
-            progressHandler);
-      }
 
 Review comment:
   I discovered another issue in this code path: `ref()` is not called on the
client in this case. Also, `unref()` will never be called, because the early
return skips the code below that hooks the `unref()` call into the bundle's
close. The client's `close()` is only ever called from `unref()`.
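The two paths discussed above can be contrasted in a short Java sketch. `SketchBundle`, `TrackingClient` and `BundlePaths` are hypothetical simplifications (the real code returns a `RemoteBundle` from `currentClient.processor.newBundle(...)`); the point is only that the fast path hands out the raw bundle, so neither `ref()` nor `unref()` is ever invoked, while the general path takes a reference and releases it from the bundle's `close()`.

```java
/** Hypothetical minimal bundle type standing in for Beam's RemoteBundle. */
interface SketchBundle extends AutoCloseable {
  @Override
  void close(); // narrowed to throw no checked exceptions, to keep the sketch short
}

/** Hypothetical client that only records how often ref() and unref() are called. */
final class TrackingClient {
  int refCalls;
  int unrefCalls;

  void ref() {
    refCalls++;
  }

  void unref() {
    unrefCalls++;
  }
}

final class BundlePaths {
  /** Like the removed fast path: hand out the raw bundle without touching references. */
  static SketchBundle fastPath(TrackingClient client, SketchBundle raw) {
    return raw;
  }

  /** General path: take a reference now and release it when the bundle is closed. */
  static SketchBundle generalPath(TrackingClient client, SketchBundle raw) {
    client.ref();
    return () -> {
      try {
        raw.close();
      } finally {
        client.unref(); // released even if closing the underlying bundle fails
      }
    };
  }
}
```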
 



Issue Time Tracking
---

Worklog Id: (was: 379936)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378991&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378991
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 29/Jan/20 19:52
Start Date: 29/Jan/20 19:52
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid 
logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372585551
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##
 @@ -406,11 +407,14 @@ public void split(double fractionOfRemainder) {
 
     @Override
     public void close() throws Exception {
-      bundle.close();
-      currentClient.wrappedClient.unref();
-      if (loadBalanceBundles) {
-        availableCaches.offer(currentCache);
-        availableCachesSemaphore.release();
+      try {
+        bundle.close();
+      } finally {
+        currentClient.wrappedClient.unref();
+        if (loadBalanceBundles) {
+          availableCaches.offer(currentCache);
+          availableCachesSemaphore.release();
+        }
 
 Review comment:
   Looks like this was responsible for the cleanup failing: `bundle.close()` may
throw, leaving the environment still referenced. With this change, my tests no
longer yield errors like the one in the description.
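The effect of the `try`/`finally` in the diff above can be checked with a self-contained Java sketch. `ActiveBundleSketch` is hypothetical: it mirrors the field names from the diff (`loadBalanceBundles`, `availableCaches`, `availableCachesSemaphore`, `currentCache`) but replaces the real bundle and client with a `Runnable` and an `AtomicInteger`. Even when closing the bundle throws, the client reference is released and the cache is handed back.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the bundle wrapper whose close() is changed in the diff. */
final class ActiveBundleSketch implements AutoCloseable {
  private final Runnable bundleClose; // may throw, like bundle.close()
  private final AtomicInteger clientRefs; // stands in for the wrapped client's ref count
  private final boolean loadBalanceBundles;
  private final Queue<Object> availableCaches;
  private final Semaphore availableCachesSemaphore;
  private final Object currentCache;

  ActiveBundleSketch(
      Runnable bundleClose,
      AtomicInteger clientRefs,
      boolean loadBalanceBundles,
      Queue<Object> availableCaches,
      Semaphore availableCachesSemaphore,
      Object currentCache) {
    this.bundleClose = bundleClose;
    this.clientRefs = clientRefs;
    this.loadBalanceBundles = loadBalanceBundles;
    this.availableCaches = availableCaches;
    this.availableCachesSemaphore = availableCachesSemaphore;
    this.currentCache = currentCache;
  }

  @Override
  public void close() {
    try {
      bundleClose.run();
    } finally {
      // Runs even if closing the bundle threw: release the client reference
      // (the unref() step) and return the cache for other bundles to use.
      clientRefs.decrementAndGet();
      if (loadBalanceBundles) {
        availableCaches.offer(currentCache);
        availableCachesSemaphore.release();
      }
    }
  }
}
```

The exception from the bundle still propagates to the caller; the `finally` block only guarantees the cleanup, which is what prevents the environment from staying referenced.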
 



Issue Time Tracking
---

Worklog Id: (was: 378991)
Time Spent: 2.5h  (was: 2h 20m)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378950&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378950
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 29/Jan/20 17:59
Start Date: 29/Jan/20 17:59
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid 
logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372540351
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##
 @@ -255,6 +255,14 @@ public void close() throws Exception {
     // Clear the cache. This closes all active environments.
     // note this may cause open calls to be cancelled by the peer
     for (LoadingCache environmentCache : environmentCaches) {
+      for (WrappedSdkHarnessClient client : environmentCache.asMap().values()) {
+        try {
+          client.close();
 
 Review comment:
   I've removed this in favor of using `unref()`. I think I found the case 
where `unref()` would not be called: 
https://github.com/apache/beam/pull/10694/files#diff-e80c769f0011537cc2b60d3e7898cf5aR413
 



Issue Time Tracking
---

Worklog Id: (was: 378950)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378428&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378428
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 28/Jan/20 19:37
Start Date: 28/Jan/20 19:37
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid 
logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372014284
 
 

 ##
 File path: 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
 ##
 @@ -360,6 +360,19 @@ public void closesEnvironmentOnCleanup() throws Exception {
     verify(remoteEnvironment).close();
   }
 
+  @Test
+  public void closesEnvironmentOnCleanupWithPendingRefs() throws Exception {
+    try (DefaultJobBundleFactory bundleFactory =
+        createDefaultJobBundleFactory(envFactoryProviderMap)) {
+      DefaultJobBundleFactory.SimpleStageBundleFactory stageBundleFactory =
+          (DefaultJobBundleFactory.SimpleStageBundleFactory)
+              bundleFactory.forStage(getExecutableStage(environment));
+      // The client is still being used, e.g. when the pipeline fails and is shut down
+      stageBundleFactory.currentClient.wrappedClient.ref();
 
 Review comment:
   I'm assuming here we have a `ref()` due to the client being used. If we had 
no ref, we would be idling. The error message we were seeing is only possible 
if we are currently processing a bundle.
 



Issue Time Tracking
---

Worklog Id: (was: 378428)
Time Spent: 2h 10m  (was: 2h)

> State request handler is removed prematurely when closing ActiveBundle
> --
>
> Key: BEAM-9132
> URL: https://issues.apache.org/jira/browse/BEAM-9132
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> We have observed these errors in a state-intense application: 
> {noformat}
> Error processing instruction 107. Original traceback is
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "redacted.py", line 56, in process
>     recent_events_map = load_recent_events_map(recent_events_state)
>   File "redacted.py", line 128, in _load_recent_events_map
>     items_in_recent_events_bag = list(recent_events_state.read())
>   File "apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
>     for elem in self.first:
>   File "apache_beam/runners/worker/bundle_processor.py", line 214, in __iter__
>     self._state_key, self._coder_impl, is_cached=self._is_cached)
>   File "apache_beam/runners/worker/sdk_worker.py", line 692, in blocking_get
>     self._materialize_iter(state_key, coder))
>   File "apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
>     self._underlying.get_raw(state_key, continuation_token)
>   File "apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
>     continuation_token=continuation_token)))
>   File "apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
>     raise RuntimeError(response.error)
> RuntimeError: Unknown process bundle instruction id '107'
> {noformat}
> Notice that the error is thrown on the Runner side. It seems to originate 
> from the {{ActiveBundle}} de-registering the state request handler too early 
> when the processing may still be going on in the SDK Harness.
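The suspected ordering problem can be pictured with a minimal sketch. It assumes a registry of state request handlers keyed by bundle id and a future that completes once the SDK harness reports the bundle as finished; `StateHandlerRegistry`, `SketchBundle` and their methods are hypothetical and are not the actual `GrpcStateService` or `ActiveBundle` APIs.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: these classes illustrate the ordering only; they are not Beam's
// GrpcStateService or ActiveBundle.
class StateHandlerRegistry {
  private final Map<String, Function<String, String>> handlers = new ConcurrentHashMap<>();

  void register(String bundleId, Function<String, String> handler) {
    handlers.put(bundleId, handler);
  }

  void deregister(String bundleId) {
    handlers.remove(bundleId);
  }

  /** Serves a state request, failing like the trace above when no handler is registered. */
  String handle(String bundleId, String stateKey) {
    Function<String, String> handler = handlers.get(bundleId);
    if (handler == null) {
      throw new IllegalStateException("Unknown process bundle instruction id '" + bundleId + "'");
    }
    return handler.apply(stateKey);
  }
}

class SketchBundle implements AutoCloseable {
  private final String bundleId;
  private final StateHandlerRegistry registry;
  // Completes once the SDK harness reports that it finished processing the bundle.
  private final CompletableFuture<Void> sdkFinished;

  SketchBundle(
      String bundleId,
      StateHandlerRegistry registry,
      CompletableFuture<Void> sdkFinished,
      Function<String, String> handler) {
    this.bundleId = bundleId;
    this.registry = registry;
    this.sdkFinished = sdkFinished;
    registry.register(bundleId, handler);
  }

  @Override
  public void close() throws Exception {
    // Wait for the SDK harness first: until then it may still send state requests.
    sdkFinished.get();
    // Deregister last, so no in-flight request finds an unknown bundle id.
    registry.deregister(bundleId);
  }
}
```

In this sketch, removing the handler before `sdkFinished` completes is exactly what would make a late state request fail with the "Unknown process bundle instruction id" error.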



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378417&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378417
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 28/Jan/20 19:18
Start Date: 28/Jan/20 19:18
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #10694: [BEAM-9132] 
Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372004299
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##
 @@ -255,6 +255,14 @@ public void close() throws Exception {
 // Clear the cache. This closes all active environments.
 // note this may cause open calls to be cancelled by the peer
 for (LoadingCache environmentCache : environmentCaches) {
+  for (WrappedSdkHarnessClient client : environmentCache.asMap().values()) {
+    try {
+      client.close();
 
 Review comment:
   Worth mentioning that this is added to close the environments irrespective 
of open bundles, since this will occur only during shutdown?
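What the added loop does at shutdown can be sketched roughly as follows. Guava's `LoadingCache` is replaced by a plain `Map`, and `ShutdownSketch`, `Client` and the `refs` field are hypothetical; the catch handling is also an assumption, because the hunk is cut off right after `client.close();`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a simplified stand-in for the environment caches in
// DefaultJobBundleFactory; Guava's LoadingCache is replaced by a plain Map.
class ShutdownSketch {
  /** Minimal client; refs > 0 means a bundle still holds it. */
  static class Client {
    int refs;
    boolean closed;

    void close() {
      closed = true;
    }
  }

  final List<Map<String, Client>> environmentCaches = new ArrayList<>();

  /** Shutdown: close every client regardless of outstanding refs. */
  void close() {
    for (Map<String, Client> cache : environmentCaches) {
      for (Client client : cache.values()) {
        try {
          client.close();
        } catch (RuntimeException e) {
          // Keep going, so one failing client does not leave the others open.
          System.err.println("Failed to close client: " + e);
        }
      }
      cache.clear();
    }
  }
}
```

The point of closing unconditionally here is that, during shutdown, waiting for refs to drop to zero could block forever if a failed pipeline never releases them.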
 



Issue Time Tracking
---

Worklog Id: (was: 378417)
Time Spent: 2h  (was: 1h 50m)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378412&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378412
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 28/Jan/20 19:13
Start Date: 28/Jan/20 19:13
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #10694: [BEAM-9132] 
Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372001730
 
 

 ##
 File path: 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
 ##
 @@ -360,6 +360,19 @@ public void closesEnvironmentOnCleanup() throws Exception {
     verify(remoteEnvironment).close();
   }
 
+  @Test
+  public void closesEnvironmentOnCleanupWithPendingRefs() throws Exception {
+    try (DefaultJobBundleFactory bundleFactory =
+        createDefaultJobBundleFactory(envFactoryProviderMap)) {
+      DefaultJobBundleFactory.SimpleStageBundleFactory stageBundleFactory =
+          (DefaultJobBundleFactory.SimpleStageBundleFactory)
+              bundleFactory.forStage(getExecutableStage(environment));
+      // The client is still being used, e.g. when the pipeline fails and is shut down
+      stageBundleFactory.currentClient.wrappedClient.ref();
 
 Review comment:
   What would cause this extra ref in the actual operator lifecycle?
 



Issue Time Tracking
---

Worklog Id: (was: 378412)
Time Spent: 1h 50m  (was: 1h 40m)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378411&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378411
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 28/Jan/20 19:12
Start Date: 28/Jan/20 19:12
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10694: [BEAM-9132] Avoid 
logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372001232
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##
 @@ -464,6 +474,8 @@ ServerInfo getServerInfo() {
 }
 
 public void close() {
 
 Review comment:
   It's now also called from here: 
https://github.com/apache/beam/pull/10694/files#diff-e80c769f0011537cc2b60d3e7898cf5aR260
 



Issue Time Tracking
---

Worklog Id: (was: 378411)
Time Spent: 1h 40m  (was: 1.5h)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378410&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378410
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 28/Jan/20 19:10
Start Date: 28/Jan/20 19:10
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #10694: [BEAM-9132] 
Avoid logging misleading error messages during pipeline failure
URL: https://github.com/apache/beam/pull/10694#discussion_r372000633
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 ##
 @@ -464,6 +474,8 @@ ServerInfo getServerInfo() {
 }
 
 public void close() {
 
 Review comment:
   Isn't close only called from unref? If so, how does this change the 
behavior? (Possibly some more explanation needs to be added.)
 



Issue Time Tracking
---

Worklog Id: (was: 378410)
Time Spent: 1.5h  (was: 1h 20m)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=378292&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378292
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 28/Jan/20 15:19
Start Date: 28/Jan/20 15:19
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #10694: [BEAM-9132] Use a unique 
bundle id across all SDK workers bound to the same environment
URL: https://github.com/apache/beam/pull/10694#issuecomment-579296135
 
 
   This does not fix the issue because the id generation scheme is effectively 
the same. I'm still seeing the same errors, but I have a new trace.
 



Issue Time Tracking
---

Worklog Id: (was: 378292)
Time Spent: 1h 20m  (was: 1h 10m)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=377753&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377753
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 27/Jan/20 17:31
Start Date: 27/Jan/20 17:31
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #10694: [BEAM-9132] Use a unique 
bundle id across all SDK workers bound to the same environment
URL: https://github.com/apache/beam/pull/10694#issuecomment-578861479
 
 
   This is a follow-up to #10611 to fix the root cause of the state handler 
de-registration. I'd appreciate it if you could have a look @lukecwik.
 



Issue Time Tracking
---

Worklog Id: (was: 377753)
Time Spent: 1h 10m  (was: 1h)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=377737&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377737
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 27/Jan/20 17:01
Start Date: 27/Jan/20 17:01
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10694: [BEAM-9132] Use a 
unique bundle id across all SDK workers bound to the same environment
URL: https://github.com/apache/beam/pull/10694
 
 
   We have observed these errors in a state-intensive application:
   
   ```
   Error processing instruction 107. Original traceback is
   Traceback (most recent call last):
     File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
     File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
     File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
     File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
     File "redacted.py", line 56, in process
       recent_events_map = load_recent_events_map(recent_events_state)
     File "redacted.py", line 128, in _load_recent_events_map
       items_in_recent_events_bag = list(recent_events_state.read())
     File "apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
       for elem in self.first:
     File "apache_beam/runners/worker/bundle_processor.py", line 214, in __iter__
       self._state_key, self._coder_impl, is_cached=self._is_cached)
     File "apache_beam/runners/worker/sdk_worker.py", line 692, in blocking_get
       self._materialize_iter(state_key, coder))
     File "apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
       self._underlying.get_raw(state_key, continuation_token)
     File "apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
       continuation_token=continuation_token)))
     File "apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
       raise RuntimeError(response.error)
   RuntimeError: Unknown process bundle instruction id '107'
   ```
   
   Notice that the error is thrown on the Runner side in the GrpcStateService.
   
   Explanation: The error occurs when multiple ExecutableStages are scheduled on
   the same TaskManager. Depending on how many SDK workers are available, they may
   end up sharing the same environment. This means that they will also share the
   same GrpcStateService. The state service uses the bundle id to reference the
   StateRequestHandler of the currently active bundle. The problem is that the code
   used an ascending id generator starting from zero for each stage, even if the
   stage shared its environment with other stages.
   
   Because every per-stage bundle id generator starts from 0, the state requests
   of transforms that share the same environment eventually race with each other:
   their ids overlap, which leads to the state handlers of other stages being
   falsely removed from the GrpcStateService.
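The race described above can be reproduced in a few lines. This is a minimal sketch under simplifying assumptions: the shared state service is reduced to a map from bundle id to the owning stage's name, and `SharedStateService` and `PerStageIdStage` are hypothetical names, not Beam classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the bug: names are illustrative, not Beam's API.
class SharedStateService {
  // Bundle id -> name of the stage whose StateRequestHandler is registered.
  final Map<String, String> handlers = new ConcurrentHashMap<>();
}

class PerStageIdStage {
  private final String name;
  private final SharedStateService stateService;
  // The bug: every stage owns its own counter, and each one starts at 0.
  private final AtomicLong idGenerator = new AtomicLong();

  PerStageIdStage(String name, SharedStateService stateService) {
    this.name = name;
    this.stateService = stateService;
  }

  String startBundle() {
    String bundleId = Long.toString(idGenerator.getAndIncrement());
    stateService.handlers.put(bundleId, name); // may overwrite another stage's handler
    return bundleId;
  }

  void finishBundle(String bundleId) {
    stateService.handlers.remove(bundleId); // may remove another stage's handler
  }
}
```

With two stages on one `SharedStateService`, both first bundles get id `0`; when stage A finishes its bundle it removes the entry that stage B is still relying on, so B's next state request would hit an unknown instruction id.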
   
   Solution: Use a unique id generator per environment which is shared by all the
   stages within that environment.
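The proposed fix can be sketched in the same simplified model: the id generator moves from the stage to the environment, so every stage bound to that environment draws from one sequence. `SketchEnvironment` and `SharedIdStage` are hypothetical names chosen for illustration; they do not correspond to actual Beam classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the fix: names are illustrative, not Beam's API.
class SketchEnvironment {
  // Shared by every stage bound to this environment, like the GrpcStateService.
  final Map<String, String> stateHandlers = new ConcurrentHashMap<>();
  // One id generator per environment instead of one per stage.
  private final AtomicLong bundleIds = new AtomicLong();

  String nextBundleId() {
    return Long.toString(bundleIds.getAndIncrement());
  }
}

class SharedIdStage {
  private final String name;
  private final SketchEnvironment environment;

  SharedIdStage(String name, SketchEnvironment environment) {
    this.name = name;
    this.environment = environment;
  }

  String startBundle() {
    String bundleId = environment.nextBundleId();
    environment.stateHandlers.put(bundleId, name);
    return bundleId;
  }

  void finishBundle(String bundleId) {
    // Ids are unique within the environment, so only this stage's handler is removed.
    environment.stateHandlers.remove(bundleId);
  }
}
```

Because `AtomicLong.getAndIncrement()` is atomic, concurrent stages in this sketch can never receive the same id, so finishing one stage's bundle leaves the other stages' handlers in place.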
   

[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=377733&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377733
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 27/Jan/20 16:57
Start Date: 27/Jan/20 16:57
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #10611: [BEAM-9132] De-register 
state request handler last when closing ActiveBundle
URL: https://github.com/apache/beam/pull/10611#issuecomment-578845147
 
 
   Closing and opening a new PR as I've now identified the root cause.
 



Issue Time Tracking
---

Worklog Id: (was: 377733)
Time Spent: 40m  (was: 0.5h)

> State request handler is removed prematurely when closing ActiveBundle
> --
>
> Key: BEAM-9132
> URL: https://issues.apache.org/jira/browse/BEAM-9132
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> We have observed these errors in a state-intense application: 
> {noformat}
> Error processing instruction 107. Original traceback is
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "redacted.py", line 56, in process
>     recent_events_map = load_recent_events_map(recent_events_state)
>   File "redacted.py", line 128, in _load_recent_events_map
>     items_in_recent_events_bag = list(recent_events_state.read())
>   File "apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
>     for elem in self.first:
>   File "apache_beam/runners/worker/bundle_processor.py", line 214, in __iter__
>     self._state_key, self._coder_impl, is_cached=self._is_cached)
>   File "apache_beam/runners/worker/sdk_worker.py", line 692, in blocking_get
>     self._materialize_iter(state_key, coder))
>   File "apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
>     self._underlying.get_raw(state_key, continuation_token)
>   File "apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
>     continuation_token=continuation_token)))
>   File "apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
>     raise RuntimeError(response.error)
> RuntimeError: Unknown process bundle instruction id '107'
> {noformat}
> Note that the error originates on the Runner side: the SDK Harness only re-raises the error it received in the state response. It seems to be caused by the {{ActiveBundle}} de-registering the state request handler too early, while processing may still be going on in the SDK Harness.
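To make the suspected failure mode concrete, here is a minimal sketch of a runner-side registry that maps instruction ids to state request handlers. `StateHandlerRegistry` and its methods are hypothetical illustrations, not Beam's actual classes. The error string only mirrors the message in the traceback above. Once the handler for instruction `107` is removed, any later state request for that instruction fails with the same kind of error.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical registry mapping instruction ids to state request handlers (not Beam's API). */
class StateHandlerRegistry {
  private final Map<String, Function<String, String>> handlers = new ConcurrentHashMap<>();

  void register(String instructionId, Function<String, String> handler) {
    handlers.put(instructionId, handler);
  }

  void deregister(String instructionId) {
    handlers.remove(instructionId);
  }

  /** Serves a state request, or returns an error string if no handler is registered. */
  String handle(String instructionId, String stateKey) {
    Function<String, String> handler = handlers.get(instructionId);
    if (handler == null) {
      return "ERROR: Unknown process bundle instruction id '" + instructionId + "'";
    }
    return handler.apply(stateKey);
  }

  public static void main(String[] args) {
    StateHandlerRegistry registry = new StateHandlerRegistry();
    registry.register("107", key -> "value-for-" + key);
    // prints: value-for-recent_events
    System.out.println(registry.handle("107", "recent_events"));

    // Closing the bundle too early removes the handler while the SDK may still read state.
    registry.deregister("107");
    // prints: ERROR: Unknown process bundle instruction id '107'
    System.out.println(registry.handle("107", "recent_events"));
  }
}
```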



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=377734&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377734
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 27/Jan/20 16:57
Start Date: 27/Jan/20 16:57
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10611: [BEAM-9132] 
De-register state request handler last when closing ActiveBundle
URL: https://github.com/apache/beam/pull/10611
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 377734)
Time Spent: 50m  (was: 40m)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=373214&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-373214
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 16/Jan/20 19:04
Start Date: 16/Jan/20 19:04
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #10611: [BEAM-9132] De-register 
state request handler last when closing ActiveBundle
URL: https://github.com/apache/beam/pull/10611#issuecomment-575297289
 
 
   I had assumed that state requests are only guaranteed to be finished once the input receivers are closed. However, as you pointed out, we shouldn't receive any state requests after the `ProcessBundleResponse` has arrived.
   
   Sending over the id of the last pending state request is an interesting optimization. I wonder how much it would improve performance, though, because the bundle response would still be mostly blocked on the state requests preceding the last one. For what it's worth, the Python SDK already has async writes.
   
   I'm trying to gather more information on the matter. As far as I can see, the Python SDK should be finishing the bundle correctly. I suspect the bundle processor caching logic, which may select the wrong state request handler when the environment expires for whatever reason.
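The optimization of sending over the id of the last pending state request is not spelled out in this thread. The sketch below is one possible reading, not an agreed design. It assumes the SDK Harness would report that id with its bundle response (a hypothetical field), and that the runner would keep the state request handler registered until that request has been served. `PendingStateRequests` and its methods are illustrative names only.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical runner-side tracker for the "last pending state request id" idea.
 * Assumes the SDK Harness reports that id alongside its bundle response.
 */
class PendingStateRequests {
  private final Set<Long> served = new HashSet<>();

  /** Called by the state handler after it has answered a request. */
  synchronized void markServed(long requestId) {
    served.add(requestId);
    notifyAll();
  }

  /**
   * Blocks until {@code lastRequestId} has been served, after which the handler could be
   * removed. Returns false on timeout or interruption.
   */
  synchronized boolean awaitServed(long lastRequestId, long timeoutMillis) {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!served.contains(lastRequestId)) {
      long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0) {
        return false;
      }
      try {
        wait(remaining);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    PendingStateRequests pending = new PendingStateRequests();
    Thread stateThread = new Thread(() -> {
      for (long id = 1; id <= 3; id++) {
        pending.markServed(id);
      }
    });
    stateThread.start();
    // Suppose the bundle response reported request 3 as the last one the SDK issued.
    boolean safe = pending.awaitServed(3, 1_000);
    System.out.println(safe ? "safe to de-register the state handler" : "timed out");
  }
}
```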
 



Issue Time Tracking
---

Worklog Id: (was: 373214)
Time Spent: 0.5h  (was: 20m)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=373015&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-373015
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 16/Jan/20 13:45
Start Date: 16/Jan/20 13:45
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #10611: [BEAM-9132] De-register 
state request handler last when closing ActiveBundle
URL: https://github.com/apache/beam/pull/10611#issuecomment-575157704
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 373015)
Time Spent: 20m  (was: 10m)



[jira] [Work logged] (BEAM-9132) State request handler is removed prematurely when closing ActiveBundle

2020-01-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9132?focusedWorklogId=372990&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-372990
 ]

ASF GitHub Bot logged work on BEAM-9132:


Author: ASF GitHub Bot
Created on: 16/Jan/20 12:52
Start Date: 16/Jan/20 12:52
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10611: [BEAM-9132] 
De-register state request handler last when closing ActiveBundle
URL: https://github.com/apache/beam/pull/10611
 
 
   We have observed these errors in a state-intensive application:
   
   ```
   Error processing instruction 107. Original traceback is
   Traceback (most recent call last):
     File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
     File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
     File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
     File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
     File "redacted.py", line 56, in process
       recent_events_map = load_recent_events_map(recent_events_state)
     File "redacted.py", line 128, in _load_recent_events_map
       items_in_recent_events_bag = list(recent_events_state.read())
     File "apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
       for elem in self.first:
     File "apache_beam/runners/worker/bundle_processor.py", line 214, in __iter__
       self._state_key, self._coder_impl, is_cached=self._is_cached)
     File "apache_beam/runners/worker/sdk_worker.py", line 692, in blocking_get
       self._materialize_iter(state_key, coder))
     File "apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
       self._underlying.get_raw(state_key, continuation_token)
     File "apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
       continuation_token=continuation_token)))
     File "apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
       raise RuntimeError(response.error)
   RuntimeError: Unknown process bundle instruction id '107'
   ```
   
   Note that the error originates on the Runner side. The proposed solution is to de-register the state request handler only after ensuring that no more elements are in-flight, i.e. after the input receivers are closed.
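Below is a minimal sketch of the close ordering described above. It also reflects the point from the comment thread that no state requests should arrive after the `ProcessBundleResponse`. `OrderedBundleCloser` and its callbacks are hypothetical stand-ins, not the real `ActiveBundle` code. This PR was later closed in favor of a new one once the root cause was identified.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the proposed close ordering; not Beam's ActiveBundle. */
class OrderedBundleCloser implements AutoCloseable {
  private final List<AutoCloseable> inputReceivers;
  private final Runnable awaitBundleResponse;    // blocks until ProcessBundleResponse arrives
  private final Runnable deregisterStateHandler; // removes the handler for this instruction id

  OrderedBundleCloser(
      List<AutoCloseable> inputReceivers,
      Runnable awaitBundleResponse,
      Runnable deregisterStateHandler) {
    this.inputReceivers = inputReceivers;
    this.awaitBundleResponse = awaitBundleResponse;
    this.deregisterStateHandler = deregisterStateHandler;
  }

  @Override
  public void close() throws Exception {
    try {
      // 1. Close the input receivers: no more elements are in-flight to the SDK Harness.
      for (AutoCloseable receiver : inputReceivers) {
        receiver.close();
      }
      // 2. Wait for the bundle to finish; state requests may still arrive until then.
      awaitBundleResponse.run();
    } finally {
      // 3. De-register the state request handler last, even if an earlier step failed.
      deregisterStateHandler.run();
    }
  }

  public static void main(String[] args) throws Exception {
    List<String> events = new ArrayList<>();
    AutoCloseable inputReceiver = () -> events.add("close input receiver");
    try (OrderedBundleCloser bundle =
        new OrderedBundleCloser(
            List.of(inputReceiver),
            () -> events.add("await bundle response"),
            () -> events.add("de-register state handler"))) {
      events.add("process elements");
    }
    // prints: [process elements, close input receiver, await bundle response, de-register state handler]
    System.out.println(events);
  }
}
```

The `finally` block keeps the de-registration last while still guaranteeing it runs when closing a receiver or waiting for the response throws.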
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCom