[jira] [Commented] (BEAM-12942) Dataflow runner specialization of PubsubIO should validate messages

2022-06-01 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17544772#comment-17544772
 ] 

Sam Whittle commented on BEAM-12942:


The Java SDK was fixed in 2.34; reopening to add similar validation to the Python and Go SDKs.

> Dataflow runner specialization of PubsubIO should validate messages
> ---
>
> Key: BEAM-12942
> URL: https://issues.apache.org/jira/browse/BEAM-12942
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.34.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Currently invalid messages, such as those with attributes exceeding the 
> maximum size are processed by the bundle successfully but fail to commit.
> Throwing an exception when trying to write such a message directly would 
> increase visibility as well as allowing users to catch and handle such 
> exceptions.
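The validation proposed above could look roughly like the following sketch. The attribute-size limits and names here are assumptions for illustration, not the actual Beam or Dataflow implementation:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Hypothetical sketch of write-time validation: reject a message whose
 * attributes exceed Pub/Sub size limits instead of letting the bundle's
 * commit fail later. The limits below (256-byte keys, 1024-byte values)
 * are assumed for illustration; check the Pub/Sub quotas documentation.
 */
public class PubsubMessageValidator {
  static final int MAX_ATTRIBUTE_KEY_BYTES = 256;
  static final int MAX_ATTRIBUTE_VALUE_BYTES = 1024;

  /** Returns null if the attributes are valid, otherwise a description of the violation. */
  public static String validateAttributes(Map<String, String> attributes) {
    for (Map.Entry<String, String> e : attributes.entrySet()) {
      if (e.getKey().getBytes(StandardCharsets.UTF_8).length > MAX_ATTRIBUTE_KEY_BYTES) {
        return "attribute key too large: " + e.getKey();
      }
      if (e.getValue().getBytes(StandardCharsets.UTF_8).length > MAX_ATTRIBUTE_VALUE_BYTES) {
        return "attribute value too large for key: " + e.getKey();
      }
    }
    return null; // valid
  }
}
```

A writer would call this before publishing and throw (or route to a dead-letter output) on a non-null result, making the failure visible and catchable.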



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Reopened] (BEAM-12942) Dataflow runner specialization of PubsubIO should validate messages

2022-06-01 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle reopened BEAM-12942:


> Dataflow runner specialization of PubsubIO should validate messages
> ---
>
> Key: BEAM-12942
> URL: https://issues.apache.org/jira/browse/BEAM-12942
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.34.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Currently invalid messages, such as those with attributes exceeding the 
> maximum size are processed by the bundle successfully but fail to commit.
> Throwing an exception when trying to write such a message directly would 
> increase visibility as well as allowing users to catch and handle such 
> exceptions.





[jira] [Assigned] (BEAM-14167) Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may be underestimated"

2022-03-25 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle reassigned BEAM-14167:
--

Assignee: Kiley Sok  (was: Luke Cwik)

Feel free to mark this as a duplicate or blocker of BEAM-13695.

> Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may 
> be underestimated"
> ---
>
> Key: BEAM-14167
> URL: https://issues.apache.org/jira/browse/BEAM-14167
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Sam Whittle
>Assignee: Kiley Sok
>Priority: P2
>  Labels: Java11
>
> Can we fix these somehow? At a minimum we should catch them when weighing and 
> not cache the state.
> "java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.reflect.InaccessibleObjectException: Unable to make field private 
> final byte[] java.lang.String.value accessible: module java.base does not 
> "opens java.lang" to unnamed module @39529185
>   at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:129)
>   at org.apache.beam.fn.harness.Caches.weigh(Caches.java:64)
>   at org.apache.beam.fn.harness.Caches.findWeight(Caches.java:202)
>   at org.apache.beam.fn.harness.Caches.access$600(Caches.java:46)
>   at 
> org.apache.beam.fn.harness.Caches$CompositeKey.<init>(Caches.java:363)
>   at 
> org.apache.beam.fn.harness.Caches$CompositeKey.<init>(Caches.java:355)
>   at 
> org.apache.beam.fn.harness.Caches$CompositeKeyPrefix.valueKey(Caches.java:323)
>   at 
> org.apache.beam.fn.harness.Caches$SubCache.computeIfAbsent(Caches.java:259)
>   at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:259)
>   at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:252)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:700)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:490)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:895)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:486)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: java.lang.RuntimeException: 
> java.lang.reflect.InaccessibleObjectException: Unable to make field private 
> final byte[] java.lang.String.value accessible: module java.base does not 
> "opens java.lang" to unnamed module @39529185
>   at 
> org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:246)
>   at org.github.jamm.MemoryMeterBase.access$000(MemoryMeterBase.java:15)
>   at 
> org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:24)
>   at 
> org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:20)
>   at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228)
>   at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210)
>   at java.base/java.lang.ClassValue.get(ClassValue.java:116)
>   at 
> org.github.jamm.MemoryMeterBase.declaredClassFields(MemoryMeterBase.java:219)
>   at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:119)
>   ... 18 more
> Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make 
> field private final byte[] java.lang.String.value accessible: module 
> java.base does not "opens java.lang" to unnamed module @39529185
>   at 
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
>   at 
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
>   at 
> java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
>   at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
>   at 
> org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:241)
>   ... 26 more
> "





[jira] [Updated] (BEAM-14167) Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may be underestimated"

2022-03-24 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-14167:
---
Labels: Java11  (was: )

> Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may 
> be underestimated"
> ---
>
> Key: BEAM-14167
> URL: https://issues.apache.org/jira/browse/BEAM-14167
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Sam Whittle
>Assignee: Luke Cwik
>Priority: P2
>  Labels: Java11
>
> Can we fix these somehow? At a minimum we should catch them when weighing and 
> not cache the state.
> "java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.reflect.InaccessibleObjectException: Unable to make field private 
> final byte[] java.lang.String.value accessible: module java.base does not 
> "opens java.lang" to unnamed module @39529185
>   at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:129)
>   at org.apache.beam.fn.harness.Caches.weigh(Caches.java:64)
>   at org.apache.beam.fn.harness.Caches.findWeight(Caches.java:202)
>   at org.apache.beam.fn.harness.Caches.access$600(Caches.java:46)
>   at 
> org.apache.beam.fn.harness.Caches$CompositeKey.<init>(Caches.java:363)
>   at 
> org.apache.beam.fn.harness.Caches$CompositeKey.<init>(Caches.java:355)
>   at 
> org.apache.beam.fn.harness.Caches$CompositeKeyPrefix.valueKey(Caches.java:323)
>   at 
> org.apache.beam.fn.harness.Caches$SubCache.computeIfAbsent(Caches.java:259)
>   at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:259)
>   at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:252)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:700)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:490)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:895)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:486)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: java.lang.RuntimeException: 
> java.lang.reflect.InaccessibleObjectException: Unable to make field private 
> final byte[] java.lang.String.value accessible: module java.base does not 
> "opens java.lang" to unnamed module @39529185
>   at 
> org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:246)
>   at org.github.jamm.MemoryMeterBase.access$000(MemoryMeterBase.java:15)
>   at 
> org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:24)
>   at 
> org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:20)
>   at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228)
>   at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210)
>   at java.base/java.lang.ClassValue.get(ClassValue.java:116)
>   at 
> org.github.jamm.MemoryMeterBase.declaredClassFields(MemoryMeterBase.java:219)
>   at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:119)
>   ... 18 more
> Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make 
> field private final byte[] java.lang.String.value accessible: module 
> java.base does not "opens java.lang" to unnamed module @39529185
>   at 
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
>   at 
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
>   at 
> java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
>   at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
>   at 
> org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:241)
>   ... 26 more
> "





[jira] [Comment Edited] (BEAM-14167) Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may be underestimated"

2022-03-24 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511869#comment-17511869
 ] 

Sam Whittle edited comment on BEAM-14167 at 3/24/22, 1:59 PM:
--

From 
https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m
it appears that this was introduced in Java 9, so that could explain why I 
haven't seen it before since I normally run Java 8 pipelines.

There is also a potential fix there by adding flags to the JVM. 
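Presumably that fix is the module-system flag below; shown for illustration only, since the exact set of --add-opens flags needed should be checked against the jamm documentation:

```shell
# Open java.lang to unnamed modules so jamm can reflect on fields such as
# java.lang.String.value; without this, JDK 9+ throws InaccessibleObjectException.
java --add-opens java.base/java.lang=ALL-UNNAMED -jar my-pipeline.jar
```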


was (Author: scwhittle):
From 
https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m
it appears that this was introduced in Java 9, so that could explain why I 
haven't seen it before since I normally run Java 8 pipelines.

> Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may 
> be underestimated"
> ---
>
> Key: BEAM-14167
> URL: https://issues.apache.org/jira/browse/BEAM-14167
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Sam Whittle
>Assignee: Luke Cwik
>Priority: P2
>
> Can we fix these somehow? At a minimum we should catch them when weighing and 
> not cache the state.
> "java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.reflect.InaccessibleObjectException: Unable to make field private 
> final byte[] java.lang.String.value accessible: module java.base does not 
> "opens java.lang" to unnamed module @39529185
>   at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:129)
>   at org.apache.beam.fn.harness.Caches.weigh(Caches.java:64)
>   at org.apache.beam.fn.harness.Caches.findWeight(Caches.java:202)
>   at org.apache.beam.fn.harness.Caches.access$600(Caches.java:46)
>   at 
> org.apache.beam.fn.harness.Caches$CompositeKey.<init>(Caches.java:363)
>   at 
> org.apache.beam.fn.harness.Caches$CompositeKey.<init>(Caches.java:355)
>   at 
> org.apache.beam.fn.harness.Caches$CompositeKeyPrefix.valueKey(Caches.java:323)
>   at 
> org.apache.beam.fn.harness.Caches$SubCache.computeIfAbsent(Caches.java:259)
>   at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:259)
>   at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:252)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:700)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:490)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:895)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:486)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: java.lang.RuntimeException: 
> java.lang.reflect.InaccessibleObjectException: Unable to make field private 
> final byte[] java.lang.String.value accessible: module java.base does not 
> "opens java.lang" to unnamed module @39529185
>   at 
> org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:246)
>   at org.github.jamm.MemoryMeterBase.access$000(MemoryMeterBase.java:15)
>   at 
> org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:24)
>   at 
> org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:20)
>   at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228)
>   at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210)
>   at java.base/java.lang.ClassValue.get(ClassValue.java:116)
>   at 
> org.github.jamm.MemoryMeterBase.declaredClassFields(MemoryMeterBase.java:219)
>   at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:119)
>   ... 18 more
> Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make 
> field private final byte[] java.lang.String.value accessible: module 
> java.base does not "opens java.lang" to unnamed module @39529185
>   at 
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
>   at 
> java.base/java.lang.reflect.AccessibleObject.checkCanS

[jira] [Commented] (BEAM-14167) Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may be underestimated"

2022-03-24 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511869#comment-17511869
 ] 

Sam Whittle commented on BEAM-14167:


From 
https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m
it appears that this was introduced in Java 9, so that could explain why I 
haven't seen it before since I normally run Java 8 pipelines.

> Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may 
> be underestimated"
> ---
>
> Key: BEAM-14167
> URL: https://issues.apache.org/jira/browse/BEAM-14167
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Sam Whittle
>Assignee: Luke Cwik
>Priority: P2
>
> Can we fix these somehow? At a minimum we should catch them when weighing and 
> not cache the state.
> "java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.reflect.InaccessibleObjectException: Unable to make field private 
> final byte[] java.lang.String.value accessible: module java.base does not 
> "opens java.lang" to unnamed module @39529185
>   at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:129)
>   at org.apache.beam.fn.harness.Caches.weigh(Caches.java:64)
>   at org.apache.beam.fn.harness.Caches.findWeight(Caches.java:202)
>   at org.apache.beam.fn.harness.Caches.access$600(Caches.java:46)
>   at 
> org.apache.beam.fn.harness.Caches$CompositeKey.<init>(Caches.java:363)
>   at 
> org.apache.beam.fn.harness.Caches$CompositeKey.<init>(Caches.java:355)
>   at 
> org.apache.beam.fn.harness.Caches$CompositeKeyPrefix.valueKey(Caches.java:323)
>   at 
> org.apache.beam.fn.harness.Caches$SubCache.computeIfAbsent(Caches.java:259)
>   at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:259)
>   at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:252)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:700)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:490)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:895)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:486)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: java.lang.RuntimeException: 
> java.lang.reflect.InaccessibleObjectException: Unable to make field private 
> final byte[] java.lang.String.value accessible: module java.base does not 
> "opens java.lang" to unnamed module @39529185
>   at 
> org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:246)
>   at org.github.jamm.MemoryMeterBase.access$000(MemoryMeterBase.java:15)
>   at 
> org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:24)
>   at 
> org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:20)
>   at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228)
>   at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210)
>   at java.base/java.lang.ClassValue.get(ClassValue.java:116)
>   at 
> org.github.jamm.MemoryMeterBase.declaredClassFields(MemoryMeterBase.java:219)
>   at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:119)
>   ... 18 more
> Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make 
> field private final byte[] java.lang.String.value accessible: module 
> java.base does not "opens java.lang" to unnamed module @39529185
>   at 
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
>   at 
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
>   at 
> java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
>   at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
>   at 
> org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:241)
>   ... 26 more
> "





[jira] [Created] (BEAM-14167) Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may be underestimated"

2022-03-24 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-14167:
--

 Summary: Jamm exceptions "JVM prevents jamm from accessing 
subgraph - cache sizes may be underestimated"
 Key: BEAM-14167
 URL: https://issues.apache.org/jira/browse/BEAM-14167
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Reporter: Sam Whittle
Assignee: Luke Cwik


Can we fix these somehow? At a minimum we should catch them when weighing and 
not cache the state.

"java.lang.RuntimeException: java.lang.RuntimeException: 
java.lang.reflect.InaccessibleObjectException: Unable to make field private 
final byte[] java.lang.String.value accessible: module java.base does not 
"opens java.lang" to unnamed module @39529185
at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:129)
at org.apache.beam.fn.harness.Caches.weigh(Caches.java:64)
at org.apache.beam.fn.harness.Caches.findWeight(Caches.java:202)
at org.apache.beam.fn.harness.Caches.access$600(Caches.java:46)
at 
org.apache.beam.fn.harness.Caches$CompositeKey.<init>(Caches.java:363)
at 
org.apache.beam.fn.harness.Caches$CompositeKey.<init>(Caches.java:355)
at 
org.apache.beam.fn.harness.Caches$CompositeKeyPrefix.valueKey(Caches.java:323)
at 
org.apache.beam.fn.harness.Caches$SubCache.computeIfAbsent(Caches.java:259)
at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:259)
at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:252)
at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:700)
at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:490)
at 
org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:895)
at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:486)
at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
at 
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.RuntimeException: 
java.lang.reflect.InaccessibleObjectException: Unable to make field private 
final byte[] java.lang.String.value accessible: module java.base does not 
"opens java.lang" to unnamed module @39529185
at 
org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:246)
at org.github.jamm.MemoryMeterBase.access$000(MemoryMeterBase.java:15)
at 
org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:24)
at 
org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:20)
at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228)
at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210)
at java.base/java.lang.ClassValue.get(ClassValue.java:116)
at 
org.github.jamm.MemoryMeterBase.declaredClassFields(MemoryMeterBase.java:219)
at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:119)
... 18 more
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field 
private final byte[] java.lang.String.value accessible: module java.base does 
not "opens java.lang" to unnamed module @39529185
at 
java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
at 
java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
at 
java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
at 
org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:241)
... 26 more
"
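The mitigation suggested in the description (catch the failure while weighing and skip caching, rather than letting it propagate) could be sketched as follows. The Weigher interface and sentinel value are illustrative assumptions, not Beam's actual Caches code, which uses jamm's MemoryMeter:

```java
/**
 * Sketch: wrap the weighing call so a reflection failure (as jamm hits on
 * JDK 9+ module restrictions) degrades to "unknown weight" instead of
 * failing the caller. The caller can then choose not to cache the entry.
 */
public class SafeWeigher {
  interface Weigher {
    long weigh(Object o); // may throw RuntimeException, as jamm does on JDK 9+
  }

  static final long UNKNOWN_WEIGHT = -1;

  /** Returns the measured weight, or UNKNOWN_WEIGHT so the caller can skip caching. */
  public static long weighOrUnknown(Weigher weigher, Object o) {
    try {
      return weigher.weigh(o);
    } catch (RuntimeException e) {
      // e.g. a wrapped InaccessibleObjectException; log and don't cache this entry.
      return UNKNOWN_WEIGHT;
    }
  }
}
```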





[jira] [Created] (BEAM-13826) Reduce number of Gax related threads, likely by providing common executor to GAX clients

2022-02-04 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-13826:
--

 Summary: Reduce number of Gax related threads, likely by providing 
common executor to GAX clients
 Key: BEAM-13826
 URL: https://issues.apache.org/jira/browse/BEAM-13826
 Project: Beam
  Issue Type: Improvement
  Components: io-java-gcp
Reporter: Sam Whittle


When looking at thread stacks for a pipeline reading from Pub/Sub and writing to BQ 
using the Write API, there were 3600 Gax-prefixed threads.

The number of threads, and possibly performance, could be improved by finding where 
these threads come from and using a shared executor.
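The shared-executor idea can be illustrated without the GAX API itself. In real code this would presumably be wired through an ExecutorProvider on each client's settings; the class and method names below are assumptions for illustration:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/**
 * Illustration of the proposed fix (not the actual GAX API): instead of each
 * client building its own scheduler, every client is handed one shared pool,
 * bounding the total thread count regardless of how many clients exist.
 */
public class SharedExecutorDemo {
  // One pool for the whole process instead of one per client.
  private static final ScheduledExecutorService SHARED =
      Executors.newScheduledThreadPool(4, r -> {
        Thread t = new Thread(r, "shared-gax");
        t.setDaemon(true); // don't block JVM shutdown
        return t;
      });

  public static ScheduledExecutorService sharedPool() {
    return SHARED;
  }

  /** Stand-in for a GAX client that accepts an injected executor. */
  static class FakeClient {
    final ScheduledExecutorService executor;

    FakeClient(ScheduledExecutorService executor) {
      this.executor = executor;
    }
  }
}
```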





[jira] [Created] (BEAM-13684) Consider adding information to ProcessBundleRequest indicating there is no existing state

2022-01-18 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-13684:
--

 Summary: Consider adding information to ProcessBundleRequest 
indicating there is no existing state
 Key: BEAM-13684
 URL: https://issues.apache.org/jira/browse/BEAM-13684
 Project: Beam
  Issue Type: Improvement
  Components: beam-model
Reporter: Sam Whittle


This would allow skipping all state fetches made while processing this key, 
preloading the cache with empty values for the key, or extending the cache to 
include this negative state information to avoid fetches as long as no state is 
evicted for the key.

This is particularly beneficial in sparse key streaming pipelines where such 
new keys are common.

It could also be used to avoid fetches in batch pipelines where there is no 
persisted state.
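The cache-preloading idea could be sketched as below. The "runner signals no existing state" hook and the key/value types are hypothetical, standing in for the proposed ProcessBundleRequest field and the harness's state cache:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of negative state caching: if the (proposed) ProcessBundleRequest
 * flag says a key has no persisted state, preload the cache with empty
 * entries so reads resolve locally instead of issuing remote state fetches.
 */
public class NegativeStateCache {
  private final Map<String, byte[]> cache = new HashMap<>();
  private int fetches = 0; // counts simulated remote state fetches

  /** Called when the runner signals that this key has no existing state. */
  public void markKeyEmpty(String stateKey) {
    cache.put(stateKey, new byte[0]); // negative entry: "known empty"
  }

  public byte[] read(String stateKey) {
    byte[] cached = cache.get(stateKey);
    if (cached != null) {
      return cached; // served locally, no fetch needed
    }
    fetches++; // would be a remote state read in the real harness
    return new byte[0];
  }

  public int fetchCount() {
    return fetches;
  }
}
```

For a new key in a sparse-key streaming pipeline, every read after markKeyEmpty resolves from the cache, so no remote fetch is issued.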





[jira] [Comment Edited] (BEAM-12857) Unable to write to GCS due to IndexOutOfBoundsException in FileSystems

2022-01-17 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17477130#comment-17477130
 ] 

Sam Whittle edited comment on BEAM-12857 at 1/17/22, 11:18 AM:
---

From looking at the code it does seem that such an exception could be 
encountered with the following:
ignoreMissingSrc was false
skipExistingDest was true
matchSrcResults.get(i).status() was NOT_FOUND and metadata was empty/null
matchDestResults.get(i).status().equals(Status.OK)

This would trigger if the src was missing and the destination already existed.  
This could happen if the file rename stage ran multiple times due to a retry.

https://github.com/apache/beam/pull/15301 changed it so that the source files are 
only matched if ignoreMissingSrc is true, so this bug itself wouldn't occur 
around the metadata.  However, a similar bug is still present when 
ignoreMissingSrc is false and skipExistingDest is true, since that path examines the 
src results without populating them. The FileBasedSink specifies both of these, 
so upgrading the SDK will likely fix your issue, though the bug should 
still be fixed.
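The failure mode described above can be modeled in miniature: when the source match is NOT_FOUND its metadata list is empty, and calling get(0) unconditionally throws IndexOutOfBoundsException. The types below are simplified stand-ins for Beam's MatchResult, not the real API:

```java
import java.util.List;

/**
 * Minimal model of the bug: guard on the match status (and emptiness) before
 * touching metadata().get(0), instead of indexing unconditionally.
 */
public class RenameGuard {
  enum Status { OK, NOT_FOUND }

  static class Match {
    final Status status;
    final List<String> metadata;

    Match(Status status, List<String> metadata) {
      this.status = status;
      this.metadata = metadata;
    }
  }

  /** Returns the first metadata entry, or null when the source was not found. */
  public static String firstMetadataOrNull(Match src) {
    if (src.status == Status.NOT_FOUND || src.metadata.isEmpty()) {
      return null; // guard: the buggy path fell through to metadata.get(0) here
    }
    return src.metadata.get(0);
  }
}
```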


was (Author: scwhittle):
From looking at the code it does seem that such an exception could be 
encountered with the following:
ignoreMissingSrc was false
skipExistingDest was true
matchSrcResults.get(i).status() was NOT_FOUND and metadata was empty/null
matchDestResults.get(i).status().equals(Status.OK)

This would trigger if the src was missing and the destination already existed.  
This could happen if the file rename stage ran multiple times due to a retry.

https://github.com/apache/beam/pull/15301 changed it so that the source files are 
only matched if ignoreMissingSrc is true, so this bug itself wouldn't occur 
around the metadata.  However, a similar bug is still present when 
ignoreMissingSrc is false and skipExistingDest is true, since that path examines the 
src results without populating them.

> Unable to write to GCS due to IndexOutOfBoundsException in FileSystems
> --
>
> Key: BEAM-12857
> URL: https://issues.apache.org/jira/browse/BEAM-12857
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Affects Versions: 2.31.0, 2.32.0
> Environment: Beam 2.31.0/2.32.0, Java 11, GCP Dataflow
>Reporter: Patrick Lucas
>Priority: P2
>
> I have a simple batch job, running on Dataflow, that reads from a GCS bucket, 
> filters the data, and windows and writes the matching data back to a 
> different path in the same bucket.
> The job seems to succeed in reading and filtering the data, as well as 
> writing temporary files to GCS, but appears to fail when trying to rename the 
> temporary files to their final destination.
> The IndexOutOfBoundsException is thrown from 
> [FileSystems.java:429|https://github.com/apache/beam/blob/v2.32.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L429]
>  (in 2.32.0), when the code calls {{.get(0)}} on the list returned by a call 
> to {{MatchResult#metadata()}}.
> The javadoc for 
> [{{MatchResult#metadata()}}|https://github.com/apache/beam/blob/v2.32.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java#L75-L80]
>  says,
> {code:java}
>   /**
>* {@link Metadata} of matched files. Note that if {@link #status()} is 
> {@link Status#NOT_FOUND},
>* this may either throw a {@link java.io.FileNotFoundException} or return 
> an empty list,
>* depending on the {@link EmptyMatchTreatment} used in the {@link 
> FileSystems#match} call.
>*/
> {code}
> So possibly GCS is not returning any metadata for the (missing) destination 
> object? That seems unlikely, as I would expect many others would have already 
> run into this, but I don't see how this could be caused by my user code.
> I have tested this on 2.31.0 and 2.32.0 getting the same error, but it's 
> worth noting that the logic in FileSystems.java changed a decent amount 
> recently in [#15301|https://github.com/apache/beam/pull/15301], maybe having 
> an effect on this, but I haven't been able to test it since I'm working in a 
> closed environment and can only easily use released versions of Beam. Once a 
> version containing this change is released, I will upgrade and try again.





[jira] [Commented] (BEAM-12857) Unable to write to GCS due to IndexOutOfBoundsException in FileSystems

2022-01-17 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17477130#comment-17477130
 ] 

Sam Whittle commented on BEAM-12857:


>From looking at code it does seem that that such an exception could be 
>encountered with the following:
ignoreMissingSrc was false
skipExistingDest was true
matchSrcResults.get(i).status() was NOT_FOUND and metadata was empty/null
matchDestResults.get(i).status().equals(Status.OK)

This would trigger if the src was missing and the destination already existed.  
This could happen if the file rename stage ran multiple times due to a retry.

https://github.com/apache/beam/pull/15301 changed things so that the source files are 
only matched if ignoreMissingSrc is true, so this bug itself wouldn't occur 
around the metadata.  However, a similar bug is still present if 
ignoreMissingSrc is false and skipExistingDest is true, since that path examines the 
src results without populating them.
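The failure mode above boils down to calling metadata().get(0) on a NOT_FOUND match result. A minimal sketch of the guarded access pattern; the MatchResult here is a simplified stand-in for Beam's org.apache.beam.sdk.io.fs.MatchResult, not the actual class:

```java
import java.util.List;

public class RenameGuard {
  enum Status { OK, NOT_FOUND }

  // Simplified stand-in for org.apache.beam.sdk.io.fs.MatchResult.
  static class MatchResult {
    final Status status;
    final List<String> metadata; // may be empty when status is NOT_FOUND

    MatchResult(Status status, List<String> metadata) {
      this.status = status;
      this.metadata = metadata;
    }
  }

  // Buggy pattern: touches metadata without checking status first, so a
  // NOT_FOUND result with an empty metadata list throws IndexOutOfBoundsException.
  static String firstMetadataUnguarded(MatchResult result) {
    return result.metadata.get(0);
  }

  // Guarded pattern: check status before reading metadata, letting the caller
  // decide whether a missing file is an error (cf. ignoreMissingSrc).
  static String firstMetadataGuarded(MatchResult result) {
    if (result.status != Status.OK || result.metadata.isEmpty()) {
      return null;
    }
    return result.metadata.get(0);
  }
}
```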

> Unable to write to GCS due to IndexOutOfBoundsException in FileSystems
> --
>
> Key: BEAM-12857
> URL: https://issues.apache.org/jira/browse/BEAM-12857
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Affects Versions: 2.31.0, 2.32.0
> Environment: Beam 2.31.0/2.32.0, Java 11, GCP Dataflow
>Reporter: Patrick Lucas
>Priority: P2
>
> I have a simple batch job, running on Dataflow, that reads from a GCS bucket, 
> filters the data, and windows and writes the matching data back to a 
> different path in the same bucket.
> The job seems to succeed in reading and filtering the data, as well as 
> writing temporary files to GCS, but appears to fail when trying to rename the 
> temporary files to their final destination.
> The IndexOutOfBoundsException is thrown from 
> [FileSystems.java:429|https://github.com/apache/beam/blob/v2.32.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L429]
>  (in 2.32.0), when the code calls {{.get(0)}} on the list returned by a call 
> to {{MatchResult#metadata()}}.
> The javadoc for 
> [{{MatchResult#metadata()}}|https://github.com/apache/beam/blob/v2.32.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java#L75-L80]
>  says,
> {code:java}
>   /**
>* {@link Metadata} of matched files. Note that if {@link #status()} is 
> {@link Status#NOT_FOUND},
>* this may either throw a {@link java.io.FileNotFoundException} or return 
> an empty list,
>* depending on the {@link EmptyMatchTreatment} used in the {@link 
> FileSystems#match} call.
>*/
> {code}
> So possibly GCS is not returning any metadata for the (missing) destination 
> object? That seems unlikely, as I would expect many others would have already 
> run into this, but I don't see how this could be caused by my user code.
> I have tested this on 2.31.0 and 2.32.0 getting the same error, but it's 
> worth noting that the logic in FileSystems.java changed a decent amount 
> recently in [#15301|https://github.com/apache/beam/pull/15301], maybe having 
> an effect on this, but I haven't been able to test it since I'm working in a 
> closed environment and can only easily use released versions of Beam. Once a 
> version containing this change is released, I will upgrade and try again.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Reopened] (BEAM-12776) Improve parallelism of closing files in FileIO

2022-01-06 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle reopened BEAM-12776:


There is a large buffer by default for GCS writes, and closing all windows in 
parallel can increase memory usage and trigger OOMs.  I plan to change this 
to limit parallelism to a certain amount, and to allow that parallelism to be 
controlled by an option.
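A sketch of what bounded-parallelism closing could look like, assuming a hypothetical maxParallelCloses option; this illustrates the idea rather than the actual FileIO change:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BoundedCloser {
  private final ExecutorService executor;

  // maxParallelCloses is a hypothetical option; a fixed pool bounds both the
  // parallelism and, indirectly, how many large GCS upload buffers can be
  // flushing at the same time.
  BoundedCloser(int maxParallelCloses) {
    this.executor = Executors.newFixedThreadPool(maxParallelCloses);
  }

  // Close all writers with bounded parallelism and wait for completion, as
  // one might do in finishBundle.
  void closeAll(List<AutoCloseable> writers) throws Exception {
    List<Future<?>> pending = new ArrayList<>();
    for (AutoCloseable writer : writers) {
      pending.add(executor.submit(() -> {
        writer.close();
        return null;
      }));
    }
    for (Future<?> f : pending) {
      f.get(); // propagates the first close failure as an ExecutionException
    }
  }
}
```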

> Improve parallelism of closing files in FileIO
> --
>
> Key: BEAM-12776
> URL: https://issues.apache.org/jira/browse/BEAM-12776
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-files
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Currently close happens in processElement which is per-window.
> If there are many windows firing this can throttle throughput waiting for IO 
> instead of closing in parallel in finishBundle.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (BEAM-12776) Improve parallelism of closing files in FileIO

2022-01-06 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12776:
---
Status: Open  (was: Triage Needed)

> Improve parallelism of closing files in FileIO
> --
>
> Key: BEAM-12776
> URL: https://issues.apache.org/jira/browse/BEAM-12776
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-files
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Currently close happens in processElement which is per-window.
> If there are many windows firing this can throttle throughput waiting for IO 
> instead of closing in parallel in finishBundle.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Work started] (BEAM-12776) Improve parallelism of closing files in FileIO

2022-01-06 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on BEAM-12776 started by Sam Whittle.
--
> Improve parallelism of closing files in FileIO
> --
>
> Key: BEAM-12776
> URL: https://issues.apache.org/jira/browse/BEAM-12776
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-files
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Currently close happens in processElement which is per-window.
> If there are many windows firing this can throttle throughput waiting for IO 
> instead of closing in parallel in finishBundle.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (BEAM-12144) Dataflow streaming worker stuck and unable to get work from Streaming Engine

2021-12-01 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12144:
---
Fix Version/s: 2.32.0
   (was: 2.31.0)

> Dataflow streaming worker stuck and unable to get work from Streaming Engine
> 
>
> Key: BEAM-12144
> URL: https://issues.apache.org/jira/browse/BEAM-12144
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.26.0
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.32.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Observed in 2.26 but seems like it could affect later versions as well, as 
> previous issues addressing similar problems were before 2.26.  This seems 
> similar to BEAM-9651 but not the deadlock observed there.
> The thread getting work has the following stack:
> --- Threads (1): [Thread[DispatchThread,1,main]] State: WAITING stack: ---
>   java.base@11.0.9/jdk.internal.misc.Unsafe.park(Native Method)
>   
> java.base@11.0.9/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
>   java.base@11.0.9/java.util.concurrent.Phaser$QNode.block(Phaser.java:1127)
>   
> java.base@11.0.9/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)
>   
> java.base@11.0.9/java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1057)
>   
> java.base@11.0.9/java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:747)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.DirectStreamObserver.onNext(DirectStreamObserver.java:49)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.send(GrpcWindmillServer.java:662)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.onNewStream(GrpcWindmillServer.java:868)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.startStream(GrpcWindmillServer.java:677)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:860)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:843)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer.getWorkStream(GrpcWindmillServer.java:543)
>   
> app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.streamingDispatchLoop(StreamingDataflowWorker.java:1047)
>   
> app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$1.run(StreamingDataflowWorker.java:670)
>   java.base@11.0.9/java.lang.Thread.run(Thread.java:834)
> The status page shows:
> GetWorkStream: 0 buffers, 400 inflight messages allowed, 67108864 inflight 
> bytes allowed, current stream is 61355396ms old, last send 61355396ms, last 
> response -1ms
> Showing that the stream was created 17 hours ago, sent the header message but 
> never received a response.  With the stack trace it appears that the header 
> was never sent, but the stream also didn't terminate with a deadline-exceeded 
> error.  This seems like a grpc issue, to not get an error for the stream; however, it 
> would be safer to not block indefinitely on the Phaser waiting for the send 
> and instead throw an exception after, for example, 2x the stream deadline.
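The description above suggests bounding the send with a timeout rather than parking forever. The JDK's Phaser already supports this via the timed awaitAdvanceInterruptibly overload; a minimal sketch follows (the helper and timeout policy are illustrative, not the actual DirectStreamObserver code):

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedPhaserWait {
  // Waits for the phaser to advance past the given phase, but gives up after
  // the supplied timeout instead of parking forever, so a wedged stream
  // surfaces as a failure the caller can act on.
  static boolean awaitAdvanceWithTimeout(Phaser phaser, int phase, long timeoutMs) {
    try {
      phaser.awaitAdvanceInterruptibly(phase, timeoutMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      return false; // caller can tear down and restart the stream
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

In the scenario from the stack trace, the caller would translate a false return into an exception (or a stream restart) instead of blocking for 17 hours.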



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (BEAM-13268) Reduce latency by parallelizing BQ inserts when flushing due to row limit

2021-11-17 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-13268:
---
Fix Version/s: 2.35.0
   Resolution: Fixed
   Status: Resolved  (was: Triage Needed)

> Reduce latency by parallelizing BQ inserts when flushing due to row limit
> -
>
> Key: BEAM-13268
> URL: https://issues.apache.org/jira/browse/BEAM-13268
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.35.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> InsertBatchedElements consumes the output of GroupIntoBatches which flushes 
> after default 500 elements to respect the max items per streaming insert 
> request to BQ.
> However InsertBatchedElements flushes rows synchronously, meaning that the 
> latencies of writes accumulate.  It could instead initiate writes in 
> ProcessElement and block on write completion in FinishBundle.  There could be 
> some limited configurable parallelism if desired to limit memory usage.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (BEAM-12818) When writing to GCS, spread prefix of temporary files and reuse autoscaling of the temporary directory

2021-11-17 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12818:
---
Resolution: Fixed
Status: Resolved  (was: Open)

> When writing to GCS, spread prefix of temporary files and reuse autoscaling 
> of the temporary directory
> --
>
> Key: BEAM-12818
> URL: https://issues.apache.org/jira/browse/BEAM-12818
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P1
> Fix For: 2.35.0
>
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> When writing files using FileIO, the given temporary directory has a 
> subdirectory created in it for each FileBasedSink.  This is useful for 
> non-windowed output where the temporary directory can be matched to delete 
> leftover files that were lost during processing.
> However for windowed writes such subdirectories are unnecessary and cause a 
> common prefix to be shared for the temporary files. Additionally this common 
> prefix varies per job and thus the autoscaling for the previous prefix is no 
> longer effective, see
> https://cloud.google.com/storage/docs/request-rate#randomness_after_sequential_prefixes_is_not_as_effective



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (BEAM-13268) Reduce latency by parallelizing BQ inserts when flushing due to row limit

2021-11-17 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17445154#comment-17445154
 ] 

Sam Whittle commented on BEAM-13268:


Actually, it looks like Pablo improved the BQ services to issue parallel RPCs 
for larger batches, so I think we can just ungroup the grouped input and rely 
on that parallelization.  That also cleans up some code duplication.

> Reduce latency by parallelizing BQ inserts when flushing due to row limit
> -
>
> Key: BEAM-13268
> URL: https://issues.apache.org/jira/browse/BEAM-13268
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>
> InsertBatchedElements consumes the output of GroupIntoBatches which flushes 
> after default 500 elements to respect the max items per streaming insert 
> request to BQ.
> However InsertBatchedElements flushes rows synchronously, meaning that the 
> latencies of writes accumulate.  It could instead initiate writes in 
> ProcessElement and block on write completion in FinishBundle.  There could be 
> some limited configurable parallelism if desired to limit memory usage.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (BEAM-12472) BigQuery streaming writes can be batched beyond request limit with BatchAndInsertElements

2021-11-17 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12472:
---
Fix Version/s: 2.35.0
   (was: 2.34.0)

> BigQuery streaming writes can be batched beyond request limit with 
> BatchAndInsertElements
> -
>
> Key: BEAM-12472
> URL: https://issues.apache.org/jira/browse/BEAM-12472
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Sam Whittle
>Assignee: Pablo Estrada
>Priority: P2
>  Labels: stale-P2
> Fix For: 2.35.0
>
>
> BatchAndInsertElements accumulates all the input elements and flushes them in 
> finishBundle.
> However, if there is enough data, the BigQuery request limit can be 
> exceeded, causing an exception like the following.  It seems that finishBundle 
> should limit the # of rows and bytes and possibly flush multiple times for a 
> destination.
> A workaround would be to use autosharding, which uses state that has batching 
> limits, or to increase the # of streaming keys to decrease the likelihood of 
> hitting this.
> {code}
> Error while processing a work item: UNKNOWN: 
> org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: 
> com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
> Request
> POST 
> https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/datasets/nexmark_06090820455271/tables/nexmark_simple/insertAll?prettyPrint=false
> {
>   "code" : 400,
>   "errors" : [ {
> "domain" : "global",
> "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> "reason" : "badRequest"
>   } ],
>   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
>   "status" : "INVALID_ARGUMENT"
> }
>   at 
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>   at 
> org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown
>  Source)
>   at 
> org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1661)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (BEAM-12472) BigQuery streaming writes can be batched beyond request limit with BatchAndInsertElements

2021-11-17 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12472:
---
Fix Version/s: 2.34.0
   Resolution: Fixed
   Status: Resolved  (was: Open)

> BigQuery streaming writes can be batched beyond request limit with 
> BatchAndInsertElements
> -
>
> Key: BEAM-12472
> URL: https://issues.apache.org/jira/browse/BEAM-12472
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Sam Whittle
>Assignee: Pablo Estrada
>Priority: P2
>  Labels: stale-P2
> Fix For: 2.34.0
>
>
> BatchAndInsertElements accumulates all the input elements and flushes them in 
> finishBundle.
> However, if there is enough data, the BigQuery request limit can be 
> exceeded, causing an exception like the following.  It seems that finishBundle 
> should limit the # of rows and bytes and possibly flush multiple times for a 
> destination.
> A workaround would be to use autosharding, which uses state that has batching 
> limits, or to increase the # of streaming keys to decrease the likelihood of 
> hitting this.
> {code}
> Error while processing a work item: UNKNOWN: 
> org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: 
> com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
> Request
> POST 
> https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/datasets/nexmark_06090820455271/tables/nexmark_simple/insertAll?prettyPrint=false
> {
>   "code" : 400,
>   "errors" : [ {
> "domain" : "global",
> "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> "reason" : "badRequest"
>   } ],
>   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
>   "status" : "INVALID_ARGUMENT"
> }
>   at 
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>   at 
> org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown
>  Source)
>   at 
> org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1661)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (BEAM-12472) BigQuery streaming writes can be batched beyond request limit with BatchAndInsertElements

2021-11-17 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle reassigned BEAM-12472:
--

Assignee: Pablo Estrada

> BigQuery streaming writes can be batched beyond request limit with 
> BatchAndInsertElements
> -
>
> Key: BEAM-12472
> URL: https://issues.apache.org/jira/browse/BEAM-12472
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Sam Whittle
>Assignee: Pablo Estrada
>Priority: P2
>  Labels: stale-P2
>
> BatchAndInsertElements accumulates all the input elements and flushes them in 
> finishBundle.
> However, if there is enough data, the BigQuery request limit can be 
> exceeded, causing an exception like the following.  It seems that finishBundle 
> should limit the # of rows and bytes and possibly flush multiple times for a 
> destination.
> A workaround would be to use autosharding, which uses state that has batching 
> limits, or to increase the # of streaming keys to decrease the likelihood of 
> hitting this.
> {code}
> Error while processing a work item: UNKNOWN: 
> org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: 
> com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
> Request
> POST 
> https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/datasets/nexmark_06090820455271/tables/nexmark_simple/insertAll?prettyPrint=false
> {
>   "code" : 400,
>   "errors" : [ {
> "domain" : "global",
> "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> "reason" : "badRequest"
>   } ],
>   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
>   "status" : "INVALID_ARGUMENT"
> }
>   at 
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>   at 
> org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown
>  Source)
>   at 
> org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1661)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (BEAM-12472) BigQuery streaming writes can be batched beyond request limit with BatchAndInsertElements

2021-11-17 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17445136#comment-17445136
 ] 

Sam Whittle commented on BEAM-12472:


I believe this would be fixed by Pablo's PR, as indicated by Michael's comment 
above.  The underlying BigQueryServicesImpl now splits up rows to respect the 
RPC limits.
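A sketch of the kind of row splitting described, with illustrative maxBytes/maxRows limits (BigQuery's documented insertAll payload limit is 10 MB); this is not the actual BigQueryServicesImpl code:

```java
import java.util.ArrayList;
import java.util.List;

public class RowBatcher {
  // Splits rows into batches so that no batch exceeds maxBytes or maxRows.
  // Row size is approximated here by the serialized string length; the real
  // service would account for full request encoding overhead.
  static List<List<String>> split(List<String> rows, long maxBytes, int maxRows) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long currentBytes = 0;
    for (String row : rows) {
      long size = row.getBytes().length;
      // Flush the current batch before this row would push it over a limit.
      if (!current.isEmpty()
          && (currentBytes + size > maxBytes || current.size() >= maxRows)) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(row);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```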

> BigQuery streaming writes can be batched beyond request limit with 
> BatchAndInsertElements
> -
>
> Key: BEAM-12472
> URL: https://issues.apache.org/jira/browse/BEAM-12472
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Sam Whittle
>Priority: P2
>  Labels: stale-P2
>
> BatchAndInsertElements accumulates all the input elements and flushes them in 
> finishBundle.
> However, if there is enough data, the BigQuery request limit can be 
> exceeded, causing an exception like the following.  It seems that finishBundle 
> should limit the # of rows and bytes and possibly flush multiple times for a 
> destination.
> A workaround would be to use autosharding, which uses state that has batching 
> limits, or to increase the # of streaming keys to decrease the likelihood of 
> hitting this.
> {code}
> Error while processing a work item: UNKNOWN: 
> org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: 
> com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
> Request
> POST 
> https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/datasets/nexmark_06090820455271/tables/nexmark_simple/insertAll?prettyPrint=false
> {
>   "code" : 400,
>   "errors" : [ {
> "domain" : "global",
> "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> "reason" : "badRequest"
>   } ],
>   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
>   "status" : "INVALID_ARGUMENT"
> }
>   at 
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>   at 
> org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown
>  Source)
>   at 
> org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1661)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (BEAM-13268) Reduce latency by parallelizing BQ inserts when flushing due to row limit

2021-11-17 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle reassigned BEAM-13268:
--

Assignee: Sam Whittle

> Reduce latency by parallelizing BQ inserts when flushing due to row limit
> -
>
> Key: BEAM-13268
> URL: https://issues.apache.org/jira/browse/BEAM-13268
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>
> InsertBatchedElements consumes the output of GroupIntoBatches which flushes 
> after default 500 elements to respect the max items per streaming insert 
> request to BQ.
> However InsertBatchedElements flushes rows synchronously, meaning that the 
> latencies of writes accumulate.  It could instead initiate writes in 
> ProcessElement and block on write completion in FinishBundle.  There could be 
> some limited configurable parallelism if desired to limit memory usage.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (BEAM-13268) Reduce latency by parallelizing BQ inserts when flushing due to row limit

2021-11-17 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-13268:
--

 Summary: Reduce latency by parallelizing BQ inserts when flushing 
due to row limit
 Key: BEAM-13268
 URL: https://issues.apache.org/jira/browse/BEAM-13268
 Project: Beam
  Issue Type: Improvement
  Components: io-java-gcp
Reporter: Sam Whittle


InsertBatchedElements consumes the output of GroupIntoBatches which flushes 
after default 500 elements to respect the max items per streaming insert 
request to BQ.

However InsertBatchedElements flushes rows synchronously, meaning that the 
latencies of writes accumulate.  It could instead initiate writes in 
ProcessElement and block on write completion in FinishBundle.  There could be 
some limited configurable parallelism if desired to limit memory usage.
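A minimal sketch of the initiate-in-ProcessElement / wait-in-FinishBundle idea described above; the class and method names are illustrative, not the actual InsertBatchedElements implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncFlusher {
  private final ExecutorService executor = Executors.newFixedThreadPool(4);
  private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
  final AtomicInteger inserted = new AtomicInteger(); // visible for illustration

  // processElement: start the insert without waiting for it, so batches for
  // different destinations overlap instead of serializing their latency.
  void processBatch(List<String> rows) {
    inFlight.add(CompletableFuture.runAsync(() -> insertRows(rows), executor));
  }

  // finishBundle: block until every in-flight insert has completed.
  void finishBundle() {
    CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
    inFlight.clear();
  }

  // Placeholder for the streaming-insert RPC.
  private void insertRows(List<String> rows) {
    inserted.addAndGet(rows.size());
  }
}
```

Bounding the executor pool size (here, 4 threads) is one way to get the "limited configurable parallelism" the description mentions.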



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (BEAM-13042) Prevent unexpected blocking in RegisterAndProcessBundleOperation hasFailed

2021-10-19 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-13042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-13042:
---
Fix Version/s: 2.35.0
   Resolution: Fixed
   Status: Resolved  (was: Open)

> Prevent unexpected blocking in RegisterAndProcessBundleOperation hasFailed
> --
>
> Key: BEAM-13042
> URL: https://issues.apache.org/jira/browse/BEAM-13042
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.35.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> We have observed stack traces in stuck jobs as follows:
> --- Threads (1): [Thread[pool-8-thread-1,5,main]] State: WAITING stack: ---
>   sun.misc.Unsafe.park(Native Method)
>   java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>   java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>   java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>   
> org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.hasFailed(RegisterAndProcessBundleOperation.java:407)
>   
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:332)
>   
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:326)
>   
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$121/1203893465.run(Unknown
>  Source)
>   java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   java.lang.Thread.run(Thread.java:748)
> This code appears to not expect blocking as it checks isDone before calling 
> get()
>   public boolean hasFailed() throws ExecutionException, InterruptedException {
> if (processBundleResponse != null && 
> processBundleResponse.toCompletableFuture().isDone()) {
>   return 
> !processBundleResponse.toCompletableFuture().get().getError().isEmpty();
> } else {
>   // At the very least, we don't know that this has failed yet.
>   return false;
> }
>   }
> I'm unsure why this is occurring, but it could be that the two calls to 
> toCompletableFuture are returning different futures for some reason, or that 
> the completion stage is somehow changing from done to undone.  In either case 
> this could be avoided by using a single call to the CompletableFuture.getNow 
> method, which is guaranteed not to block.
> This affects the V1 Runner harness which isn't generally used but might as 
> well be fixed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12856) Allow for configuration of unbounded reader max elements, read time etc in StreamingDataflowRunner

2021-10-19 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12856:
---
Fix Version/s: 2.34.0
   Resolution: Fixed
   Status: Resolved  (was: Open)

> Allow for configuration of unbounded reader max elements, read time etc in 
> StreamingDataflowRunner
> --
>
> Key: BEAM-12856
> URL: https://issues.apache.org/jira/browse/BEAM-12856
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.34.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Currently in WorkerCustomSources.java it is hard-coded to 10 seconds, 10k 
> elements, with 1sec waiting for elements if none are available.
> There are cases where it would be beneficial to wait longer and process more 
> data so it would be nice if this was controlled by pipeline option.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12291) org.apache.beam.runners.flink.ReadSourcePortableTest.testExecution[streaming: false] is flaky

2021-10-13 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17428227#comment-17428227
 ] 

Sam Whittle commented on BEAM-12291:


Sorry, I didn't see that this was assigned to me earlier.  I don't have the context to 
debug the Flink tests.  I took a look but didn't see anything obvious.

> org.apache.beam.runners.flink.ReadSourcePortableTest.testExecution[streaming: 
> false] is flaky
> -
>
> Key: BEAM-12291
> URL: https://issues.apache.org/jira/browse/BEAM-12291
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Priority: P1
>  Labels: flake, stale-assigned
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Example: https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/17600/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-12291) org.apache.beam.runners.flink.ReadSourcePortableTest.testExecution[streaming: false] is flaky

2021-10-13 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle reassigned BEAM-12291:
--

Assignee: (was: Sam Whittle)

> org.apache.beam.runners.flink.ReadSourcePortableTest.testExecution[streaming: 
> false] is flaky
> -
>
> Key: BEAM-12291
> URL: https://issues.apache.org/jira/browse/BEAM-12291
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Priority: P1
>  Labels: flake, stale-assigned
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Example: https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/17600/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-13042) Prevent unexpected blocking in RegisterAndProcessBundleOperation hasFailed

2021-10-13 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-13042:
--

 Summary: Prevent unexpected blocking in 
RegisterAndProcessBundleOperation hasFailed
 Key: BEAM-13042
 URL: https://issues.apache.org/jira/browse/BEAM-13042
 Project: Beam
  Issue Type: Improvement
  Components: runner-dataflow
Reporter: Sam Whittle
Assignee: Sam Whittle


We have observed stack traces in stuck jobs as follows:
--- Threads (1): [Thread[pool-8-thread-1,5,main]] State: WAITING stack: ---
  sun.misc.Unsafe.park(Native Method)
  java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
  java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
  java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
  java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
  
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.hasFailed(RegisterAndProcessBundleOperation.java:407)
  
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:332)
  
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:326)
  
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$121/1203893465.run(Unknown
 Source)
  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
  
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
  
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
  
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  java.lang.Thread.run(Thread.java:748)


This code appears not to expect blocking, as it checks isDone before calling 
get():

  public boolean hasFailed() throws ExecutionException, InterruptedException {
    if (processBundleResponse != null
        && processBundleResponse.toCompletableFuture().isDone()) {
      return !processBundleResponse.toCompletableFuture().get().getError().isEmpty();
    } else {
      // At the very least, we don't know that this has failed yet.
      return false;
    }
  }

I'm unsure why this is occurring, but it could be that the two calls to 
toCompletableFuture are returning different futures for some reason, or that 
the completion stage is somehow changing from done to undone.  In either case 
this could be avoided by using a single call to the CompletableFuture.getNow 
method, which is guaranteed not to block.

This affects the V1 Runner harness, which isn't generally used, but it might as 
well be fixed.
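A sketch of the getNow-based check (simplified: the future here holds the error string directly, whereas the real ProcessBundleResponse exposes it via getError(); class and parameter names are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class HasFailedSketch {
  // A single getNow call both checks completion and reads the value: it
  // returns the valueIfAbsent argument when the future is not yet complete,
  // so it can never block the progress-update thread.
  static boolean hasFailed(CompletionStage<String> processBundleResponse) {
    if (processBundleResponse == null) {
      return false;
    }
    String error = processBundleResponse.toCompletableFuture().getNow(null);
    // null means "not done yet": at the very least, we don't know it failed.
    return error != null && !error.isEmpty();
  }
}
```

Because the future is read exactly once, there is no window in which a second toCompletableFuture() call could observe a different, incomplete future.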



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12942) Dataflow runner specialization of PubsubIO should validate messages

2021-09-29 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12942:
---
Fix Version/s: (was: 2.33.0)
   2.34.0

> Dataflow runner specialization of PubsubIO should validate messages
> ---
>
> Key: BEAM-12942
> URL: https://issues.apache.org/jira/browse/BEAM-12942
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.34.0
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Currently invalid messages, such as those with attributes exceeding the 
> maximum size, are processed by the bundle successfully but fail to commit.
> Throwing an exception when trying to write such a message directly would 
> increase visibility as well as allow users to catch and handle such 
> exceptions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12942) Dataflow runner specialization of PubsubIO should validate messages

2021-09-29 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12942:
---
Fix Version/s: 2.33.0
   Resolution: Fixed
   Status: Resolved  (was: Open)

> Dataflow runner specialization of PubsubIO should validate messages
> ---
>
> Key: BEAM-12942
> URL: https://issues.apache.org/jira/browse/BEAM-12942
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.33.0
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Currently invalid messages, such as those with attributes exceeding the 
> maximum size, are processed by the bundle successfully but fail to commit.
> Throwing an exception when trying to write such a message directly would 
> increase visibility as well as allow users to catch and handle such 
> exceptions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12942) Dataflow runner specialization of PubsubIO should validate messages

2021-09-23 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17419231#comment-17419231
 ] 

Sam Whittle commented on BEAM-12942:


The validation seems like it could be helpful for other, non-specialized 
PubsubIO as well, because it would throw an exception when the message is 
produced instead of an exception from the Pub/Sub publish result, which could 
correspond to a batch and thus has less context.

> Dataflow runner specialization of PubsubIO should validate messages
> ---
>
> Key: BEAM-12942
> URL: https://issues.apache.org/jira/browse/BEAM-12942
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>
> Currently invalid messages, such as those with attributes exceeding the 
> maximum size, are processed by the bundle successfully but fail to commit.
> Throwing an exception when trying to write such a message directly would 
> increase visibility as well as allow users to catch and handle such 
> exceptions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12942) Dataflow runner specialization of PubsubIO should validate messages

2021-09-23 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12942:
--

 Summary: Dataflow runner specialization of PubsubIO should 
validate messages
 Key: BEAM-12942
 URL: https://issues.apache.org/jira/browse/BEAM-12942
 Project: Beam
  Issue Type: Improvement
  Components: runner-dataflow
Reporter: Sam Whittle
Assignee: Sam Whittle


Currently invalid messages, such as those with attributes exceeding the maximum 
size, are processed by the bundle successfully but fail to commit.

Throwing an exception when trying to write such a message directly would 
increase visibility as well as allow users to catch and handle such 
exceptions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work started] (BEAM-7913) Add drain() to DataflowPipelineJob

2021-09-17 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on BEAM-7913 started by null.
--
> Add drain() to DataflowPipelineJob
> --
>
> Key: BEAM-7913
> URL: https://issues.apache.org/jira/browse/BEAM-7913
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Priority: P3
>
> Dataflow supports draining jobs but there is no easy programmatic way to do it.
> I propose adding a drain() method to DataflowPipelineJob similar to the 
> existing cancel() method (inherited from PipelineResult).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12740) Reduce and backoff GCS metadata operations when writing to GCS files

2021-09-17 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12740:
---
Resolution: Fixed
Status: Resolved  (was: Open)

> Reduce and backoff GCS metadata operations when writing to GCS files
> 
>
> Key: BEAM-12740
> URL: https://issues.apache.org/jira/browse/BEAM-12740
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 9h 20m
>  Remaining Estimate: 0h
>
> When issuing GCS operations affecting metadata (i.e. file-level operations, not 
> read/write operations), GCS may return errors indicating backoff. See
> https://cloud.google.com/storage/docs/request-rate#ramp-up
> If such errors are encountered, currently the exception is not handled by 
> GcsUtil.java and is propagated, causing retries and backoff of all operations 
> at a higher level.  Instead we should back off and retry only the files that 
> require it.
> Additionally, FileBasedSink issues deletes for files that have been renamed.  
> The rename itself should take care of removing the original file, and thus we 
> can reduce some metadata operations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12740) Reduce and backoff GCS metadata operations when writing to GCS files

2021-09-17 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12740:
---
Fix Version/s: 2.34.0

> Reduce and backoff GCS metadata operations when writing to GCS files
> 
>
> Key: BEAM-12740
> URL: https://issues.apache.org/jira/browse/BEAM-12740
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.34.0
>
>  Time Spent: 9h 20m
>  Remaining Estimate: 0h
>
> When issuing GCS operations affecting metadata (i.e. file-level operations, not 
> read/write operations), GCS may return errors indicating backoff. See
> https://cloud.google.com/storage/docs/request-rate#ramp-up
> If such errors are encountered, currently the exception is not handled by 
> GcsUtil.java and is propagated, causing retries and backoff of all operations 
> at a higher level.  Instead we should back off and retry only the files that 
> require it.
> Additionally, FileBasedSink issues deletes for files that have been renamed.  
> The rename itself should take care of removing the original file, and thus we 
> can reduce some metadata operations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12445) Move Python's BigQuery streaming insert sink to the new BigQuery api client

2021-09-16 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416247#comment-17416247
 ] 

Sam Whittle commented on BEAM-12445:


I believe the timeout needs to be set on the new insert_json_rows call in 
bigquery_tools; I think we observed a stuck call due to this.  The retry at the 
higher level has a timeout of 2 minutes, so a minute or two seems like a 
reasonable deadline for the call.

> Move Python's BigQuery streaming insert sink to the new BigQuery api client
> ---
>
> Key: BEAM-12445
> URL: https://issues.apache.org/jira/browse/BEAM-12445
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-gcp
>Reporter: Pablo Estrada
>Priority: P2
>  Time Spent: 7h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12776) Improve parallelism of closing files in FileIO

2021-09-08 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12776:
---
Resolution: Fixed
Status: Resolved  (was: Open)

> Improve parallelism of closing files in FileIO
> --
>
> Key: BEAM-12776
> URL: https://issues.apache.org/jira/browse/BEAM-12776
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-files
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Currently, close happens in processElement, which is per-window.
> If there are many windows firing, this can throttle throughput waiting for IO 
> instead of closing files in parallel in finishBundle.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12856) Allow for configuration of unbounded reader max elements, read time etc in StreamingDataflowRunner

2021-09-08 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12856:
---
Description: 
Currently in WorkerCustomSources.java these limits are hard-coded to 10 seconds, 
10k elements, with 1 second of waiting for elements if none are available.
There are cases where it would be beneficial to wait longer and process more 
data, so it would be nice if this were controlled by a pipeline option.

> Allow for configuration of unbounded reader max elements, read time etc in 
> StreamingDataflowRunner
> --
>
> Key: BEAM-12856
> URL: https://issues.apache.org/jira/browse/BEAM-12856
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>
> Currently in WorkerCustomSources.java these limits are hard-coded to 10 seconds, 
> 10k elements, with 1 second of waiting for elements if none are available.
> There are cases where it would be beneficial to wait longer and process more 
> data, so it would be nice if this were controlled by a pipeline option.
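The knobs described above could be expressed as a plain options holder. This is a hypothetical sketch: names and defaults mirror the hard-coded values mentioned in the description, and the real Beam change would add getters to a PipelineOptions interface with @Default annotations.

```java
public class UnboundedReaderOptionsSketch {
  // Hypothetical options interface; in Beam proper this would extend the
  // Dataflow pipeline options and use annotation-driven defaults.
  public interface UnboundedReaderOptions {
    // Defaults match the values currently hard-coded in WorkerCustomSources.java.
    default long maxReadTimeMillis() { return 10_000L; }     // 10 seconds
    default int maxElements() { return 10_000; }             // 10k elements
    default long waitForElementsMillis() { return 1_000L; }  // 1 second
  }
}
```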



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12856) Allow for configuration of unbounded reader max elements, read time etc in StreamingDataflowRunner

2021-09-08 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12856:
--

 Summary: Allow for configuration of unbounded reader max elements, 
read time etc in StreamingDataflowRunner
 Key: BEAM-12856
 URL: https://issues.apache.org/jira/browse/BEAM-12856
 Project: Beam
  Issue Type: Improvement
  Components: runner-dataflow
Reporter: Sam Whittle
Assignee: Sam Whittle






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11994) Java BigQuery - Implement IO Request Count metrics

2021-08-30 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406716#comment-17406716
 ] 

Sam Whittle commented on BEAM-11994:


I observed the following in a test; it seems related to the changes to 
ServiceCallMetric made in pull requests for this issue.

java.lang.RuntimeException: java.util.ConcurrentModificationException
at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:994)
at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:1047)
at 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:387)
at 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$800(BatchedStreamingWrite.java:72)
at 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$InsertBatchedElements.processElement(BatchedStreamingWrite.java:350)
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
at 
org.apache.beam.runners.core.metrics.MonitoringInfoMetricName.<init>(MonitoringInfoMetricName.java:50)
at 
org.apache.beam.runners.core.metrics.MonitoringInfoMetricName.named(MonitoringInfoMetricName.java:95)
at 
org.apache.beam.runners.core.metrics.ServiceCallMetric.call(ServiceCallMetric.java:82)
at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll$1(BigQueryServicesImpl.java:905)
at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$BoundedExecutorService$SemaphoreCallable.call(BigQueryServicesImpl.java:1560)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


> Java BigQuery - Implement IO Request Count metrics
> --
>
> Key: BEAM-11994
> URL: https://issues.apache.org/jira/browse/BEAM-11994
> Project: Beam
>  Issue Type: Test
>  Components: io-java-gcp
>Reporter: Alex Amato
>Assignee: Alex Amato
>Priority: P2
>  Labels: stale-assigned
>  Time Spent: 23.5h
>  Remaining Estimate: 0h
>
> Reference PRs (See BigQuery IO example) and detailed explanation of what's 
> needed to instrument this IO with Request Count metrics is found in this 
> handoff doc:
> https://docs.google.com/document/d/1lrz2wE5Dl4zlUfPAenjXIQyleZvqevqoxhyE85aj4sc/edit



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12818) When writing to GCS, spread prefix of temporary files and reuse autoscaling of the temporary directory

2021-08-30 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12818:
--

 Summary: When writing to GCS, spread prefix of temporary files and 
reuse autoscaling of the temporary directory
 Key: BEAM-12818
 URL: https://issues.apache.org/jira/browse/BEAM-12818
 Project: Beam
  Issue Type: Bug
  Components: io-java-gcp
Reporter: Sam Whittle
Assignee: Sam Whittle


When writing files using FileIO, the given temporary directory has a 
subdirectory created in it for each FileBasedSink.  This is useful for 
non-windowed output where the temporary directory can be matched to delete 
leftover files that were lost during processing.

However, for windowed writes such subdirectories are unnecessary and cause a 
common prefix to be shared by the temporary files. Additionally, this common 
prefix varies per job, and thus the autoscaling for the previous prefix is no 
longer effective; see
https://cloud.google.com/storage/docs/request-rate#randomness_after_sequential_prefixes_is_not_as_effective



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12780) StreamingDataflowWorker should limit local retries

2021-08-23 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12780:
---
Fix Version/s: 2.32.0
   Resolution: Fixed
   Status: Resolved  (was: In Progress)

> StreamingDataflowWorker should limit local retries
> --
>
> Key: BEAM-12780
> URL: https://issues.apache.org/jira/browse/BEAM-12780
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.32.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Local retries are performed for most user exceptions.
> This causes things to get stuck unnecessarily if the work item itself was 
> somehow corrupted; additionally, such processing may no longer be relevant on 
> the worker and should be given up after a while.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work started] (BEAM-12780) StreamingDataflowWorker should limit local retries

2021-08-20 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on BEAM-12780 started by Sam Whittle.
--
> StreamingDataflowWorker should limit local retries
> --
>
> Key: BEAM-12780
> URL: https://issues.apache.org/jira/browse/BEAM-12780
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>
> Local retries are performed for most user exceptions.
> This causes things to get stuck unnecessarily if the work item itself was 
> somehow corrupted; additionally, such processing may no longer be relevant on 
> the worker and should be given up after a while.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12780) StreamingDataflowWorker should limit local retries

2021-08-20 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12780:
---
Status: Open  (was: Triage Needed)

> StreamingDataflowWorker should limit local retries
> --
>
> Key: BEAM-12780
> URL: https://issues.apache.org/jira/browse/BEAM-12780
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>
> Local retries are performed for most user exceptions.
> This causes things to get stuck unnecessarily if the work item itself was 
> somehow corrupted; additionally, such processing may no longer be relevant on 
> the worker and should be given up after a while.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-12780) StreamingDataflowWorker should limit local retries

2021-08-20 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle reassigned BEAM-12780:
--

Assignee: Sam Whittle

> StreamingDataflowWorker should limit local retries
> --
>
> Key: BEAM-12780
> URL: https://issues.apache.org/jira/browse/BEAM-12780
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>
> Local retries are performed for most user exceptions.
> This causes things to get stuck unnecessarily if the work item itself was 
> somehow corrupted; additionally, such processing may no longer be relevant on 
> the worker and should be given up after a while.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12780) StreamingDataflowWorker should limit local retries

2021-08-20 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12780:
--

 Summary: StreamingDataflowWorker should limit local retries
 Key: BEAM-12780
 URL: https://issues.apache.org/jira/browse/BEAM-12780
 Project: Beam
  Issue Type: Bug
  Components: runner-dataflow
Reporter: Sam Whittle


Local retries are performed for most user exceptions.

This causes things to get stuck unnecessarily if the work item itself was 
somehow corrupted; additionally, such processing may no longer be relevant on the 
worker and should be given up after a while.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12776) Improve parallelism of closing files in FileIO

2021-08-19 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12776:
--

 Summary: Improve parallelism of closing files in FileIO
 Key: BEAM-12776
 URL: https://issues.apache.org/jira/browse/BEAM-12776
 Project: Beam
  Issue Type: Bug
  Components: io-java-files
Reporter: Sam Whittle
Assignee: Sam Whittle


Currently, close happens in processElement, which is per-window.
If there are many windows firing, this can throttle throughput waiting for IO 
instead of closing files in parallel in finishBundle.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12740) Reduce and backoff GCS metadata operations when writing to GCS files

2021-08-11 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12740:
--

 Summary: Reduce and backoff GCS metadata operations when writing 
to GCS files
 Key: BEAM-12740
 URL: https://issues.apache.org/jira/browse/BEAM-12740
 Project: Beam
  Issue Type: Bug
  Components: io-java-gcp
Reporter: Sam Whittle
Assignee: Sam Whittle


When issuing GCS operations affecting metadata (i.e. file-level operations, not 
read/write operations), GCS may return errors indicating backoff. See
https://cloud.google.com/storage/docs/request-rate#ramp-up

If such errors are encountered, currently the exception is not handled by 
GcsUtil.java and is propagated, causing retries and backoff of all operations 
at a higher level.  Instead we should back off and retry only the files that 
require it.

Additionally, FileBasedSink issues deletes for files that have been renamed.  
The rename itself should take care of removing the original file, and thus we 
can reduce some metadata operations.
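A minimal sketch of the per-file retry the description calls for. Names are illustrative, and a real fix in GcsUtil would retry only on rate-limit errors (e.g. HTTP 429 / "slow down"), not on every exception:

```java
import java.util.function.Supplier;

public final class PerFileBackoff {
  // Retries only the failed file operation with exponential backoff, instead
  // of propagating the error and forcing backoff of all operations at a
  // higher level.
  static <T> T withBackoff(Supplier<T> op, int maxAttempts, long initialDelayMillis) {
    long delayMillis = initialDelayMillis;
    for (int attempt = 1; ; attempt++) {
      try {
        return op.get();
      } catch (RuntimeException e) {
        if (attempt >= maxAttempts) {
          throw e;  // out of retries; surface the error to the caller
        }
        try {
          Thread.sleep(delayMillis);  // back off before retrying this file only
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw e;
        }
        delayMillis *= 2;  // exponential backoff between attempts
      }
    }
  }
}
```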




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12144) Dataflow streaming worker stuck and unable to get work from Streaming Engine

2021-06-22 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12144:
---
Fix Version/s: 2.31.0
   Resolution: Fixed
   Status: Resolved  (was: Open)

> Dataflow streaming worker stuck and unable to get work from Streaming Engine
> 
>
> Key: BEAM-12144
> URL: https://issues.apache.org/jira/browse/BEAM-12144
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.26.0
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.31.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Observed in 2.26 but seems like it could affect later versions as well, as 
> previous issues addressing similar problems were before 2.26.  This seems 
> similar to BEAM-9651 but not the deadlock observed there.
> The thread getting work has the following stack:
> --- Threads (1): [Thread[DispatchThread,1,main]] State: WAITING stack: ---
>   java.base@11.0.9/jdk.internal.misc.Unsafe.park(Native Method)
>   
> java.base@11.0.9/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
>   java.base@11.0.9/java.util.concurrent.Phaser$QNode.block(Phaser.java:1127)
>   
> java.base@11.0.9/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)
>   
> java.base@11.0.9/java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1057)
>   
> java.base@11.0.9/java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:747)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.DirectStreamObserver.onNext(DirectStreamObserver.java:49)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.send(GrpcWindmillServer.java:662)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.onNewStream(GrpcWindmillServer.java:868)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.startStream(GrpcWindmillServer.java:677)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.<init>(GrpcWindmillServer.java:860)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.<init>(GrpcWindmillServer.java:843)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer.getWorkStream(GrpcWindmillServer.java:543)
>   
> app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.streamingDispatchLoop(StreamingDataflowWorker.java:1047)
>   
> app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$1.run(StreamingDataflowWorker.java:670)
>   java.base@11.0.9/java.lang.Thread.run(Thread.java:834)
> The status page shows:
> GetWorkStream: 0 buffers, 400 inflight messages allowed, 67108864 inflight 
> bytes allowed, current stream is 61355396ms old, last send 61355396ms, last 
> response -1ms
> Showing that the stream was created 17 hours ago, sent the header message, but 
> never received a response.  With the stack trace it appears that the header 
> was never sent, but the stream also didn't terminate with a deadline exceeded.  
> This seems like a grpc issue, to not get an error for the stream; however, it 
> would be safer to not block indefinitely on the Phaser waiting for the send 
> and instead throw an exception after, for example, 2x the stream deadline.
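A sketch of a bounded wait on the Phaser (class and method names are illustrative; the real DirectStreamObserver.onNext would derive the timeout from the stream deadline and fail the stream on expiry):

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class BoundedPhaserWait {
  // Waits for the phaser to advance past the given phase, but gives up after
  // the timeout instead of parking forever, so the caller can fail the stream
  // rather than hang indefinitely.
  static boolean awaitAdvanceBounded(Phaser phaser, int phase, long timeoutMillis)
      throws InterruptedException {
    try {
      phaser.awaitAdvanceInterruptibly(phase, timeoutMillis, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      return false;  // deadline elapsed with no advance
    }
  }
}
```

Phaser.awaitAdvanceInterruptibly already supports a timed variant, so the change is to use it and translate a timeout into a stream failure instead of waiting unboundedly.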



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12516) StreamingDataflowWorker.ShardedKey.toString throws exception if key is less than 100 bytes

2021-06-21 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12516:
--

 Summary: StreamingDataflowWorker.ShardedKey.toString throws 
exception if key is less than 100 bytes
 Key: BEAM-12516
 URL: https://issues.apache.org/jira/browse/BEAM-12516
 Project: Beam
  Issue Type: Bug
  Components: runner-dataflow
Affects Versions: 2.28.0
Reporter: Sam Whittle
Assignee: Sam Whittle


ByteString.substring does not support an end index beyond the string length.

example exception:

SLF4J: Failed toString() invocation on an object of type 
[org.apache.beam.runners.dataflow.worker.AutoValue_StreamingDataflowWorker_ShardedKey]

java.lang.IndexOutOfBoundsException: End index: 100 >= 3
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.checkRange(ByteString.java:1272)
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString$LiteralByteString.substring(ByteString.java:1343)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$ShardedKey.toString(StreamingDataflowWorker.java:1144)
at 
org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:277)
at 
org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:249)
at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:211)
at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:161)
at org.slf4j.helpers.MessageFormatter.format(MessageFormatter.java:151)
at org.slf4j.impl.JDK14LoggerAdapter.error(JDK14LoggerAdapter.java:522)
at 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$ComputationState.invalidateStuckCommits(StreamingDataflowWorker.java:2326)
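The fix implied here is to clamp the end index before truncating the key for display. A minimal sketch using String (analogous to ByteString.substring, which throws the same way; the helper name is illustrative):

```java
public final class SafeTruncate {
  // ShardedKey.toString truncated keys with substring(0, 100), which throws
  // when the key is shorter than 100 bytes; clamping the end index avoids the
  // IndexOutOfBoundsException.
  static String firstN(String key, int n) {
    return key.substring(0, Math.min(n, key.length()));
  }
}
```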



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12472) BigQuery streaming writes can be batched beyond request limit with BatchAndInsertElements

2021-06-17 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364810#comment-17364810
 ] 

Sam Whittle commented on BEAM-12472:


I was using withoutInsertIds, which perhaps triggers a different code path 
than without it. I believe that is newer functionality, but I'm not familiar 
with the code and haven't investigated further.

On Tue, Jun 15, 2021, 9:48 PM Kenneth Knowles (Jira) 



> BigQuery streaming writes can be batched beyond request limit with 
> BatchAndInsertElements
> -
>
> Key: BEAM-12472
> URL: https://issues.apache.org/jira/browse/BEAM-12472
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Sam Whittle
>Priority: P2
>
> BatchAndInsertElements accumulates all the input elements and flushes them in 
> finishBundle.
> However, if there is enough data, the request limit for BigQuery can be 
> exceeded, causing an exception like the following.  It seems that finishBundle 
> should limit the number of rows and bytes, and possibly flush multiple times 
> for a destination.
> A workaround would be to use autosharding, which uses state that has batching 
> limits, or to increase the number of streaming keys to decrease the likelihood 
> of hitting this.
> "Error while processing a work item: UNKNOWN: 
> org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: 
> com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
> Request
> POST 
> https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/datasets/nexmark_06090820455271/tables/nexmark_simple/insertAll?prettyPrint=false
> {
>   "code" : 400,
>   "errors" : [ {
> "domain" : "global",
> "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> "reason" : "badRequest"
>   } ],
>   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
>   "status" : "INVALID_ARGUMENT"
> }
>   at 
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>   at 
> org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown
>  Source)
>   at 
> org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1661)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12472) BigQuery streaming writes can be batched beyond request limit with BatchAndInsertElements

2021-06-09 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12472:
--

 Summary: BigQuery streaming writes can be batched beyond request 
limit with BatchAndInsertElements
 Key: BEAM-12472
 URL: https://issues.apache.org/jira/browse/BEAM-12472
 Project: Beam
  Issue Type: Bug
  Components: io-java-gcp
Reporter: Sam Whittle


BatchAndInsertElements accumulates all the input elements and flushes them in 
finishBundle.
However, if there is enough data, the BigQuery request limit can be exceeded, 
causing an exception like the following.  It seems that finishBundle should 
limit the number of rows and bytes, possibly flushing multiple times for a 
destination.

A workaround would be to use autosharding, whose state-backed batching enforces 
limits, or to increase the number of streaming keys to decrease the likelihood 
of hitting this.

"Error while processing a work item: UNKNOWN: 
org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
POST 
https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/datasets/nexmark_06090820455271/tables/nexmark_simple/insertAll?prettyPrint=false
{
  "code" : 400,
  "errors" : [ {
"domain" : "global",
"message" : "Request payload size exceeds the limit: 10485760 bytes.",
"reason" : "badRequest"
  } ],
  "message" : "Request payload size exceeds the limit: 10485760 bytes.",
  "status" : "INVALID_ARGUMENT"
}
at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown
 Source)
at 
org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1661)
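The flush-with-limits idea above can be sketched as follows. This is not Beam's actual BatchedStreamingWrite code; the class, `Sink` interface, and the row cap are illustrative (only the 10 MB request-size limit comes from the error message above).

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: flush accumulated rows in batches that stay under BigQuery's
// per-request limits instead of issuing one unbounded insertAll from
// finishBundle. Names and the row cap are hypothetical.
public class BatchedFlush {
    static final long MAX_REQUEST_BYTES = 10 * 1024 * 1024; // insertAll payload limit
    static final int MAX_REQUEST_ROWS = 50_000;             // illustrative row cap

    interface Sink { void insertAll(List<String> rows); }

    static void flush(List<String> accumulated, Sink sink) {
        List<String> batch = new ArrayList<>();
        long batchBytes = 0;
        for (String row : accumulated) {
            long rowBytes = row.getBytes().length;
            // Start a new request before either limit would be exceeded.
            if (!batch.isEmpty()
                && (batchBytes + rowBytes > MAX_REQUEST_BYTES
                    || batch.size() >= MAX_REQUEST_ROWS)) {
                sink.insertAll(batch);
                batch = new ArrayList<>();
                batchBytes = 0;
            }
            batch.add(row);
            batchBytes += rowBytes;
        }
        if (!batch.isEmpty()) {
            sink.insertAll(batch); // final partial batch
        }
    }
}
```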



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12402) Optimize PCollectionConsumerRegistry$MultiplexingMetricTrackingFnDataReceiver

2021-05-25 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12402:
---
Resolution: Fixed
Status: Resolved  (was: Open)

> Optimize PCollectionConsumerRegistry$MultiplexingMetricTrackingFnDataReceiver
> -
>
> Key: BEAM-12402
> URL: https://issues.apache.org/jira/browse/BEAM-12402
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-harness
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> In Nexmark benchmark profile this was using 2% of cpu on
> AbstractMapBasedMultimap$WrappedCollection$WrappedIterator
> methods.
> I believe this is due to the list returned here being a wrapper around the 
> multimap's backing collection.
> https://github.com/apache/beam/blob/8463a054c1d7e2b7ee8d11e9569e065cb5e02196/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java#L172
> We iterate over this list many times for counters. It appears that we don't 
> need a wrapped list, as it is documented that all consumers must be 
> registered first.  So we can simply copy it into an immutable list there and 
> trivially save that cpu.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12402) Optimize PCollectionConsumerRegistry$MultiplexingMetricTrackingFnDataReceiver

2021-05-25 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12402:
--

 Summary: Optimize 
PCollectionConsumerRegistry$MultiplexingMetricTrackingFnDataReceiver
 Key: BEAM-12402
 URL: https://issues.apache.org/jira/browse/BEAM-12402
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-harness
Reporter: Sam Whittle
Assignee: Sam Whittle


In Nexmark benchmark profile this was using 2% of cpu on
AbstractMapBasedMultimap$WrappedCollection$WrappedIterator
methods.

 I believe this is due to the list returned here being a wrapper around the 
multimap's backing collection.
https://github.com/apache/beam/blob/8463a054c1d7e2b7ee8d11e9569e065cb5e02196/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java#L172

We iterate over this list many times for counters. It appears that we don't 
need a wrapped list, as it is documented that all consumers must be registered 
first.  So we can simply copy it into an immutable list there and trivially 
save that cpu.
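The proposed fix can be sketched like this. The registry below is a simplified stand-in for PCollectionConsumerRegistry, not its actual code: once registration is complete, the hot path reads a compact immutable snapshot instead of the multimap's wrapper collection.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch with hypothetical names: registration fills a mutable map-of-lists;
// the hot path iterates an immutable snapshot, so repeated counter loops
// avoid the multimap wrapper-iterator overhead.
public class ConsumerRegistry<T> {
    private final Map<String, List<T>> registered = new HashMap<>();
    private final Map<String, List<T>> frozen = new HashMap<>();

    public void register(String pCollectionId, T consumer) {
        if (!frozen.isEmpty()) {
            throw new IllegalStateException("register all consumers before processing");
        }
        registered.computeIfAbsent(pCollectionId, k -> new ArrayList<>()).add(consumer);
    }

    public List<T> consumers(String pCollectionId) {
        // List.copyOf returns a compact immutable list; iterating it is cheap.
        return frozen.computeIfAbsent(
            pCollectionId, k -> List.copyOf(registered.getOrDefault(k, List.of())));
    }
}
```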



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-7717) PubsubIO watermark tracking hovers near start of epoch

2021-05-25 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-7717:
--
Fix Version/s: 2.31.0
   Resolution: Fixed
   Status: Resolved  (was: Open)

> PubsubIO watermark tracking hovers near start of epoch
> --
>
> Key: BEAM-7717
> URL: https://issues.apache.org/jira/browse/BEAM-7717
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Steve Niemitz
>Assignee: Sam Whittle
>Priority: P1
> Fix For: 2.31.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> {quote}The watermark tracking seems off.  The dataflow UI was reporting my 
> watermark as around (but not exactly) the epoch (it was ~1970-01-19), which 
> makes me wonder if seconds/milliseconds got confused somewhere (ie, if you 
> take the time since epoch in milliseconds now and interpret it as seconds, 
> you'll get somewhere around 1970-01-18).{quote}
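The reported ~1970-01-19 watermark is consistent with a seconds/milliseconds mix-up: an epoch-seconds value interpreted as epoch-milliseconds collapses ~51 years into ~18 days. A minimal illustration (the constant is simply a mid-May-2021 timestamp, not taken from the job):

```java
import java.time.Instant;

// Illustration of the suspected unit bug: feeding a seconds-since-epoch
// value to an API that expects milliseconds lands ~18 days after the epoch.
public class EpochUnits {
    public static void main(String[] args) {
        long epochSeconds = 1_621_000_000L; // mid-May 2021, in seconds
        Instant misread = Instant.ofEpochMilli(epochSeconds);  // wrong unit
        Instant correct = Instant.ofEpochSecond(epochSeconds); // right unit
        System.out.println(misread); // 1970-01-19..., matching the reported watermark
        System.out.println(correct); // mid-May 2021
    }
}
```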



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-7717) PubsubIO watermark tracking hovers near start of epoch

2021-05-20 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle reassigned BEAM-7717:
-

Assignee: Sam Whittle

> PubsubIO watermark tracking hovers near start of epoch
> --
>
> Key: BEAM-7717
> URL: https://issues.apache.org/jira/browse/BEAM-7717
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Steve Niemitz
>Assignee: Sam Whittle
>Priority: P1
>
> {quote}The watermark tracking seems off.  The dataflow UI was reporting my 
> watermark as around (but not exactly) the epoch (it was ~1970-01-19), which 
> makes me wonder if seconds/milliseconds got confused somewhere (ie, if you 
> take the time since epoch in milliseconds now and interpret it as seconds, 
> you'll get somewhere around 1970-01-18).{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12144) Dataflow streaming worker stuck and unable to get work from Streaming Engine

2021-05-10 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17342140#comment-17342140
 ] 

Sam Whittle commented on BEAM-12144:


PR is being reviewed https://github.com/apache/beam/pull/14714

> Dataflow streaming worker stuck and unable to get work from Streaming Engine
> 
>
> Key: BEAM-12144
> URL: https://issues.apache.org/jira/browse/BEAM-12144
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.26.0
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Observed in 2.26 but seems like it could affect later versions as well, as 
> previous issues addressing similar problems were before 2.26.  This seems 
> similar to BEAM-9651 but not the deadlock observed there.
> The thread getting work has the following stack:
> --- Threads (1): [Thread[DispatchThread,1,main]] State: WAITING stack: ---
>   java.base@11.0.9/jdk.internal.misc.Unsafe.park(Native Method)
>   
> java.base@11.0.9/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
>   java.base@11.0.9/java.util.concurrent.Phaser$QNode.block(Phaser.java:1127)
>   
> java.base@11.0.9/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)
>   
> java.base@11.0.9/java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1057)
>   
> java.base@11.0.9/java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:747)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.DirectStreamObserver.onNext(DirectStreamObserver.java:49)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.send(GrpcWindmillServer.java:662)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.onNewStream(GrpcWindmillServer.java:868)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.startStream(GrpcWindmillServer.java:677)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:860)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:843)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer.getWorkStream(GrpcWindmillServer.java:543)
>   
> app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.streamingDispatchLoop(StreamingDataflowWorker.java:1047)
>   
> app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$1.run(StreamingDataflowWorker.java:670)
>   java.base@11.0.9/java.lang.Thread.run(Thread.java:834)
> The status page shows:
> GetWorkStream: 0 buffers, 400 inflight messages allowed, 67108864 inflight 
> bytes allowed, current stream is 61355396ms old, last send 61355396ms, last 
> response -1ms
> Showing that the stream was created 17 hours ago, sent the header message but 
> never received a response.  Given the stack trace, it appears the header was 
> actually never sent, yet the stream also didn't terminate with a deadline 
> exceeded.  Not surfacing an error for such a stream looks like a grpc issue; 
> however, it would be safer not to block indefinitely on the Phaser waiting 
> for the send, and instead throw an exception after, say, 2x the stream deadline.
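The suggested guard maps directly onto Phaser's timed wait. This is a sketch, not the DirectStreamObserver code; the method and the choice of 2x the stream deadline are illustrative.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: bound the wait on the Phaser instead of blocking forever, so a
// wedged stream surfaces an error and can be torn down and retried.
public class BoundedSend {
    static void awaitReadyWithDeadline(Phaser phaser, int phase, long streamDeadlineSeconds) {
        try {
            // Wait at most 2x the stream deadline rather than indefinitely.
            phaser.awaitAdvanceInterruptibly(phase, 2 * streamDeadlineSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException(
                "Output channel stalled beyond 2x stream deadline; aborting stream", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted waiting for stream readiness", e);
        }
    }
}
```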



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12144) Dataflow streaming worker stuck and unable to get work from Streaming Engine

2021-05-10 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12144:
---
Labels:   (was: stale-assigned)

> Dataflow streaming worker stuck and unable to get work from Streaming Engine
> 
>
> Key: BEAM-12144
> URL: https://issues.apache.org/jira/browse/BEAM-12144
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.26.0
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>
> Observed in 2.26 but seems like it could affect later versions as well, as 
> previous issues addressing similar problems were before 2.26.  This seems 
> similar to BEAM-9651 but not the deadlock observed there.
> The thread getting work has the following stack:
> --- Threads (1): [Thread[DispatchThread,1,main]] State: WAITING stack: ---
>   java.base@11.0.9/jdk.internal.misc.Unsafe.park(Native Method)
>   
> java.base@11.0.9/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
>   java.base@11.0.9/java.util.concurrent.Phaser$QNode.block(Phaser.java:1127)
>   
> java.base@11.0.9/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)
>   
> java.base@11.0.9/java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1057)
>   
> java.base@11.0.9/java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:747)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.DirectStreamObserver.onNext(DirectStreamObserver.java:49)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.send(GrpcWindmillServer.java:662)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.onNewStream(GrpcWindmillServer.java:868)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.startStream(GrpcWindmillServer.java:677)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:860)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:843)
>   
> app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer.getWorkStream(GrpcWindmillServer.java:543)
>   
> app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.streamingDispatchLoop(StreamingDataflowWorker.java:1047)
>   
> app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$1.run(StreamingDataflowWorker.java:670)
>   java.base@11.0.9/java.lang.Thread.run(Thread.java:834)
> The status page shows:
> GetWorkStream: 0 buffers, 400 inflight messages allowed, 67108864 inflight 
> bytes allowed, current stream is 61355396ms old, last send 61355396ms, last 
> response -1ms
> Showing that the stream was created 17 hours ago, sent the header message but 
> never received a response.  Given the stack trace, it appears the header was 
> actually never sent, yet the stream also didn't terminate with a deadline 
> exceeded.  Not surfacing an error for such a stream looks like a grpc issue; 
> however, it would be safer not to block indefinitely on the Phaser waiting 
> for the send, and instead throw an exception after, say, 2x the stream deadline.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12254) Nexmark UnboundedReader does not report backlog correctly

2021-05-03 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12254:
---
Resolution: Fixed
Status: Resolved  (was: Open)

> Nexmark UnboundedReader does not report backlog correctly
> -
>
> Key: BEAM-12254
> URL: https://issues.apache.org/jira/browse/BEAM-12254
> Project: Beam
>  Issue Type: Bug
>  Components: testing-nexmark
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The reader only reports backlog if it has been initialized by reading 
> elements and only if rate limiting is on.
> However, if rate limiting is off and there is a bounded number of elements, 
> the backlog can also be calculated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12118) QueuingBeamFnDataClient adds polling latency to completing bundle processing

2021-05-03 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12118:
---
Resolution: Fixed
Status: Resolved  (was: Triage Needed)

> QueuingBeamFnDataClient adds polling latency to completing bundle processing
> 
>
> Key: BEAM-12118
> URL: https://issues.apache.org/jira/browse/BEAM-12118
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.30.0
>
>  Time Spent: 8h 50m
>  Remaining Estimate: 0h
>
> Currently the inboundDataClients are registered with receive, and they add 
> data to a queue. There is no explicit indication from the clients that they 
> are no longer going to add values to the queue.
> Within QueueingBeamFnDataClient.drainAndBlock the queue is therefore polled 
> and if nothing is present all clients are polled to see if they are complete.
> This design makes for unfortunate tradeoffs on poll timeout:
> - cpu wasted with small timeout
> - additional latency in noticing we have completed with larger timeout
> With the existing InboundDataClient interface, we could have a separate 
> thread call awaitCompletion on all of the clients and then shutdown the queue 
> (adding a poison pill perhaps)
> Or we could modify InboundDataClient interface to allow registering interest 
> in when the client is done producing elements.  The existing clients all seem 
> based upon futures which allow that.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12209) DirectStreamObserver is not thread-safe as advertised due to racy integer operations

2021-05-03 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12209:
---
Resolution: Fixed
Status: Resolved  (was: Open)

> DirectStreamObserver is not thread-safe as advertised due to racy integer 
> operations
> 
>
> Key: BEAM-12209
> URL: https://issues.apache.org/jira/browse/BEAM-12209
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> An AtomicInteger should be used instead for the message counting for periodic 
> flow control checking.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12203) Reduce thread context switches in BeamFnControlClient

2021-05-03 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12203:
---
Resolution: Fixed
Status: Resolved  (was: Open)

> Reduce thread context switches in BeamFnControlClient
> -
>
> Key: BEAM-12203
> URL: https://issues.apache.org/jira/browse/BEAM-12203
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-harness
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> This class has a grpc observer that queues received messages on a 
> LinkedBlockingQueue and then has a separate thread pulling from the queue 
> to add to an executor.
> Instead we could just add directly to the executor from the grpc stream, as 
> the executor by default has an unbounded queue size (and there are comments 
> indicating that a limited size executor can cause deadlocks otherwise)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work started] (BEAM-12253) Read.UnboundedSourceAsSDFRestrictionTracker doesn't use cache for readers in getProgress

2021-05-03 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on BEAM-12253 started by Sam Whittle.
--
> Read.UnboundedSourceAsSDFRestrictionTracker doesn't use cache for readers in 
> getProgress
> 
>
> Key: BEAM-12253
> URL: https://issues.apache.org/jira/browse/BEAM-12253
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L963
> In getProgress, if currentReader == null a new reader is constructed from the 
> restriction checkpoint.  I think that we should consult the reader cache as 
> creating readers is sometimes expensive.
> I observed in Nexmark pipelines that readers were being created at this 
> location.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12254) Nexmark UnboundedReader does not report backlog correctly

2021-04-29 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12254:
--

 Summary: Nexmark UnboundedReader does not report backlog correctly
 Key: BEAM-12254
 URL: https://issues.apache.org/jira/browse/BEAM-12254
 Project: Beam
  Issue Type: Bug
  Components: testing-nexmark
Reporter: Sam Whittle
Assignee: Sam Whittle


The reader only reports backlog if it has been initialized by reading elements 
and only if rate limiting is on.
However, if rate limiting is off and there is a bounded number of elements, the 
backlog can also be calculated.
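The bounded case reduces to simple arithmetic over the source's counters. A sketch with hypothetical parameters (not the NexmarkGenerator's actual fields), where a negative `maxEvents` means the source is unbounded:

```java
// Sketch: when the event count is bounded, backlog is derivable from the
// counters alone, regardless of whether rate limiting ever initialized.
public class BoundedBacklog {
    static final long BACKLOG_UNKNOWN = -1;

    static long backlogElements(long maxEvents, long emittedEvents) {
        if (maxEvents >= 0) {
            // Bounded source: remaining elements are known exactly.
            return Math.max(0, maxEvents - emittedEvents);
        }
        return BACKLOG_UNKNOWN; // unbounded and no rate info: nothing to report
    }
}
```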



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12253) Read.UnboundedSourceAsSDFRestrictionTracker doesn't use cache for readers in getProgress

2021-04-29 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12253:
--

 Summary: Read.UnboundedSourceAsSDFRestrictionTracker doesn't use 
cache for readers in getProgress
 Key: BEAM-12253
 URL: https://issues.apache.org/jira/browse/BEAM-12253
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Reporter: Sam Whittle
Assignee: Sam Whittle


https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L963

In getProgress, if currentReader == null a new reader is constructed from the 
restriction checkpoint.  I think that we should consult the reader cache as 
creating readers is sometimes expensive.

I observed in Nexmark pipelines that readers were being created at this 
location.
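The shape of the proposed fix is a cache lookup guarding the expensive construction. This is a generic sketch, not Beam's UnboundedReader cache API; the key type and factory are placeholders.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Sketch: consult a reader cache keyed by the restriction before paying the
// cost of constructing a new reader from the checkpoint in getProgress.
public class ReaderCache<K, R> {
    private final Map<K, R> cache = new HashMap<>();

    public R getOrCreate(K restrictionKey, Supplier<R> expensiveCreate) {
        // computeIfAbsent only invokes the factory on a cache miss.
        return cache.computeIfAbsent(restrictionKey, k -> expensiveCreate.get());
    }
}
```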



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (BEAM-12118) QueuingBeamFnDataClient adds polling latency to completing bundle processing

2021-04-28 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle reopened BEAM-12118:


Reopening to track fixing the race that triggers the precondition failure

> QueuingBeamFnDataClient adds polling latency to completing bundle processing
> 
>
> Key: BEAM-12118
> URL: https://issues.apache.org/jira/browse/BEAM-12118
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.30.0
>
>  Time Spent: 7h 50m
>  Remaining Estimate: 0h
>
> Currently the inboundDataClients are registered with receive, and they add 
> data to a queue. There is no explicit indication from the clients that they 
> are no longer going to add values to the queue.
> Within QueueingBeamFnDataClient.drainAndBlock the queue is therefore polled 
> and if nothing is present all clients are polled to see if they are complete.
> This design makes for unfortunate tradeoffs on poll timeout:
> - cpu wasted with small timeout
> - additional latency in noticing we have completed with larger timeout
> With the existing InboundDataClient interface, we could have a separate 
> thread call awaitCompletion on all of the clients and then shutdown the queue 
> (adding a poison pill perhaps)
> Or we could modify InboundDataClient interface to allow registering interest 
> in when the client is done producing elements.  The existing clients all seem 
> based upon futures which allow that.
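The first alternative (await completion on all clients, then shut the queue with a poison pill) can be sketched as below. This is not QueuingBeamFnDataClient's code; the queue element type, completion futures, and helper thread are illustrative.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Sketch: a helper thread awaits every inbound client's completion future,
// then enqueues a poison pill, so the draining loop can block on take()
// with no poll timeout (no wasted cpu, no added completion latency).
public class DrainWithPill {
    static final Object POISON_PILL = new Object();

    static void drainAndBlock(BlockingQueue<Object> queue,
                              List<CompletableFuture<Void>> clientDone,
                              Consumer<Object> process) throws InterruptedException {
        Thread closer = new Thread(() -> {
            // Close the queue only once every client is done producing.
            CompletableFuture.allOf(clientDone.toArray(new CompletableFuture[0])).join();
            queue.add(POISON_PILL);
        });
        closer.start();
        Object element;
        while ((element = queue.take()) != POISON_PILL) {
            process.accept(element);
        }
        closer.join();
    }
}
```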



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-12229) WindmillStateCache has a 0% hit rate in 2.29

2021-04-27 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle reassigned BEAM-12229:
--

Assignee: Sam Whittle  (was: Reuven Lax)

> WindmillStateCache has a 0% hit rate in 2.29
> 
>
> Key: BEAM-12229
> URL: https://issues.apache.org/jira/browse/BEAM-12229
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.29.0
>Reporter: Steve Niemitz
>Assignee: Sam Whittle
>Priority: P1
> Fix For: 2.30.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> After upgrading to 2.29, I noticed that our jobs have a 0% state cache hit 
> rate.  I see a very high eviction rate from the cache as well (it used to be 
> ~0, now its ~100,000+ evictions / second).
> We never were on 2.28, so I can't say if it worked there, but it did work on 
> 2.27.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12229) WindmillStateCache has a 0% hit rate in 2.29

2021-04-27 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17333208#comment-17333208
 ] 

Sam Whittle commented on BEAM-12229:


Ugh, I see the issue. I modified how tokens were validated after testing the 
change and broke it for any fused stages with more than 1 step.
The unit test is at a lower level than that misuse so it didn't catch it.

I put together PR https://github.com/apache/beam/pull/14649 but have not yet 
been able to modify StreamingDataflowWorkerTest to reproduce the issue due to 
its complexity.  I'd like to do that before submitting.

Maybe it is best to rollback the original PR in the interim though.

The bug should only affect cache effectiveness, not corrupt data, AFAICT.  
Your combiner state could be different because the combiner does local 
combining differently before applying when the value is cached.

> WindmillStateCache has a 0% hit rate in 2.29
> 
>
> Key: BEAM-12229
> URL: https://issues.apache.org/jira/browse/BEAM-12229
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.29.0
>Reporter: Steve Niemitz
>Assignee: Reuven Lax
>Priority: P1
> Fix For: 2.30.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> After upgrading to 2.29, I noticed that our jobs have a 0% state cache hit 
> rate.  I see a very high eviction rate from the cache as well (it used to be 
> ~0, now its ~100,000+ evictions / second).
> We never were on 2.28, so I can't say if it worked there, but it did work on 
> 2.27.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12209) DirectStreamObserver is not thread-safe as advertised due to racy integer operations

2021-04-22 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12209:
--

 Summary: DirectStreamObserver is not thread-safe as advertised due 
to racy integer operations
 Key: BEAM-12209
 URL: https://issues.apache.org/jira/browse/BEAM-12209
 Project: Beam
  Issue Type: Bug
  Components: java-fn-execution
Reporter: Sam Whittle
Assignee: Sam Whittle


An AtomicInteger should be used instead for the message counting for periodic 
flow control checking.
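The fix amounts to swapping a racy `int count++` for an AtomicInteger. A sketch (not the actual DirectStreamObserver code; the check period is illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: with a plain int, concurrent onNext calls can lose increments and
// skip the periodic flow-control check; AtomicInteger makes it reliable.
public class PeriodicCheckCounter {
    private static final int CHECK_PERIOD = 100; // illustrative
    private final AtomicInteger messagesSinceReady = new AtomicInteger();

    /** Returns true when the caller should re-check flow control. */
    public boolean onMessage() {
        // incrementAndGet is atomic: exactly one caller sees each multiple.
        return messagesSinceReady.incrementAndGet() % CHECK_PERIOD == 0;
    }
}
```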



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (BEAM-12203) Reduce thread context switches in BeamFnControlClient

2021-04-21 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326662#comment-17326662
 ] 

Sam Whittle edited comment on BEAM-12203 at 4/21/21, 8:15 PM:
--

Realized that this could be simplified further by instructing grpc to use the 
desired executor, letting requests execute inline from the observer.

The previous path involved several thread switches:
grpc executor -> direct observer which queued -> thread reading queue, queues 
on executor -> executor runs request
[edited removing previous comment about buffered observer which was incorrect 
as that applies to output]


was (Author: scwhittle):
Realized that this could further be simplified by just instructing grpc to use 
the desired executor and we can execute requests inline to the observer.

The previous path involved several thread switches:
grpc executor -> direct observer which queued -> thread reading queue, queues 
on executor -> executor runs request
or even worse if the buffering observer was enabled
grpc executor -> buffering observer queues -> buffering observer pulling thread 
pulls from buffer queues on unbounded buffer -> thread reads queue, queues on 
executor -> executor runs request

> Reduce thread context switches in BeamFnControlClient
> -
>
> Key: BEAM-12203
> URL: https://issues.apache.org/jira/browse/BEAM-12203
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-harness
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> This class has a grpc observer that queues received messages on a 
> LinkedBlockingQueue and then has a separate thread pulling from the queue 
> to add to an executor.
> Instead we could just add directly to the executor from the grpc stream, as 
> the executor by default has an unbounded queue size (and there are comments 
> indicating that a limited size executor can cause deadlocks otherwise)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12203) Reduce thread context switches in BeamFnControlClient

2021-04-21 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326662#comment-17326662
 ] 

Sam Whittle commented on BEAM-12203:


Realized that this could be simplified further by instructing grpc to use the 
desired executor, letting requests execute inline from the observer.

The previous path involved several thread switches:
grpc executor -> direct observer which queued -> thread reading queue, queues 
on executor -> executor runs request
or even worse if the buffering observer was enabled
grpc executor -> buffering observer queues -> buffering observer pulling thread 
pulls from buffer queues on unbounded buffer -> thread reads queue, queues on 
executor -> executor runs request

> Reduce thread context switches in BeamFnControlClient
> -
>
> Key: BEAM-12203
> URL: https://issues.apache.org/jira/browse/BEAM-12203
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-harness
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> This class has a grpc observer that queues received messages on a 
> LinkedBlockingQueue and then has a separate thread pulling from the queue 
> to add to an executor.
> Instead we could just add directly to the executor from the grpc stream, as 
> the executor by default has an unbounded queue size (and there are comments 
> indicating that a limited size executor can cause deadlocks otherwise)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12203) Reduce thread context switches in BeamFnControlClient

2021-04-21 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12203:
---
Summary: Reduce thread context switches in BeamFnControlClient  (was: 
Remove LinkedBlockingQueue from BeamFnControlClient)

> Reduce thread context switches in BeamFnControlClient
> -
>
> Key: BEAM-12203
> URL: https://issues.apache.org/jira/browse/BEAM-12203
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-harness
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> This class has a grpc observer that queues received messages on a 
> LinkedBlockingQueue and then has a separate thread pulling from the queue 
> to add to an executor.
> Instead we could just add directly to the executor from the grpc stream, as 
> the executor by default has an unbounded queue size (and there are comments 
> indicating that a limited size executor can cause deadlocks otherwise)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12203) Remove LinkedBlockingQueue from BeamFnControlClient

2021-04-21 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326606#comment-17326606
 ] 

Sam Whittle commented on BEAM-12203:


The put/take on this queue appears to account for 2% of cpu in a benchmark.

> Remove LinkedBlockingQueue from BeamFnControlClient
> ---
>
> Key: BEAM-12203
> URL: https://issues.apache.org/jira/browse/BEAM-12203
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-harness
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>
> This class has a grpc observer that queues received messages on a 
> LinkedBlockingQueue and then has a separate thread pulling from the queue 
> to add to an executor.
> Instead we could just add directly to the executor from the grpc stream, as 
> the executor by default has an unbounded queue size (and there are comments 
> indicating that a limited size executor can cause deadlocks otherwise)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12203) Remove LinkedBlockingQueue from BeamFnControlClient

2021-04-21 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12203:
--

 Summary: Remove LinkedBlockingQueue from BeamFnControlClient
 Key: BEAM-12203
 URL: https://issues.apache.org/jira/browse/BEAM-12203
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-harness
Reporter: Sam Whittle
Assignee: Sam Whittle


This class has a grpc observer that queues received messages on a 
LinkedBlockingQueue and then has a separate thread pulling from the queue to 
add to an executor.

Instead we could just add directly to the executor from the grpc stream, as the 
executor by default has an unbounded queue size (and there are comments 
indicating that a limited-size executor could otherwise cause deadlocks).
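A minimal sketch of the proposed change, with illustrative names rather than 
Beam's actual classes: the gRPC observer hands each received instruction 
directly to an executor whose queue is unbounded, making the intermediate 
LinkedBlockingQueue and its draining thread unnecessary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch, not Beam's actual API: submit straight to the
// executor from the stream, with no blocking put/take pair or hand-off
// thread per instruction.
public class DirectDispatch {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final AtomicInteger processed = new AtomicInteger();

  // Called from the gRPC stream's onNext(): no hand-off thread, so one fewer
  // context switch per instruction.
  public void onInstruction(Runnable handler) {
    executor.execute(
        () -> {
          handler.run();
          processed.incrementAndGet();
        });
  }

  // Drains remaining work and reports how many instructions were processed.
  public int shutdownAndCount() {
    executor.shutdown();
    try {
      executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return processed.get();
  }
}
```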







--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12118) QueuingBeamFnDataClient adds polling latency to completing bundle processing

2021-04-21 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12118:
---
Fix Version/s: 2.30.0
   Resolution: Fixed
   Status: Resolved  (was: Open)

> QueuingBeamFnDataClient adds polling latency to completing bundle processing
> 
>
> Key: BEAM-12118
> URL: https://issues.apache.org/jira/browse/BEAM-12118
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.30.0
>
>  Time Spent: 7h 50m
>  Remaining Estimate: 0h
>
> Currently the inboundDataClients are registered with receive, and they add 
> data to a queue. There is no explicit indication from the clients that they 
> are no longer going to add values to the queue.
> Within QueueingBeamFnDataClient.drainAndBlock the queue is therefore polled 
> and if nothing is present all clients are polled to see if they are complete.
> This design makes for unfortunate tradeoffs on poll timeout:
> - cpu wasted with a small timeout
> - additional latency in noticing we have completed with a larger timeout
> With the existing InboundDataClient interface, we could have a separate 
> thread call awaitCompletion on all of the clients and then shut down the 
> queue (adding a poison pill, perhaps).
> Or we could modify the InboundDataClient interface to allow registering 
> interest in when the client is done producing elements.  The existing 
> clients all seem based upon futures, which would allow that.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-11910) Increase subsequent page size for bags after the first

2021-04-15 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-11910:
---
Fix Version/s: 2.29.0

> Increase subsequent page size for bags after the first
> --
>
> Key: BEAM-11910
> URL: https://issues.apache.org/jira/browse/BEAM-11910
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Labels: stale-assigned
> Fix For: 2.29.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Currently the page size of bags requested from the streaming dataflow backend 
> is always 8MB.  In pipelines with large bags this can limit throughput as it 
> results in more round-trips to the backend.  In particular with Streaming 
> Engine this is noticeable due to increased latency.
> I propose using 8MB for the first bag fetch and then doubling the limit for 
> subsequent paginations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12127) Reduce counter overhead in PCollectionConsumerRegistry.accept

2021-04-12 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12127:
---
Resolution: Fixed
Status: Resolved  (was: Open)

> Reduce counter overhead in PCollectionConsumerRegistry.accept
> -
>
> Key: BEAM-12127
> URL: https://issues.apache.org/jira/browse/BEAM-12127
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> DelegatingCounter.inc shows up as 21% of cpu on the nexmark query 2 benchmark 
> under PCollectionConsumerRegistry.accept.
> 2% is actual counter incrementing, but the majority is the delegation of 
> DelegatingCounter, which involves looking up thread-local state and then 
> getting the counter for the name from the counter container.  However in this 
> case the counter container is known and can just be bound when constructing 
> the counter instead of using DelegatingCounter.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12142) Reduce overhead of MetricsEnvironment

2021-04-12 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12142:
---
Fix Version/s: 2.30.0
   Resolution: Fixed
   Status: Resolved  (was: Open)

> Reduce overhead of MetricsEnvironment
> -
>
> Key: BEAM-12142
> URL: https://issues.apache.org/jira/browse/BEAM-12142
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.30.0
>
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Currently the MetricsContainer objects are stored in ThreadLocal state. This 
> means that scoping a new container involves a get and a set of thread-local 
> state. By instead putting a wrapper object in the thread-local state we can 
> use a single lookup in thread-local state to get/set and then reset.
> This is showing up as a possible 7% cpu improvement in a nexmark query 
> benchmark.
> Additionally I think that removing from the thread-local state is causing 
> overhead in get calls by making the linear probing within the implementation 
> of ThreadLocal state more expensive.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12117) QueuingBeamFnDataClient inbound client set grows with BundleProcessor reuse

2021-04-12 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-12117:
---
Resolution: Fixed
Status: Resolved  (was: Open)

> QueuingBeamFnDataClient inbound client set grows with BundleProcessor reuse
> ---
>
> Key: BEAM-12117
> URL: https://issues.apache.org/jira/browse/BEAM-12117
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> It should have a reset method and be reset when the BundleProcessor is reset.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-11910) Increase subsequent page size for bags after the first

2021-04-12 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-11910:
---
Resolution: Fixed
Status: Resolved  (was: Open)

> Increase subsequent page size for bags after the first
> --
>
> Key: BEAM-11910
> URL: https://issues.apache.org/jira/browse/BEAM-11910
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Labels: stale-assigned
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Currently the page size of bags requested from the streaming dataflow backend 
> is always 8MB.  In pipelines with large bags this can limit throughput as it 
> results in more round-trips to the backend.  In particular with Streaming 
> Engine this is noticeable due to increased latency.
> I propose using 8MB for the first bag fetch and then doubling the limit for 
> subsequent paginations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12144) Dataflow streaming worker stuck and unable to get work from Streaming Engine

2021-04-09 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12144:
--

 Summary: Dataflow streaming worker stuck and unable to get work 
from Streaming Engine
 Key: BEAM-12144
 URL: https://issues.apache.org/jira/browse/BEAM-12144
 Project: Beam
  Issue Type: Bug
  Components: runner-dataflow
Affects Versions: 2.26.0
Reporter: Sam Whittle
Assignee: Sam Whittle


Observed in 2.26, but it seems like it could affect later versions as well, as 
previous fixes addressing similar problems predate 2.26.  This seems similar 
to BEAM-9651 but is not the deadlock observed there.

The thread getting work has the following stack:

--- Threads (1): [Thread[DispatchThread,1,main]] State: WAITING stack: ---
  java.base@11.0.9/jdk.internal.misc.Unsafe.park(Native Method)
  
java.base@11.0.9/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
  java.base@11.0.9/java.util.concurrent.Phaser$QNode.block(Phaser.java:1127)
  
java.base@11.0.9/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)
  
java.base@11.0.9/java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1057)
  
java.base@11.0.9/java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:747)
  
app//org.apache.beam.runners.dataflow.worker.windmill.DirectStreamObserver.onNext(DirectStreamObserver.java:49)
  
app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.send(GrpcWindmillServer.java:662)
  
app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.onNewStream(GrpcWindmillServer.java:868)
  
app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.startStream(GrpcWindmillServer.java:677)
  
app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.<init>(GrpcWindmillServer.java:860)
  
app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.<init>(GrpcWindmillServer.java:843)
  
app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer.getWorkStream(GrpcWindmillServer.java:543)
  
app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.streamingDispatchLoop(StreamingDataflowWorker.java:1047)
  
app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$1.run(StreamingDataflowWorker.java:670)
  java.base@11.0.9/java.lang.Thread.run(Thread.java:834)

The status page shows:
GetWorkStream: 0 buffers, 400 inflight messages allowed, 67108864 inflight 
bytes allowed, current stream is 61355396ms old, last send 61355396ms, last 
response -1ms

This shows that the stream was created 17 hours ago and sent the header message 
but never received a response.  With the stack trace it appears that the header 
was never sent, yet the stream also didn't terminate with a deadline exceeded.  
Not getting an error for the stream seems like a grpc issue; however, it would 
be safer not to block indefinitely on the Phaser waiting for the send, and 
instead throw an exception after, for example, 2x the stream deadline.
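A hedged sketch of that mitigation (names are illustrative; GrpcWindmillServer's 
real send path is more involved): Phaser.awaitAdvanceInterruptibly accepts a 
timeout, so the wait for stream readiness can be bounded at twice the stream 
deadline and converted into an exception instead of parking forever.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative sketch: bound the wait on the readiness Phaser, surfacing a
// failure the caller can use to tear the stream down and retry.
public class BoundedSend {
  private final Phaser readyPhaser = new Phaser(1);
  private final long streamDeadlineSeconds;

  public BoundedSend(long streamDeadlineSeconds) {
    this.streamDeadlineSeconds = streamDeadlineSeconds;
  }

  // Called when the stream becomes ready to accept a send.
  public void markReady() {
    readyPhaser.arrive();
  }

  // Waits for the phase to advance past observedPhase, but gives up after
  // 2x the stream deadline rather than parking forever.
  public void awaitReady(int observedPhase) {
    try {
      readyPhaser.awaitAdvanceInterruptibly(
          observedPhase, 2 * streamDeadlineSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      throw new IllegalStateException(
          "stream not ready after " + (2 * streamDeadlineSeconds) + "s", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted waiting for stream", e);
    }
  }
}
```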



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12142) Reduce overhead of MetricsEnvironment

2021-04-09 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12142:
--

 Summary: Reduce overhead of MetricsEnvironment
 Key: BEAM-12142
 URL: https://issues.apache.org/jira/browse/BEAM-12142
 Project: Beam
  Issue Type: Bug
  Components: java-fn-execution
Reporter: Sam Whittle
Assignee: Sam Whittle


Currently the MetricsContainer objects are stored in ThreadLocal state. This 
means that scoping a new container involves a get and a set of thread-local 
state. By instead putting a wrapper object in the thread-local state we can use 
a single lookup in thread-local state to get/set and then reset.
This is showing up as a possible 7% cpu improvement in a nexmark query 
benchmark.

Additionally I think that removing from the thread-local state is causing 
overhead in get calls by making the linear probing within the implementation 
of ThreadLocal state more expensive.
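A minimal sketch of the wrapper-object idea, using stand-in names (Beam's real 
MetricsContainer carries the actual counters): the holder stays in the 
ThreadLocal permanently and only its field is swapped, so scoping is one 
ThreadLocal lookup plus two field writes, and nothing is ever removed from the 
ThreadLocalMap.

```java
// Illustrative sketch: a mutable holder in the ThreadLocal replaces the
// get/set (and eventual remove) of the container itself.
public class ScopedMetricsContainer {
  // Stand-in for Beam's MetricsContainer.
  public static class Container {}

  private static final class Holder {
    Container current;  // mutated in place, never removed from the map
  }

  private static final ThreadLocal<Holder> HOLDER =
      ThreadLocal.withInitial(Holder::new);

  public static Container getCurrent() {
    return HOLDER.get().current;
  }

  // Runs work with `container` scoped, restoring the previous one after:
  // a single ThreadLocal lookup instead of get + set + later set/remove.
  public static void scoped(Container container, Runnable work) {
    Holder h = HOLDER.get();
    Container previous = h.current;
    h.current = container;
    try {
      work.run();
    } finally {
      h.current = previous;
    }
  }
}
```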



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12127) Reduce counter overhead in PCollectionConsumerRegistry.accept

2021-04-08 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12127:
--

 Summary: Reduce counter overhead in 
PCollectionConsumerRegistry.accept
 Key: BEAM-12127
 URL: https://issues.apache.org/jira/browse/BEAM-12127
 Project: Beam
  Issue Type: Bug
  Components: java-fn-execution
Reporter: Sam Whittle
Assignee: Sam Whittle


DelegatingCounter.inc shows up as 21% of cpu on the nexmark query 2 benchmark 
under PCollectionConsumerRegistry.accept.

2% is actual counter incrementing, but the majority is the delegation of 
DelegatingCounter, which involves looking up thread-local state and then getting 
the counter for the name from the counter container.  However in this case the 
counter container is known and can just be bound when constructing the counter 
instead of using DelegatingCounter.
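The difference can be sketched with stand-in types (the real DelegatingCounter 
resolves a MetricsContainer from thread-local state; this toy container is only 
illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch: a delegating-style inc() must re-resolve the counter
// cell on every call, while a bound counter pays that lookup cost once at
// construction.
public class BoundCounterDemo {
  // Stand-in for a metrics container keyed by counter name.
  public static class Container {
    private final Map<String, AtomicLong> counters = new HashMap<>();

    public AtomicLong get(String name) {
      return counters.computeIfAbsent(name, n -> new AtomicLong());
    }
  }

  // Delegating style: every inc() re-resolves the cell by name.
  public static void incDelegating(Container c, String name) {
    c.get(name).incrementAndGet();
  }

  // Bound style: resolve once, then inc() is a bare atomic add.
  public static AtomicLong bind(Container c, String name) {
    return c.get(name);
  }
}
```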



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12118) QueuingBeamFnDataClient adds polling latency to completing bundle processing

2021-04-07 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12118:
--

 Summary: QueuingBeamFnDataClient adds polling latency to 
completing bundle processing
 Key: BEAM-12118
 URL: https://issues.apache.org/jira/browse/BEAM-12118
 Project: Beam
  Issue Type: Bug
  Components: java-fn-execution
Reporter: Sam Whittle
Assignee: Sam Whittle


Currently the inboundDataClients are registered with receive, and they add data 
to a queue. There is no explicit indication from the clients that they are no 
longer going to add values to the queue.

Within QueueingBeamFnDataClient.drainAndBlock the queue is therefore polled and 
if nothing is present all clients are polled to see if they are complete.

This design makes for unfortunate tradeoffs on poll timeout:
- cpu wasted with a small timeout
- additional latency in noticing we have completed with a larger timeout

With the existing InboundDataClient interface, we could have a separate thread 
call awaitCompletion on all of the clients and then shut down the queue (adding 
a poison pill, perhaps).
Or we could modify the InboundDataClient interface to allow registering interest 
in when the client is done producing elements.  The existing clients all seem 
based upon futures, which would allow that.
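The poison-pill variant can be sketched as follows, modeling each inbound 
client as a completion future (an assumption for illustration; Beam's 
InboundDataClient exposes awaitCompletion rather than a future):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch: a helper thread waits for every inbound client to
// finish producing and then enqueues a sentinel, so the draining thread can
// block on take() with no poll timeout at all -- neither wasting CPU on
// short polls nor adding latency with long ones.
public class DrainWithSentinel {
  private static final Object POISON = new Object();

  public static List<Object> drainAndBlock(
      BlockingQueue<Object> queue, List<CompletableFuture<Void>> clients) {
    Thread completer = new Thread(() -> {
      // Wait for all clients to finish producing, then wake the drainer.
      CompletableFuture.allOf(clients.toArray(new CompletableFuture[0])).join();
      queue.add(POISON);
    });
    completer.start();

    List<Object> out = new ArrayList<>();
    try {
      while (true) {
        Object o = queue.take();  // blocks; no timeout needed
        if (o == POISON) {
          break;
        }
        out.add(o);
      }
      completer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while draining", e);
    }
    return out;
  }
}
```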








--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12117) QueuingBeamFnDataClient inbound client set grows with BundleProcessor reuse

2021-04-07 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-12117:
--

 Summary: QueuingBeamFnDataClient inbound client set grows with 
BundleProcessor reuse
 Key: BEAM-12117
 URL: https://issues.apache.org/jira/browse/BEAM-12117
 Project: Beam
  Issue Type: Bug
  Components: java-fn-execution
Reporter: Sam Whittle
Assignee: Sam Whittle


It should have a reset method and be reset when the BundleProcessor is reset.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-11727) Optimize ExecutionStateSampler

2021-03-12 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-11727:
---
Resolution: Fixed
Status: Resolved  (was: Triage Needed)

> Optimize ExecutionStateSampler
> --
>
> Key: BEAM-11727
> URL: https://issues.apache.org/jira/browse/BEAM-11727
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Showed up as 1.5% of CPU on Nexmark 11 streaming benchmark run.
> It appears to be using a ConcurrentSkipListSet and most of the cpu is 
> related to inserts/removes in that, involving the system hash of the entries 
> as that is used for ordering.
> The consistent ordering is unnecessary. Additionally for other reasons, 
> removal and iteration is already synchronized and so performance will likely 
> be better just using a synchronized HashMap and synchronizing in the add case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-11730) Reduce context switches for dataflow streaming appliance getdata reads

2021-03-12 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-11730:
---
Resolution: Fixed
Status: Resolved  (was: Open)

> Reduce context switches for dataflow streaming appliance getdata reads
> --
>
> Key: BEAM-11730
> URL: https://issues.apache.org/jira/browse/BEAM-11730
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Currently reads are batched and issued from a separate thread pool of reading 
> threads.  However in the case that there is no read queuing due to the max # 
> of parallel requests, switching threads is unnecessary and adds additional 
> context-switching overhead.
> In a benchmark I observed 5% of cpu spent on LockSupport.park, which some 
> googling indicates is due to context switching.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-11707) Optimize WindmillStateCache CPU usage

2021-03-12 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-11707:
---
Resolution: Fixed
Status: Resolved  (was: Open)

> Optimize WindmillStateCache CPU usage
> -
>
> Key: BEAM-11707
> URL: https://issues.apache.org/jira/browse/BEAM-11707
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> From profiling nexmark Query11 which has many unique tags per key, I observed 
> that the WindmillStateCache cpu usage was 6% of CPU.
> The usage appears to be due to the invalidation set maintenance as well as 
> many reads/inserts.
> The invalidation set is maintained so that if a key encounters an error 
> processing or the cache token changes, we can invalidate all the entries for 
> a key.  Currently this is done by removing all entries for the key from the 
> cache.  Another alternative which appears much more CPU efficient is to 
> instead leave the entries in the cache but make them unreachable.  This can 
> be done by having a per-key object that uses object equality as part of the 
> cache lookup.  Then to discard entries for the key, we start using a new 
> per-key object.  Cleanup of per-key objects can be done with a weak reference 
> map.
> Another cost to the cache is that objects are grouped by window so that they 
> are kept/evicted all at once.  However currently when reading items into the 
> cache, we fetch the window set and then look up each tag in it.  This could 
> be cached for the key to avoid multiple cache lookups. Similarly for putting 
> objects we look up and insert each tag separately and then update the cache 
> to update the weight for the per-window set.  This could be done once after 
> all updates for the window have been made.
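The per-key identity-object scheme described above can be sketched as follows 
(illustrative types; the real cache is weight-bounded and the token map would 
be weak-reference based so stale tokens can be collected):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of invalidation by unreachability: entries are keyed
// on a fresh per-key token Object, compared by identity.  Dropping all
// entries for a key is a single pointer swap -- the old entries remain in
// the cache but can never be looked up again, and fall out via normal
// eviction.
public class PerKeyInvalidation {
  private final Map<String, Object> keyTokens = new HashMap<>();
  private final Map<Object, Map<String, String>> cache = new HashMap<>();

  private Object token(String key) {
    return keyTokens.computeIfAbsent(key, k -> new Object());
  }

  public void put(String key, String tag, String value) {
    cache.computeIfAbsent(token(key), t -> new HashMap<>()).put(tag, value);
  }

  public String get(String key, String tag) {
    Map<String, String> entries = cache.get(token(key));
    return entries == null ? null : entries.get(tag);
  }

  // O(1) invalidation: a new token makes the old entries unreachable.
  public void invalidate(String key) {
    keyTokens.put(key, new Object());
  }
}
```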



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-11729) Remove ReduceFnRunner eager class name evaluation for debug logging

2021-03-12 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-11729:
---
Resolution: Fixed
Status: Resolved  (was: Open)

> Remove ReduceFnRunner eager class name evaluation for debug logging
> ---
>
> Key: BEAM-11729
> URL: https://issues.apache.org/jira/browse/BEAM-11729
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>  Labels: stale-assigned
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Shows up on profiles; I forget what it was, but it was high enough that I 
> saw it, and it is for window tracing.  We should just use a constant for the 
> class name instead of calling SimpleClassName.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-11910) Increase subsequent page size for bags after the first

2021-03-02 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle reassigned BEAM-11910:
--

Assignee: Sam Whittle

> Increase subsequent page size for bags after the first
> --
>
> Key: BEAM-11910
> URL: https://issues.apache.org/jira/browse/BEAM-11910
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>
> Currently the page size of bags requested from the streaming dataflow backend 
> is always 8MB.  In pipelines with large bags this can limit throughput as it 
> results in more round-trips to the backend.  In particular with Streaming 
> Engine this is noticeable due to increased latency.
> I propose using 8MB for the first bag fetch and then doubling the limit for 
> subsequent paginations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11910) Increase subsequent page size for bags after the first

2021-03-02 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-11910:
--

 Summary: Increase subsequent page size for bags after the first
 Key: BEAM-11910
 URL: https://issues.apache.org/jira/browse/BEAM-11910
 Project: Beam
  Issue Type: Bug
  Components: runner-dataflow
Reporter: Sam Whittle


Currently the page size of bags requested from the streaming dataflow backend 
is always 8MB.  In pipelines with large bags this can limit throughput as it 
results in more round-trips to the backend.  In particular with Streaming 
Engine this is noticeable due to increased latency.

I propose using 8MB for the first bag fetch and then doubling the limit for 
subsequent paginations.
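As a sketch, the policy is just a byte limit that starts at 8MB and doubles 
per continuation; the cap shown here is an illustrative assumption, not part 
of the proposal above.

```java
// Illustrative sketch of the doubling paging policy: each continuation
// requests twice the previous byte limit, roughly halving the remaining
// round-trips for a large bag at every step.
public class BagFetchLimits {
  static final long INITIAL_LIMIT_BYTES = 8L << 20;  // 8MB first page
  static final long MAX_LIMIT_BYTES = 512L << 20;    // illustrative cap

  // Limit to request for the page after one fetched with `currentLimit`.
  static long nextLimit(long currentLimit) {
    return Math.min(currentLimit * 2, MAX_LIMIT_BYTES);
  }
}
```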



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11144) TriggerStateMachine.prefetchOnElement and other prefetch methods use incorrect state for subtriggers

2021-02-23 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17289572#comment-17289572
 ] 

Sam Whittle commented on BEAM-11144:


Yes, thanks!




> TriggerStateMachine.prefetchOnElement and other prefetch methods use 
> incorrect state for subtriggers
> 
>
> Key: BEAM-11144
> URL: https://issues.apache.org/jira/browse/BEAM-11144
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.26.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> prefetchOnElement takes a single StateAccessor corresponding to the root 
> trigger.  It uses that state accessor to perform prefetches for the 
> subtrigger state. However that will use the root trigger namespace for the 
> tags in the subtriggers.  This makes prefetching ineffective, introducing 
> possibly an additional round-trip once the data is actually read.
> https://github.com/apache/beam/blob/68d6c8e6243b1d8f392840273f886276e2a8baff/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java#L296



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11730) Reduce context switches for dataflow streaming appliance getdata reads

2021-02-01 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-11730:
--

 Summary: Reduce context switches for dataflow streaming appliance 
getdata reads
 Key: BEAM-11730
 URL: https://issues.apache.org/jira/browse/BEAM-11730
 Project: Beam
  Issue Type: Bug
  Components: runner-dataflow
Reporter: Sam Whittle
Assignee: Sam Whittle


Currently reads are batched and issued from a separate thread pool of reading 
threads.  However in the case that there is no read queuing due to the max # 
of parallel requests, switching threads is unnecessary and adds additional 
context-switching overhead.

In a benchmark I observed 5% of cpu spent on LockSupport.park, which some 
googling indicates is due to context switching.
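A minimal sketch of the fast path (illustrative, not the worker's actual code): 
when a permit against the parallel-request limit is immediately available, run 
the batched read on the calling thread; only fall back to the reader pool when 
the limit is saturated.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;

// Illustrative sketch: avoid the park/unpark context switch of a thread
// hand-off whenever the read would not have queued anyway.
public class InlineReads {
  private final Semaphore permits;

  public InlineReads(int maxParallelReads) {
    this.permits = new Semaphore(maxParallelReads);
  }

  // Returns true if the read ran inline on the caller's thread.
  public boolean issueRead(Runnable read, Executor readerPool) {
    if (permits.tryAcquire()) {
      try {
        read.run();  // fast path: no context switch
      } finally {
        permits.release();
      }
      return true;
    }
    // Saturated: queue behind the parallel-request limit as before.
    readerPool.execute(
        () -> {
          permits.acquireUninterruptibly();
          try {
            read.run();
          } finally {
            permits.release();
          }
        });
    return false;
  }
}
```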



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11729) Remove ReduceFnRunner eager class name evaluation for debug logging

2021-02-01 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-11729:
--

 Summary: Remove ReduceFnRunner eager class name evaluation for 
debug logging
 Key: BEAM-11729
 URL: https://issues.apache.org/jira/browse/BEAM-11729
 Project: Beam
  Issue Type: Bug
  Components: runner-core
Reporter: Sam Whittle
Assignee: Sam Whittle


Shows up on profiles; I forget what it was, but it was high enough that I saw 
it, and it is for window tracing.  We should just use a constant for the class 
name instead of calling SimpleClassName.
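The fix is tiny; a sketch with an assumed constant name (the actual tracing 
call sites are in ReduceFnRunner):

```java
// Illustrative sketch: getClass().getSimpleName() does reflective work on
// every call, which is wasted when the trace message is usually discarded;
// a compile-time constant costs nothing.
public class WindowTracing {
  // Before: evaluated eagerly per log call.
  static String tagSlow(Object self) {
    return self.getClass().getSimpleName();
  }

  // After: a constant, resolved once.
  static final String TAG = "ReduceFnRunner";
}
```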



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11706) TriggerProto translation shows up as 1% cpu on some benchmarks

2021-02-01 Thread Sam Whittle (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17276313#comment-17276313
 ] 

Sam Whittle commented on BEAM-11706:


I had a typo in the jira issue for the pull request. The fix has been submitted 
as
https://github.com/apache/beam/pull/13831

> TriggerProto translation shows up as 1% cpu on some benchmarks
> --
>
> Key: BEAM-11706
> URL: https://issues.apache.org/jira/browse/BEAM-11706
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
> Fix For: 2.29.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-11727) Optimize ExecutionStateSampler

2021-02-01 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-11727:
---
Component/s: (was: runner-dataflow)
 runner-core

> Optimize ExecutionStateSampler
> --
>
> Key: BEAM-11727
> URL: https://issues.apache.org/jira/browse/BEAM-11727
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>
> Showed up as 1.5% of CPU on Nexmark 11 streaming benchmark run.
> It appears to be using a ConcurrentSkipListSet and most of the cpu is 
> related to inserts/removes in that, involving the system hash of the entries 
> as that is used for ordering.
> The consistent ordering is unnecessary. Additionally for other reasons, 
> removal and iteration is already synchronized and so performance will likely 
> be better just using a synchronized HashMap and synchronizing in the add case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-11727) Optimize ExecutionStateSampler

2021-02-01 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle updated BEAM-11727:
---
Summary: Optimize ExecutionStateSampler  (was: Optimize 
ExecutionStateTracker)

> Optimize ExecutionStateSampler
> --
>
> Key: BEAM-11727
> URL: https://issues.apache.org/jira/browse/BEAM-11727
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>
> Showed up as 1.5% of CPU on Nexmark 11 streaming benchmark run.
> It appears to be using a ConcurrentSkipListSet and most of the cpu is 
> related to inserts/removes in that, involving the system hash of the entries 
> as that is used for ordering.
> The consistent ordering is unnecessary. Additionally for other reasons, 
> removal and iteration is already synchronized and so performance will likely 
> be better just using a synchronized HashMap and synchronizing in the add case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-11727) Optimize ExecutionStateTracker

2021-02-01 Thread Sam Whittle (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Whittle reassigned BEAM-11727:
--

Assignee: Sam Whittle

> Optimize ExecutionStateTracker
> --
>
> Key: BEAM-11727
> URL: https://issues.apache.org/jira/browse/BEAM-11727
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: P2
>
> Showed up as 1.5% of CPU on Nexmark 11 streaming benchmark run.
> It appears to be using a ConcurrentSkipListSet and most of the cpu is 
> related to inserts/removes in that, involving the system hash of the entries 
> as that is used for ordering.
> The consistent ordering is unnecessary. Additionally for other reasons, 
> removal and iteration is already synchronized and so performance will likely 
> be better just using a synchronized HashMap and synchronizing in the add case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11727) Optimize ExecutionStateTracker

2021-02-01 Thread Sam Whittle (Jira)
Sam Whittle created BEAM-11727:
--

 Summary: Optimize ExecutionStateTracker
 Key: BEAM-11727
 URL: https://issues.apache.org/jira/browse/BEAM-11727
 Project: Beam
  Issue Type: Bug
  Components: runner-dataflow
Reporter: Sam Whittle


Showed up as 1.5% of CPU on Nexmark 11 streaming benchmark run.

It appears to be using a ConcurrentSkipListSet and most of the cpu is related 
to inserts/removes in that, involving the system hash of the entries as that is 
used for ordering.

The consistent ordering is unnecessary. Additionally for other reasons, removal 
and iteration is already synchronized and so performance will likely be better 
just using a synchronized HashMap and synchronizing in the add case.
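A sketch of the synchronized hash-based alternative (the text says HashMap; for 
a set of trackers the analogous structure is a synchronized HashSet, and the 
sampling thread must hold the set's monitor while iterating):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: the sampler only needs membership (add/remove) and a
// consistent snapshot while iterating, not ordering, so an O(1) hash set
// avoids the identity-hash comparisons a ConcurrentSkipListSet performs on
// every insert and remove.
public class TrackerRegistry<T> {
  private final Set<T> trackers = Collections.synchronizedSet(new HashSet<>());

  public void register(T tracker) {
    trackers.add(tracker);
  }

  public void unregister(T tracker) {
    trackers.remove(tracker);
  }

  // Sampling thread: must synchronize on the set while iterating.
  public int sampleAll() {
    int sampled = 0;
    synchronized (trackers) {
      for (T tracker : trackers) {
        sampled++;  // real code would charge elapsed time to each tracker
      }
    }
    return sampled;
  }
}
```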



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

