[jira] [Commented] (BEAM-12942) Dataflow runner specialization of PubsubIO should validate messages
[ https://issues.apache.org/jira/browse/BEAM-12942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17544772#comment-17544772 ] Sam Whittle commented on BEAM-12942: The Java SDK was fixed in 2.34; reopening to add similar validation to the Python and Go SDKs. > Dataflow runner specialization of PubsubIO should validate messages > --- > > Key: BEAM-12942 > URL: https://issues.apache.org/jira/browse/BEAM-12942 > Project: Beam > Issue Type: Improvement > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.34.0 > > Time Spent: 1.5h > Remaining Estimate: 0h > > Currently invalid messages, such as those with attributes exceeding the > maximum size, are processed by the bundle successfully but fail to commit. > Throwing an exception when trying to write such a message directly would > increase visibility as well as allowing users to catch and handle such > exceptions. -- This message was sent by Atlassian Jira (v8.20.7#820007)
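A minimal sketch of the kind of write-time validation described above. The limit constants and class name are assumptions for illustration, not the actual Pub/Sub quotas or the code added in 2.34; the point is only that an oversized attribute raises an exception inside the bundle, where a user can catch it, instead of failing later at commit time.

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Illustrative validator; the limit values below are assumptions, not authoritative quotas. */
class PubsubMessageValidator {
  private static final int MAX_ATTRIBUTE_COUNT = 100;        // assumed limit
  private static final int MAX_ATTRIBUTE_KEY_BYTES = 256;    // assumed limit
  private static final int MAX_ATTRIBUTE_VALUE_BYTES = 1024; // assumed limit

  static void validateAttributes(Map<String, String> attributes) {
    if (attributes == null) {
      return;
    }
    if (attributes.size() > MAX_ATTRIBUTE_COUNT) {
      throw new IllegalArgumentException(
          "Too many attributes: " + attributes.size() + " > " + MAX_ATTRIBUTE_COUNT);
    }
    for (Map.Entry<String, String> entry : attributes.entrySet()) {
      int keyBytes = entry.getKey().getBytes(StandardCharsets.UTF_8).length;
      int valueBytes = entry.getValue().getBytes(StandardCharsets.UTF_8).length;
      if (keyBytes > MAX_ATTRIBUTE_KEY_BYTES || valueBytes > MAX_ATTRIBUTE_VALUE_BYTES) {
        // Throwing here surfaces the problem while the bundle is still processing,
        // where the user can catch and handle it, rather than at commit time.
        throw new IllegalArgumentException(
            "Attribute '" + entry.getKey() + "' exceeds the per-attribute size limits");
      }
    }
  }
}
{code}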
[jira] [Reopened] (BEAM-12942) Dataflow runner specialization of PubsubIO should validate messages
[ https://issues.apache.org/jira/browse/BEAM-12942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle reopened BEAM-12942: > Dataflow runner specialization of PubsubIO should validate messages > --- > > Key: BEAM-12942 > URL: https://issues.apache.org/jira/browse/BEAM-12942 > Project: Beam > Issue Type: Improvement > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.34.0 > > Time Spent: 1.5h > Remaining Estimate: 0h > > Currently invalid messages, such as those with attributes exceeding the > maximum size are processed by the bundle successfully but fail to commit. > Throwing an exception when trying to write such a message directly would > increase visibility as well as allowing users to catch and handle such > exceptions. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Assigned] (BEAM-14167) Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may be underestimated"
[ https://issues.apache.org/jira/browse/BEAM-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle reassigned BEAM-14167: -- Assignee: Kiley Sok (was: Luke Cwik) Feel free to mark as duplicate or blocker of BEAM-13695 > Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may > be underestimated" > --- > > Key: BEAM-14167 > URL: https://issues.apache.org/jira/browse/BEAM-14167 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Sam Whittle >Assignee: Kiley Sok >Priority: P2 > Labels: Java11 > > Can we fix these somehow? At a minimum we should catch them when weighing and > not cache the state. > "java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.reflect.InaccessibleObjectException: Unable to make field private > final byte[] java.lang.String.value accessible: module java.base does not > "opens java.lang" to unnamed module @39529185 > at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:129) > at org.apache.beam.fn.harness.Caches.weigh(Caches.java:64) > at org.apache.beam.fn.harness.Caches.findWeight(Caches.java:202) > at org.apache.beam.fn.harness.Caches.access$600(Caches.java:46) > at > org.apache.beam.fn.harness.Caches$CompositeKey.(Caches.java:363) > at > org.apache.beam.fn.harness.Caches$CompositeKey.(Caches.java:355) > at > org.apache.beam.fn.harness.Caches$CompositeKeyPrefix.valueKey(Caches.java:323) > at > org.apache.beam.fn.harness.Caches$SubCache.computeIfAbsent(Caches.java:259) > at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:259) > at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:252) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:700) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:490) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:895) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:486) > at > org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151) > at > org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) > at java.base/java.lang.Thread.run(Thread.java:833) > Caused by: java.lang.RuntimeException: > java.lang.reflect.InaccessibleObjectException: Unable to make field private > final byte[] java.lang.String.value accessible: module java.base does not > "opens java.lang" to unnamed module @39529185 > at > org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:246) > at org.github.jamm.MemoryMeterBase.access$000(MemoryMeterBase.java:15) > at > org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:24) > at > org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:20) > at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228) > at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210) > at java.base/java.lang.ClassValue.get(ClassValue.java:116) > at > org.github.jamm.MemoryMeterBase.declaredClassFields(MemoryMeterBase.java:219) > at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:119) > ... 
18 more > Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make > field private final byte[] java.lang.String.value accessible: module > java.base does not "opens java.lang" to unnamed module @39529185 > at > java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354) > at > java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297) > at > java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178) > at java.base/java.lang.reflect.Field.setAccessible(Field.java:172) > at > org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:241) > ... 26 more > " -- This message was sent by Atlassian Jira (v8.20.1#820001)
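A sketch of the "catch them when weighing" mitigation mentioned in the description above: a wrapper around the weigher that falls back to a default weight when jamm throws. The fallback value and the functional-interface shape are assumptions, not the actual Caches.weigh implementation.

{code:java}
import java.util.function.ToLongFunction;

/** Wraps a weigher (e.g. one backed by jamm's MemoryMeter) so reflection failures don't propagate. */
class SafeWeigher<T> implements ToLongFunction<T> {
  private static final long FALLBACK_WEIGHT_BYTES = 1024; // assumed fallback weight

  private final ToLongFunction<T> delegate;

  SafeWeigher(ToLongFunction<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public long applyAsLong(T value) {
    try {
      return delegate.applyAsLong(value);
    } catch (RuntimeException e) {
      // The InaccessibleObjectException above surfaces wrapped in a RuntimeException;
      // under-estimating a cache entry's size is preferable to failing the bundle.
      return FALLBACK_WEIGHT_BYTES;
    }
  }
}
{code}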
[jira] [Updated] (BEAM-14167) Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may be underestimated"
[ https://issues.apache.org/jira/browse/BEAM-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-14167: --- Labels: Java11 (was: ) > Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may > be underestimated" > --- > > Key: BEAM-14167 > URL: https://issues.apache.org/jira/browse/BEAM-14167 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Sam Whittle >Assignee: Luke Cwik >Priority: P2 > Labels: Java11 > > Can we fix these somehow? At a minimum we should catch them when weighing and > not cache the state. > "java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.reflect.InaccessibleObjectException: Unable to make field private > final byte[] java.lang.String.value accessible: module java.base does not > "opens java.lang" to unnamed module @39529185 > at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:129) > at org.apache.beam.fn.harness.Caches.weigh(Caches.java:64) > at org.apache.beam.fn.harness.Caches.findWeight(Caches.java:202) > at org.apache.beam.fn.harness.Caches.access$600(Caches.java:46) > at > org.apache.beam.fn.harness.Caches$CompositeKey.(Caches.java:363) > at > org.apache.beam.fn.harness.Caches$CompositeKey.(Caches.java:355) > at > org.apache.beam.fn.harness.Caches$CompositeKeyPrefix.valueKey(Caches.java:323) > at > org.apache.beam.fn.harness.Caches$SubCache.computeIfAbsent(Caches.java:259) > at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:259) > at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:252) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:700) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:490) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:895) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:486) > at > org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151) > at > org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) > at java.base/java.lang.Thread.run(Thread.java:833) > Caused by: java.lang.RuntimeException: > java.lang.reflect.InaccessibleObjectException: Unable to make field private > final byte[] java.lang.String.value accessible: module java.base does not > "opens java.lang" to unnamed module @39529185 > at > org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:246) > at org.github.jamm.MemoryMeterBase.access$000(MemoryMeterBase.java:15) > at > org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:24) > at > org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:20) > at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228) > at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210) > at java.base/java.lang.ClassValue.get(ClassValue.java:116) > at > org.github.jamm.MemoryMeterBase.declaredClassFields(MemoryMeterBase.java:219) > at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:119) > ... 
18 more > Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make > field private final byte[] java.lang.String.value accessible: module > java.base does not "opens java.lang" to unnamed module @39529185 > at > java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354) > at > java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297) > at > java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178) > at java.base/java.lang.reflect.Field.setAccessible(Field.java:172) > at > org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:241) > ... 26 more > " -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (BEAM-14167) Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may be underestimated"
[ https://issues.apache.org/jira/browse/BEAM-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511869#comment-17511869 ] Sam Whittle edited comment on BEAM-14167 at 3/24/22, 1:59 PM: -- >From >https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m > it appears that this was introduced in Java 9, so that could explain why I >haven't seen it before since I normally run Java 8 pipelines. There is also a potential fix there by adding flags to the JVM. was (Author: scwhittle): >From >https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m > it appears that this was introduced in Java 9, so that could explain why I >haven't seen it before since I normally run Java 8 pipelines. > Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may > be underestimated" > --- > > Key: BEAM-14167 > URL: https://issues.apache.org/jira/browse/BEAM-14167 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Sam Whittle >Assignee: Luke Cwik >Priority: P2 > > Can we fix these somehow? At a minimum we should catch them when weighing and > not cache the state. > "java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.reflect.InaccessibleObjectException: Unable to make field private > final byte[] java.lang.String.value accessible: module java.base does not > "opens java.lang" to unnamed module @39529185 > at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:129) > at org.apache.beam.fn.harness.Caches.weigh(Caches.java:64) > at org.apache.beam.fn.harness.Caches.findWeight(Caches.java:202) > at org.apache.beam.fn.harness.Caches.access$600(Caches.java:46) > at > org.apache.beam.fn.harness.Caches$CompositeKey.(Caches.java:363) > at > org.apache.beam.fn.harness.Caches$CompositeKey.(Caches.java:355) > at > org.apache.beam.fn.harness.Caches$CompositeKeyPrefix.valueKey(Caches.java:323) > at > org.apache.beam.fn.harness.Caches$SubCache.computeIfAbsent(Caches.java:259) > at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:259) > at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:252) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:700) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:490) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:895) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:486) > at > org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151) > at > org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) > at java.base/java.lang.Thread.run(Thread.java:833) > Caused by: java.lang.RuntimeException: > java.lang.reflect.InaccessibleObjectException: Unable to make field private > final byte[] java.lang.String.value accessible: module java.base does not > "opens java.lang" to unnamed module @39529185 > at > org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:246) > at 
org.github.jamm.MemoryMeterBase.access$000(MemoryMeterBase.java:15) > at > org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:24) > at > org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:20) > at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228) > at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210) > at java.base/java.lang.ClassValue.get(ClassValue.java:116) > at > org.github.jamm.MemoryMeterBase.declaredClassFields(MemoryMeterBase.java:219) > at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:119) > ... 18 more > Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make > field private final byte[] java.lang.String.value accessible: module > java.base does not "opens java.lang" to unnamed module @39529185 > at > java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354) > at > java.base/java.lang.reflect.AccessibleObject.checkCanS
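One possible form of the JVM-flag workaround referenced in the comment above, derived from the package named in the error message (Java 9+ only); how the flag reaches the worker JVM is deployment-specific and is not shown here.

{code}
# Opens exactly the package named in the error to unnamed modules so jamm can reflect into it.
java --add-opens java.base/java.lang=ALL-UNNAMED ...
{code}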
[jira] [Commented] (BEAM-14167) Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may be underestimated"
[ https://issues.apache.org/jira/browse/BEAM-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511869#comment-17511869 ] Sam Whittle commented on BEAM-14167: >From >https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m > it appears that this was introduced in Java 9, so that could explain why I >haven't seen it before since I normally run Java 8 pipelines. > Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may > be underestimated" > --- > > Key: BEAM-14167 > URL: https://issues.apache.org/jira/browse/BEAM-14167 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Sam Whittle >Assignee: Luke Cwik >Priority: P2 > > Can we fix these somehow? At a minimum we should catch them when weighing and > not cache the state. > "java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.reflect.InaccessibleObjectException: Unable to make field private > final byte[] java.lang.String.value accessible: module java.base does not > "opens java.lang" to unnamed module @39529185 > at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:129) > at org.apache.beam.fn.harness.Caches.weigh(Caches.java:64) > at org.apache.beam.fn.harness.Caches.findWeight(Caches.java:202) > at org.apache.beam.fn.harness.Caches.access$600(Caches.java:46) > at > org.apache.beam.fn.harness.Caches$CompositeKey.(Caches.java:363) > at > org.apache.beam.fn.harness.Caches$CompositeKey.(Caches.java:355) > at > org.apache.beam.fn.harness.Caches$CompositeKeyPrefix.valueKey(Caches.java:323) > at > org.apache.beam.fn.harness.Caches$SubCache.computeIfAbsent(Caches.java:259) > at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:259) > at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:252) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:700) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:490) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:895) > at > org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:486) > at > org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151) > at > org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) > at java.base/java.lang.Thread.run(Thread.java:833) > Caused by: java.lang.RuntimeException: > java.lang.reflect.InaccessibleObjectException: Unable to make field private > final byte[] java.lang.String.value accessible: module java.base does not > "opens java.lang" to unnamed module @39529185 > at > org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:246) > at org.github.jamm.MemoryMeterBase.access$000(MemoryMeterBase.java:15) > at > org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:24) > at > org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:20) > at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228) > at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210) > at 
java.base/java.lang.ClassValue.get(ClassValue.java:116) > at > org.github.jamm.MemoryMeterBase.declaredClassFields(MemoryMeterBase.java:219) > at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:119) > ... 18 more > Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make > field private final byte[] java.lang.String.value accessible: module > java.base does not "opens java.lang" to unnamed module @39529185 > at > java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354) > at > java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297) > at > java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178) > at java.base/java.lang.reflect.Field.setAccessible(Field.java:172) > at > org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:241) > ... 26 more > " -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (BEAM-14167) Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may be underestimated"
Sam Whittle created BEAM-14167: -- Summary: Jamm exceptions "JVM prevents jamm from accessing subgraph - cache sizes may be underestimated" Key: BEAM-14167 URL: https://issues.apache.org/jira/browse/BEAM-14167 Project: Beam Issue Type: Bug Components: sdk-java-core Reporter: Sam Whittle Assignee: Luke Cwik Can we fix these somehow? At a minimum we should catch them when weighing and not cache the state. "java.lang.RuntimeException: java.lang.RuntimeException: java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @39529185 at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:129) at org.apache.beam.fn.harness.Caches.weigh(Caches.java:64) at org.apache.beam.fn.harness.Caches.findWeight(Caches.java:202) at org.apache.beam.fn.harness.Caches.access$600(Caches.java:46) at org.apache.beam.fn.harness.Caches$CompositeKey.(Caches.java:363) at org.apache.beam.fn.harness.Caches$CompositeKey.(Caches.java:355) at org.apache.beam.fn.harness.Caches$CompositeKeyPrefix.valueKey(Caches.java:323) at org.apache.beam.fn.harness.Caches$SubCache.computeIfAbsent(Caches.java:259) at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:259) at org.apache.beam.fn.harness.FnHarness$1.apply(FnHarness.java:252) at org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:700) at org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:490) at org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:895) at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:486) at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151) at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: java.lang.RuntimeException: java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @39529185 at org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:246) at org.github.jamm.MemoryMeterBase.access$000(MemoryMeterBase.java:15) at org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:24) at org.github.jamm.MemoryMeterBase$1.computeValue(MemoryMeterBase.java:20) at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228) at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210) at java.base/java.lang.ClassValue.get(ClassValue.java:116) at org.github.jamm.MemoryMeterBase.declaredClassFields(MemoryMeterBase.java:219) at org.github.jamm.MemoryMeterBase.measureDeep(MemoryMeterBase.java:119) ... 
18 more Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @39529185 at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354) at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297) at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178) at java.base/java.lang.reflect.Field.setAccessible(Field.java:172) at org.github.jamm.MemoryMeterBase.declaredClassFields0(MemoryMeterBase.java:241) ... 26 more " -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (BEAM-13826) Reduce number of Gax related threads, likely by providing common executor to GAX clients
Sam Whittle created BEAM-13826: -- Summary: Reduce number of Gax related threads, likely by providing common executor to GAX clients Key: BEAM-13826 URL: https://issues.apache.org/jira/browse/BEAM-13826 Project: Beam Issue Type: Improvement Components: io-java-gcp Reporter: Sam Whittle When looking at thread stacks for a pipeline reading Pub/Sub and writing to BQ using the WriteApi, there were 3600 Gax-prefixed threads. The # of threads, and possibly performance, could be improved by finding where these came from and using a shared executor. -- This message was sent by Atlassian Jira (v8.20.1#820001)
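A sketch of the shared-executor idea, assuming GAX's FixedExecutorProvider; the pool size is arbitrary, and the exact settings-builder method used to plug the provider into each client varies by GAX version, so this is not the concrete fix.

{code:java}
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/** One bounded scheduled pool shared by every GAX-based client instead of a pool per client. */
class SharedGaxExecutor {
  private static final ScheduledExecutorService SHARED =
      Executors.newScheduledThreadPool(16); // pool size is an assumption

  static ExecutorProvider provider() {
    // Each client's settings builder would be pointed at this provider (via its
    // executor-provider setter) so clients reuse these threads rather than creating
    // their own "Gax-" prefixed pools.
    return FixedExecutorProvider.create(SHARED);
  }
}
{code}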
[jira] [Created] (BEAM-13684) Consider adding information to ProcessBundleRequest indicating there is no existing state
Sam Whittle created BEAM-13684: -- Summary: Consider adding information to ProcessBundleRequest indicating there is no existing state Key: BEAM-13684 URL: https://issues.apache.org/jira/browse/BEAM-13684 Project: Beam Issue Type: Improvement Components: beam-model Reporter: Sam Whittle This allows for skipping all state fetches made during processing this key, preloading the cache with empty values for the key, or extending the cache to include this negative state information to avoid fetches as long as no state is evicted for the key. This is particularly beneficial in sparse key streaming pipelines where such new keys are common. It could also be used to avoid fetches in batch pipelines where there is no persisted state. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (BEAM-12857) Unable to write to GCS due to IndexOutOfBoundsException in FileSystems
[ https://issues.apache.org/jira/browse/BEAM-12857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17477130#comment-17477130 ] Sam Whittle edited comment on BEAM-12857 at 1/17/22, 11:18 AM: --- From looking at the code it does seem that such an exception could be encountered with the following: ignoreMissingSrc was false, skipExistingDest was true, matchSrcResults.get(i).status() was NOT_FOUND and metadata was empty/null, and matchDestResults.get(i).status().equals(Status.OK). This would trigger if the src was missing and the destination already existed. This could happen if the file rename stage ran multiple times due to a retry. https://github.com/apache/beam/pull/15301 changed it so that the source files are only matched if ignoreMissingSrc was true, so this bug itself wouldn't occur around the metadata. However there is a similar bug still present if ignoreMissingSrc was false and skipExistingDest is true, since that examines the src results without populating them. The FileBasedSink specifies both of these, so it is likely that upgrading the SDK will fix your issue, though the bug should still be fixed. was (Author: scwhittle): From looking at the code it does seem that such an exception could be encountered with the following: ignoreMissingSrc was false, skipExistingDest was true, matchSrcResults.get(i).status() was NOT_FOUND and metadata was empty/null, and matchDestResults.get(i).status().equals(Status.OK). This would trigger if the src was missing and the destination already existed. This could happen if the file rename stage ran multiple times due to a retry. https://github.com/apache/beam/pull/15301 changed it so that the source files are only matched if ignoreMissingSrc was true, so this bug itself wouldn't occur around the metadata. However there is a similar bug still present if ignoreMissingSrc was false and skipExistingDest is true, since that examines the src results without populating them. > Unable to write to GCS due to IndexOutOfBoundsException in FileSystems > -- > > Key: BEAM-12857 > URL: https://issues.apache.org/jira/browse/BEAM-12857 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Affects Versions: 2.31.0, 2.32.0 > Environment: Beam 2.31.0/2.32.0, Java 11, GCP Dataflow >Reporter: Patrick Lucas >Priority: P2 > > I have a simple batch job, running on Dataflow, that reads from a GCS bucket, > filters the data, and windows and writes the matching data back to a > different path in the same bucket. > The job seems to succeed in reading and filtering the data, as well as > writing temporary files to GCS, but appears to fail when trying to rename the > temporary files to their final destination. > The IndexOutOfBoundsException is thrown from > [FileSystems.java:429|https://github.com/apache/beam/blob/v2.32.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L429] > (in 2.32.0), when the code calls {{.get(0)}} on the list returned by a call > to {{MatchResult#metadata()}}. > The javadoc for > [{{MatchResult#metadata()}}|https://github.com/apache/beam/blob/v2.32.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java#L75-L80] > says, > {code:java} > /** >* {@link Metadata} of matched files. Note that if {@link #status()} is > {@link Status#NOT_FOUND}, >* this may either throw a {@link java.io.FileNotFoundException} or return > an empty list, >* depending on the {@link EmptyMatchTreatment} used in the {@link > FileSystems#match} call. 
>*/ > {code} > So possibly GCS is not returning any metadata for the (missing) destination > object? That seems unlikely, as I would expect many others would have already > run into this, but I don't see how this could be caused by my user code. > I have tested this on 2.31.0 and 2.32.0 getting the same error, but it's > worth noting that the logic in FileSystems.java changed a decent amount > recently in [#15301|https://github.com/apache/beam/pull/15301], maybe having > an effect on this, but I haven't been able to test it since I'm working in a > closed environment and can only easily use released versions of Beam. Once a > version containing this change is released, I will upgrade and try again. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (BEAM-12857) Unable to write to GCS due to IndexOutOfBoundsException in FileSystems
[ https://issues.apache.org/jira/browse/BEAM-12857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17477130#comment-17477130 ] Sam Whittle commented on BEAM-12857: From looking at the code it does seem that such an exception could be encountered with the following: ignoreMissingSrc was false, skipExistingDest was true, matchSrcResults.get(i).status() was NOT_FOUND and metadata was empty/null, and matchDestResults.get(i).status().equals(Status.OK). This would trigger if the src was missing and the destination already existed. This could happen if the file rename stage ran multiple times due to a retry. https://github.com/apache/beam/pull/15301 changed it so that the source files are only matched if ignoreMissingSrc was true, so this bug itself wouldn't occur around the metadata. However there is a similar bug still present if ignoreMissingSrc was false and skipExistingDest is true, since that examines the src results without populating them. > Unable to write to GCS due to IndexOutOfBoundsException in FileSystems > -- > > Key: BEAM-12857 > URL: https://issues.apache.org/jira/browse/BEAM-12857 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Affects Versions: 2.31.0, 2.32.0 > Environment: Beam 2.31.0/2.32.0, Java 11, GCP Dataflow >Reporter: Patrick Lucas >Priority: P2 > > I have a simple batch job, running on Dataflow, that reads from a GCS bucket, > filters the data, and windows and writes the matching data back to a > different path in the same bucket. > The job seems to succeed in reading and filtering the data, as well as > writing temporary files to GCS, but appears to fail when trying to rename the > temporary files to their final destination. > The IndexOutOfBoundsException is thrown from > [FileSystems.java:429|https://github.com/apache/beam/blob/v2.32.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L429] > (in 2.32.0), when the code calls {{.get(0)}} on the list returned by a call > to {{MatchResult#metadata()}}. > The javadoc for > [{{MatchResult#metadata()}}|https://github.com/apache/beam/blob/v2.32.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java#L75-L80] > says, > {code:java} > /** >* {@link Metadata} of matched files. Note that if {@link #status()} is > {@link Status#NOT_FOUND}, >* this may either throw a {@link java.io.FileNotFoundException} or return > an empty list, >* depending on the {@link EmptyMatchTreatment} used in the {@link > FileSystems#match} call. >*/ > {code} > So possibly GCS is not returning any metadata for the (missing) destination > object? That seems unlikely, as I would expect many others would have already > run into this, but I don't see how this could be caused by my user code. > I have tested this on 2.31.0 and 2.32.0 getting the same error, but it's > worth noting that the logic in FileSystems.java changed a decent amount > recently in [#15301|https://github.com/apache/beam/pull/15301], maybe having > an effect on this, but I haven't been able to test it since I'm working in a > closed environment and can only easily use released versions of Beam. Once a > version containing this change is released, I will upgrade and try again. -- This message was sent by Atlassian Jira (v8.20.1#820001)
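A small defensive sketch of the point made above: when a match result's status is NOT_FOUND its metadata list may be empty, so the status must be checked before calling {{.get(0)}}. This is illustrative only, not the actual FileSystems fix.

{code:java}
import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.io.fs.MatchResult;

class MatchResults {
  /** Returns whether the matched file exists without ever indexing into an empty metadata list. */
  static boolean exists(MatchResult result) throws IOException {
    if (result.status() == MatchResult.Status.NOT_FOUND) {
      // For NOT_FOUND, metadata() may legitimately be empty (or throw), so never call get(0) here.
      return false;
    }
    List<MatchResult.Metadata> metadata = result.metadata();
    return !metadata.isEmpty();
  }
}
{code}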
[jira] [Reopened] (BEAM-12776) Improve parallelism of closing files in FileIO
[ https://issues.apache.org/jira/browse/BEAM-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle reopened BEAM-12776: There is a large buffer by default for GCS writes and closing all windows in parallel can increase memory usage and trigger OOMs. I plan on changing this to limit parallelism to a certain amount, and allow that parallelism to be controlled by an option. > Improve parallelism of closing files in FileIO > -- > > Key: BEAM-12776 > URL: https://issues.apache.org/jira/browse/BEAM-12776 > Project: Beam > Issue Type: Bug > Components: io-java-files >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 2h > Remaining Estimate: 0h > > Currently close happens in processElement which is per-window. > If there are many windows firing this can throttle throughput waiting for IO > instead of closing in parallel in finishBundle. -- This message was sent by Atlassian Jira (v8.20.1#820001)
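A rough sketch of the planned bounded parallelism using a plain fixed-size pool; the pool size, the option wiring, and the AutoCloseable writer type are assumptions rather than the actual FileIO change.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Closes writers with bounded parallelism so only a few large GCS write buffers flush at once. */
class BoundedCloser {
  private final ExecutorService pool;

  BoundedCloser(int maxParallelCloses) { // would come from a pipeline option
    this.pool = Executors.newFixedThreadPool(maxParallelCloses);
  }

  void closeAll(List<AutoCloseable> writers) throws Exception {
    List<Future<?>> pending = new ArrayList<>();
    for (AutoCloseable writer : writers) {
      pending.add(pool.submit(() -> {
        writer.close();
        return null;
      }));
    }
    // Still waits for every close before the bundle finishes, but the pool size caps how many
    // buffered writes are flushing concurrently, which is what was triggering the OOMs.
    for (Future<?> f : pending) {
      f.get();
    }
  }
}
{code}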
[jira] [Updated] (BEAM-12776) Improve parallelism of closing files in FileIO
[ https://issues.apache.org/jira/browse/BEAM-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12776: --- Status: Open (was: Triage Needed) > Improve parallelism of closing files in FileIO > -- > > Key: BEAM-12776 > URL: https://issues.apache.org/jira/browse/BEAM-12776 > Project: Beam > Issue Type: Bug > Components: io-java-files >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 2h > Remaining Estimate: 0h > > Currently close happens in processElement which is per-window. > If there are many windows firing this can throttle throughput waiting for IO > instead of closing in parallel in finishBundle. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Work started] (BEAM-12776) Improve parallelism of closing files in FileIO
[ https://issues.apache.org/jira/browse/BEAM-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on BEAM-12776 started by Sam Whittle. -- > Improve parallelism of closing files in FileIO > -- > > Key: BEAM-12776 > URL: https://issues.apache.org/jira/browse/BEAM-12776 > Project: Beam > Issue Type: Bug > Components: io-java-files >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 2h > Remaining Estimate: 0h > > Currently close happens in processElement which is per-window. > If there are many windows firing this can throttle throughput waiting for IO > instead of closing in parallel in finishBundle. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (BEAM-12144) Dataflow streaming worker stuck and unable to get work from Streaming Engine
[ https://issues.apache.org/jira/browse/BEAM-12144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12144: --- Fix Version/s: 2.32.0 (was: 2.31.0) > Dataflow streaming worker stuck and unable to get work from Streaming Engine > > > Key: BEAM-12144 > URL: https://issues.apache.org/jira/browse/BEAM-12144 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Affects Versions: 2.26.0 >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.32.0 > > Time Spent: 2h > Remaining Estimate: 0h > > Observed in 2.26 but seems like it could affect later versions as well, as > previous issues addressing similar problems were before 2.26. This seems > similar to BEAM-9651 but not the deadlock observed there. > The thread getting work has the following stack: > --- Threads (1): [Thread[DispatchThread,1,main]] State: WAITING stack: --- > java.base@11.0.9/jdk.internal.misc.Unsafe.park(Native Method) > > java.base@11.0.9/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194) > java.base@11.0.9/java.util.concurrent.Phaser$QNode.block(Phaser.java:1127) > > java.base@11.0.9/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128) > > java.base@11.0.9/java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1057) > > java.base@11.0.9/java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:747) > > app//org.apache.beam.runners.dataflow.worker.windmill.DirectStreamObserver.onNext(DirectStreamObserver.java:49) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.send(GrpcWindmillServer.java:662) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.onNewStream(GrpcWindmillServer.java:868) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.startStream(GrpcWindmillServer.java:677) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:860) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:843) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer.getWorkStream(GrpcWindmillServer.java:543) > > app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.streamingDispatchLoop(StreamingDataflowWorker.java:1047) > > app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$1.run(StreamingDataflowWorker.java:670) > java.base@11.0.9/java.lang.Thread.run(Thread.java:834) > The status page shows: > GetWorkStream: 0 buffers, 400 inflight messages allowed, 67108864 inflight > bytes allowed, current stream is 61355396ms old, last send 61355396ms, last > response -1ms > Showing that the stream was created 17 hours ago, sent the header message but > never received a response. With the stack trace it appears that the header > was never sent but the stream also didn't terminate with a deadline exceed. > This seems like a grpc issue to not get an error for the stream, however it > would be safer to not block indefinitely on the Phaser waiting for the send > and instead throw an exception after 2x the stream deadline for example. -- This message was sent by Atlassian Jira (v8.20.1#820001)
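A sketch of the mitigation suggested at the end of the description: bound the wait on the Phaser rather than blocking indefinitely. The method shape and the 2x multiplier follow the description; this is not the actual DirectStreamObserver code.

{code:java}
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class StreamSendWaiter {
  /** Waits for the stream to be ready to send, but gives up after 2x the stream deadline. */
  static void awaitSendReady(Phaser phaser, int phase, long streamDeadlineSeconds)
      throws InterruptedException {
    try {
      phaser.awaitAdvanceInterruptibly(phase, 2 * streamDeadlineSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      // Surfacing an error lets the stream be torn down and retried instead of hanging for hours.
      throw new IllegalStateException(
          "Stream not ready to send within 2x the stream deadline; assuming it is stuck", e);
    }
  }
}
{code}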
[jira] [Updated] (BEAM-13268) Reduce latency by parallelizing BQ inserts when flushing due to row limit
[ https://issues.apache.org/jira/browse/BEAM-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-13268: --- Fix Version/s: 2.35.0 Resolution: Fixed Status: Resolved (was: Triage Needed) > Reduce latency by parallelizing BQ inserts when flushing due to row limit > - > > Key: BEAM-13268 > URL: https://issues.apache.org/jira/browse/BEAM-13268 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.35.0 > > Time Spent: 1h > Remaining Estimate: 0h > > InsertBatchedElements consumes the output of GroupIntoBatches which flushes > after default 500 elements to respect the max items per streaming insert > request to BQ. > However InsertBatchedElements flushes rows synchrounously meaning that > latencies of writes accumulates. It could instead initiate writes in > ProcessElement and block on write completion in FinishBundle. There could be > some limited configurable parallelism if desired to limit memory usage. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (BEAM-12818) When writing to GCS, spread prefix of temporary files and reuse autoscaling of the temporary directory
[ https://issues.apache.org/jira/browse/BEAM-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12818: --- Resolution: Fixed Status: Resolved (was: Open) > When writing to GCS, spread prefix of temporary files and reuse autoscaling > of the temporary directory > -- > > Key: BEAM-12818 > URL: https://issues.apache.org/jira/browse/BEAM-12818 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P1 > Fix For: 2.35.0 > > Time Spent: 5h 40m > Remaining Estimate: 0h > > When writing files using FileIO, the given temporary directory has a > subdirectory created in it for each FileBasedSink. This is useful for > non-windowed output where the temporary directory can be matched to delete > leftover files that were lost during processing. > However for windowed writes such subdirectories are unnecessary and cause a > common prefix to be shared for the temporary files. Additionally this common > prefix varies per job and thus the autoscaling for the previous prefix is no > longer effective, see > https://cloud.google.com/storage/docs/request-rate#randomness_after_sequential_prefixes_is_not_as_effective -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (BEAM-13268) Reduce latency by parallelizing BQ inserts when flushing due to row limit
[ https://issues.apache.org/jira/browse/BEAM-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17445154#comment-17445154 ] Sam Whittle commented on BEAM-13268: Actually it looks like Pablo improved the BQ services to issue parallel RPCs for larger batches. So I think we can just ungroup the grouped input and rely on that parallelization. That also cleans up some code duplication. > Reduce latency by parallelizing BQ inserts when flushing due to row limit > - > > Key: BEAM-13268 > URL: https://issues.apache.org/jira/browse/BEAM-13268 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > > InsertBatchedElements consumes the output of GroupIntoBatches which flushes > after a default of 500 elements to respect the max items per streaming insert > request to BQ. > However InsertBatchedElements flushes rows synchronously, meaning that > latencies of writes accumulate. It could instead initiate writes in > ProcessElement and block on write completion in FinishBundle. There could be > some limited configurable parallelism if desired to limit memory usage. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (BEAM-12472) BigQuery streaming writes can be batched beyond request limit with BatchAndInsertElements
[ https://issues.apache.org/jira/browse/BEAM-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12472: --- Fix Version/s: 2.35.0 (was: 2.34.0) > BigQuery streaming writes can be batched beyond request limit with > BatchAndInsertElements > - > > Key: BEAM-12472 > URL: https://issues.apache.org/jira/browse/BEAM-12472 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Sam Whittle >Assignee: Pablo Estrada >Priority: P2 > Labels: stale-P2 > Fix For: 2.35.0 > > > BatchAndInsertElements accumulates all the input elements and flushes them in > finishBundle. > However if there is enough data the request limit for bigquery can be > exceeded causing an exception like the following. It seems that finishBundle > should limit the # of rows and bytes and possibly flush multiple times for a > destination. > Work around would be to use autosharding which uses state that has batching > limits or to increase the # of streaming keys to decrease the likelihood of > hitting this. > {code} > Error while processing a work item: UNKNOWN: > org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: > com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad > Request > POST > https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/datasets/nexmark_06090820455271/tables/nexmark_simple/insertAll?prettyPrint=false > { > "code" : 400, > "errors" : [ { > "domain" : "global", > "message" : "Request payload size exceeds the limit: 10485760 bytes.", > "reason" : "badRequest" > } ], > "message" : "Request payload size exceeds the limit: 10485760 bytes.", > "status" : "INVALID_ARGUMENT" > } > at > org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39) > at > org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown > Source) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1661) > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (BEAM-12472) BigQuery streaming writes can be batched beyond request limit with BatchAndInsertElements
[ https://issues.apache.org/jira/browse/BEAM-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12472: --- Fix Version/s: 2.34.0 Resolution: Fixed Status: Resolved (was: Open) > BigQuery streaming writes can be batched beyond request limit with > BatchAndInsertElements > - > > Key: BEAM-12472 > URL: https://issues.apache.org/jira/browse/BEAM-12472 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Sam Whittle >Assignee: Pablo Estrada >Priority: P2 > Labels: stale-P2 > Fix For: 2.34.0 > > > BatchAndInsertElements accumulates all the input elements and flushes them in > finishBundle. > However if there is enough data the request limit for bigquery can be > exceeded causing an exception like the following. It seems that finishBundle > should limit the # of rows and bytes and possibly flush multiple times for a > destination. > Work around would be to use autosharding which uses state that has batching > limits or to increase the # of streaming keys to decrease the likelihood of > hitting this. > {code} > Error while processing a work item: UNKNOWN: > org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: > com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad > Request > POST > https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/datasets/nexmark_06090820455271/tables/nexmark_simple/insertAll?prettyPrint=false > { > "code" : 400, > "errors" : [ { > "domain" : "global", > "message" : "Request payload size exceeds the limit: 10485760 bytes.", > "reason" : "badRequest" > } ], > "message" : "Request payload size exceeds the limit: 10485760 bytes.", > "status" : "INVALID_ARGUMENT" > } > at > org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39) > at > org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown > Source) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1661) > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (BEAM-12472) BigQuery streaming writes can be batched beyond request limit with BatchAndInsertElements
[ https://issues.apache.org/jira/browse/BEAM-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle reassigned BEAM-12472: -- Assignee: Pablo Estrada > BigQuery streaming writes can be batched beyond request limit with > BatchAndInsertElements > - > > Key: BEAM-12472 > URL: https://issues.apache.org/jira/browse/BEAM-12472 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Sam Whittle >Assignee: Pablo Estrada >Priority: P2 > Labels: stale-P2 > > BatchAndInsertElements accumulates all the input elements and flushes them in > finishBundle. > However if there is enough data the request limit for bigquery can be > exceeded causing an exception like the following. It seems that finishBundle > should limit the # of rows and bytes and possibly flush multiple times for a > destination. > Work around would be to use autosharding which uses state that has batching > limits or to increase the # of streaming keys to decrease the likelihood of > hitting this. > {code} > Error while processing a work item: UNKNOWN: > org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: > com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad > Request > POST > https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/datasets/nexmark_06090820455271/tables/nexmark_simple/insertAll?prettyPrint=false > { > "code" : 400, > "errors" : [ { > "domain" : "global", > "message" : "Request payload size exceeds the limit: 10485760 bytes.", > "reason" : "badRequest" > } ], > "message" : "Request payload size exceeds the limit: 10485760 bytes.", > "status" : "INVALID_ARGUMENT" > } > at > org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39) > at > org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown > Source) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1661) > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (BEAM-12472) BigQuery streaming writes can be batched beyond request limit with BatchAndInsertElements
[ https://issues.apache.org/jira/browse/BEAM-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17445136#comment-17445136 ] Sam Whittle commented on BEAM-12472: I believe this would be fixed by Pablo's PR as indicated by Michael's comment above. The underlying BigQueryServicesImpl now splits up rows to respect the RPC limits. > BigQuery streaming writes can be batched beyond request limit with > BatchAndInsertElements > - > > Key: BEAM-12472 > URL: https://issues.apache.org/jira/browse/BEAM-12472 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Sam Whittle >Priority: P2 > Labels: stale-P2 > > BatchAndInsertElements accumulates all the input elements and flushes them in > finishBundle. > However if there is enough data, the request limit for BigQuery can be > exceeded, causing an exception like the following. It seems that finishBundle > should limit the # of rows and bytes and possibly flush multiple times for a > destination. > A workaround would be to use autosharding which uses state that has batching > limits or to increase the # of streaming keys to decrease the likelihood of > hitting this. > {code} > Error while processing a work item: UNKNOWN: > org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: > com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad > Request > POST > https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/datasets/nexmark_06090820455271/tables/nexmark_simple/insertAll?prettyPrint=false > { > "code" : 400, > "errors" : [ { > "domain" : "global", > "message" : "Request payload size exceeds the limit: 10485760 bytes.", > "reason" : "badRequest" > } ], > "message" : "Request payload size exceeds the limit: 10485760 bytes.", > "status" : "INVALID_ARGUMENT" > } > at > org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39) > at > org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown > Source) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1661) > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
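As an illustration of "splits up rows to respect the RPC limits", a size-aware splitter could look like the following; the 10 MB figure comes from the error message above, and the byte accounting here is simplified compared to whatever BigQueryServicesImpl actually does.

{code:java}
import java.util.ArrayList;
import java.util.List;

class RequestSplitter {
  /** Splits encoded rows into batches whose summed payload stays under the request limit. */
  static List<List<byte[]>> splitByRequestLimit(List<byte[]> encodedRows, long maxRequestBytes) {
    List<List<byte[]>> batches = new ArrayList<>();
    List<byte[]> current = new ArrayList<>();
    long currentBytes = 0;
    for (byte[] row : encodedRows) {
      // Start a new batch once adding this row would push the payload over the limit.
      if (!current.isEmpty() && currentBytes + row.length > maxRequestBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(row);
      currentBytes += row.length;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
{code}

Called with maxRequestBytes somewhat below 10485760 (leaving headroom for per-request overhead), no single insertAll request would exceed the payload limit seen in the error.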
[jira] [Assigned] (BEAM-13268) Reduce latency by parallelizing BQ inserts when flushing due to row limit
[ https://issues.apache.org/jira/browse/BEAM-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle reassigned BEAM-13268: -- Assignee: Sam Whittle > Reduce latency by parallelizing BQ inserts when flushing due to row limit > - > > Key: BEAM-13268 > URL: https://issues.apache.org/jira/browse/BEAM-13268 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > > InsertBatchedElements consumes the output of GroupIntoBatches which flushes > after default 500 elements to respect the max items per streaming insert > request to BQ. > However InsertBatchedElements flushes rows synchrounously meaning that > latencies of writes accumulates. It could instead initiate writes in > ProcessElement and block on write completion in FinishBundle. There could be > some limited configurable parallelism if desired to limit memory usage. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (BEAM-13268) Reduce latency by parallelizing BQ inserts when flushing due to row limit
Sam Whittle created BEAM-13268: -- Summary: Reduce latency by parallelizing BQ inserts when flushing due to row limit Key: BEAM-13268 URL: https://issues.apache.org/jira/browse/BEAM-13268 Project: Beam Issue Type: Improvement Components: io-java-gcp Reporter: Sam Whittle InsertBatchedElements consumes the output of GroupIntoBatches which flushes after a default of 500 elements to respect the max items per streaming insert request to BQ. However InsertBatchedElements flushes rows synchronously, meaning that latencies of writes accumulate. It could instead initiate writes in ProcessElement and block on write completion in FinishBundle. There could be some limited configurable parallelism if desired to limit memory usage. -- This message was sent by Atlassian Jira (v8.20.1#820001)
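A sketch of the pattern described in the last paragraph: start each insert in @ProcessElement and only block in @FinishBundle so per-batch latencies overlap rather than add up. BatchInserter is a hypothetical stand-in for the real BigQuery insert call, not an existing Beam interface.

{code:java}
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.transforms.DoFn;

class AsyncInsertFn extends DoFn<List<String>, Void> {
  /** Hypothetical async insert hook; in reality this would wrap the BQ streaming insert call. */
  interface BatchInserter extends Serializable {
    CompletableFuture<Void> insertAsync(List<String> rows);
  }

  private final BatchInserter inserter;
  private transient List<CompletableFuture<Void>> pendingInserts;

  AsyncInsertFn(BatchInserter inserter) {
    this.inserter = inserter;
  }

  @StartBundle
  public void startBundle() {
    pendingInserts = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(@Element List<String> batch) {
    // Kick off the insert without waiting, so write latencies overlap across batches.
    pendingInserts.add(inserter.insertAsync(batch));
  }

  @FinishBundle
  public void finishBundle() throws ExecutionException, InterruptedException {
    // Block only once, at the end of the bundle, for all outstanding inserts.
    for (CompletableFuture<Void> pending : pendingInserts) {
      pending.get();
    }
  }
}
{code}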
[jira] [Updated] (BEAM-13042) Prevent unexpected blocking in RegisterAndProcessBundleOperation hasFailed
[ https://issues.apache.org/jira/browse/BEAM-13042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-13042: --- Fix Version/s: 2.35.0 Resolution: Fixed Status: Resolved (was: Open) > Prevent unexpected blocking in RegisterAndProcessBundleOperation hasFailed > -- > > Key: BEAM-13042 > URL: https://issues.apache.org/jira/browse/BEAM-13042 > Project: Beam > Issue Type: Improvement > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.35.0 > > Time Spent: 1h > Remaining Estimate: 0h > > We have observed stack traces in stuck jobs as follows: > --- Threads (1): [Thread[pool-8-thread-1,5,main]] State: WAITING stack: --- > sun.misc.Unsafe.park(Native Method) > java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) > > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > > org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.hasFailed(RegisterAndProcessBundleOperation.java:407) > > org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:332) > > org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:326) > > org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$121/1203893465.run(Unknown > Source) > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > java.lang.Thread.run(Thread.java:748) > This code appears to not expect blocking as it checks isDone before calling > get() > public boolean hasFailed() throws ExecutionException, InterruptedException { > if (processBundleResponse != null && > processBundleResponse.toCompletableFuture().isDone()) { > return > !processBundleResponse.toCompletableFuture().get().getError().isEmpty(); > } else { > // At the very least, we don't know that this has failed yet. > return false; > } > } > I'm unsure why this is occurring but it could be that the two calls to > toCompletableFuture are returning different futures for some reason, or that > the completion stage is somehow changing from done to undone. In either case > this could by using a single call to CompletableFuture.getNow method which > guarantees not to block. > This affects the V1 Runner harness which isn't generally used but might as > well be fixed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12856) Allow for configuration of unbounded reader max elements, read time etc in StreamingDataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-12856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12856: --- Fix Version/s: 2.34.0 Resolution: Fixed Status: Resolved (was: Open) > Allow for configuration of unbounded reader max elements, read time etc in > StreamingDataflowRunner > -- > > Key: BEAM-12856 > URL: https://issues.apache.org/jira/browse/BEAM-12856 > Project: Beam > Issue Type: Improvement > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.34.0 > > Time Spent: 2h > Remaining Estimate: 0h > > Currently in WorkerCustomSources.java it is hard-coded to 10 seconds, 10k > elements, with 1sec waiting for elements if none are available. > There are cases where it would be beneficial to wait longer and process more > data so it would be nice if this was controlled by pipeline option. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-12291) org.apache.beam.runners.flink.ReadSourcePortableTest.testExecution[streaming: false] is flaky
[ https://issues.apache.org/jira/browse/BEAM-12291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17428227#comment-17428227 ] Sam Whittle commented on BEAM-12291: Sorry I didn't see this was assigned to me earlier. I don't have context to debug the Flink tests. I took a look but didn't see anything obvious > org.apache.beam.runners.flink.ReadSourcePortableTest.testExecution[streaming: > false] is flaky > - > > Key: BEAM-12291 > URL: https://issues.apache.org/jira/browse/BEAM-12291 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Kenneth Knowles >Priority: P1 > Labels: flake, stale-assigned > Time Spent: 20m > Remaining Estimate: 0h > > Example: https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/17600/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-12291) org.apache.beam.runners.flink.ReadSourcePortableTest.testExecution[streaming: false] is flaky
[ https://issues.apache.org/jira/browse/BEAM-12291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle reassigned BEAM-12291: -- Assignee: (was: Sam Whittle) > org.apache.beam.runners.flink.ReadSourcePortableTest.testExecution[streaming: > false] is flaky > - > > Key: BEAM-12291 > URL: https://issues.apache.org/jira/browse/BEAM-12291 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Kenneth Knowles >Priority: P1 > Labels: flake, stale-assigned > Time Spent: 20m > Remaining Estimate: 0h > > Example: https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/17600/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-13042) Prevent unexpected blocking in RegisterAndProcessBundleOperation hasFailed
Sam Whittle created BEAM-13042: -- Summary: Prevent unexpected blocking in RegisterAndProcessBundleOperation hasFailed Key: BEAM-13042 URL: https://issues.apache.org/jira/browse/BEAM-13042 Project: Beam Issue Type: Improvement Components: runner-dataflow Reporter: Sam Whittle Assignee: Sam Whittle We have observed stack traces in stuck jobs as follows: --- Threads (1): [Thread[pool-8-thread-1,5,main]] State: WAITING stack: --- sun.misc.Unsafe.park(Native Method) java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.hasFailed(RegisterAndProcessBundleOperation.java:407) org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:332) org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:326) org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$121/1203893465.run(Unknown Source) java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748) This code does not appear to expect blocking, as it checks isDone before calling get(): public boolean hasFailed() throws ExecutionException, InterruptedException { if (processBundleResponse != null && processBundleResponse.toCompletableFuture().isDone()) { return !processBundleResponse.toCompletableFuture().get().getError().isEmpty(); } else { // At the very least, we don't know that this has failed yet. return false; } } I'm unsure why this is occurring, but it could be that the two calls to toCompletableFuture are returning different futures for some reason, or that the completion stage is somehow changing from done to undone. In either case this could be fixed by using a single call to the CompletableFuture.getNow method, which is guaranteed not to block. This affects the V1 Runner harness, which isn't generally used but might as well be fixed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
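A minimal sketch of the getNow-based variant suggested above, with a simplified FakeResponse type standing in for the real ProcessBundleResponse future (this is illustrative, not the actual Beam source): a single toCompletableFuture()/getNow() pair never parks the progress-reporting thread, unlike the isDone()/get() sequence.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class HasFailedSketch {
  static class FakeResponse {
    String getError() {
      return "";
    }
  }

  private volatile CompletionStage<FakeResponse> processBundleResponse;

  boolean hasFailed() {
    CompletionStage<FakeResponse> stage = processBundleResponse;
    if (stage == null) {
      return false; // No response requested yet, so no known failure.
    }
    // getNow returns the fallback immediately when the future is not yet complete,
    // so this call cannot block (an exceptional completion would surface as a CompletionException).
    FakeResponse response = stage.toCompletableFuture().getNow(null);
    return response != null && !response.getError().isEmpty();
  }
}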
[jira] [Updated] (BEAM-12942) Dataflow runner specialization of PubsubIO should validate messages
[ https://issues.apache.org/jira/browse/BEAM-12942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12942: --- Fix Version/s: (was: 2.33.0) 2.34.0 > Dataflow runner specialization of PubsubIO should validate messages > --- > > Key: BEAM-12942 > URL: https://issues.apache.org/jira/browse/BEAM-12942 > Project: Beam > Issue Type: Improvement > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.34.0 > > Time Spent: 1h 20m > Remaining Estimate: 0h > > Currently invalid messages, such as those with attributes exceeding the > maximum size are processed by the bundle successfully but fail to commit. > Throwing an exception when trying to write such a message directly would > increase visibility as well as allowing users to catch and handle such > exceptions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12942) Dataflow runner specialization of PubsubIO should validate messages
[ https://issues.apache.org/jira/browse/BEAM-12942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12942: --- Fix Version/s: 2.33.0 Resolution: Fixed Status: Resolved (was: Open) > Dataflow runner specialization of PubsubIO should validate messages > --- > > Key: BEAM-12942 > URL: https://issues.apache.org/jira/browse/BEAM-12942 > Project: Beam > Issue Type: Improvement > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.33.0 > > Time Spent: 1h 20m > Remaining Estimate: 0h > > Currently invalid messages, such as those with attributes exceeding the > maximum size are processed by the bundle successfully but fail to commit. > Throwing an exception when trying to write such a message directly would > increase visibility as well as allowing users to catch and handle such > exceptions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-12942) Dataflow runner specialization of PubsubIO should validate messages
[ https://issues.apache.org/jira/browse/BEAM-12942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17419231#comment-17419231 ] Sam Whittle commented on BEAM-12942: The validation seems like it could be helpful for other non-specialized PubsubIO because it would throw an exception when the message is produced instead of exception from pubsub publish result which could correspond to a batch and thus has less context. > Dataflow runner specialization of PubsubIO should validate messages > --- > > Key: BEAM-12942 > URL: https://issues.apache.org/jira/browse/BEAM-12942 > Project: Beam > Issue Type: Improvement > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > > Currently invalid messages, such as those with attributes exceeding the > maximum size are processed by the bundle successfully but fail to commit. > Throwing an exception when trying to write such a message directly would > increase visibility as well as allowing users to catch and handle such > exceptions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12942) Dataflow runner specialization of PubsubIO should validate messages
Sam Whittle created BEAM-12942: -- Summary: Dataflow runner specialization of PubsubIO should validate messages Key: BEAM-12942 URL: https://issues.apache.org/jira/browse/BEAM-12942 Project: Beam Issue Type: Improvement Components: runner-dataflow Reporter: Sam Whittle Assignee: Sam Whittle Currently invalid messages, such as those with attributes exceeding the maximum size are processed by the bundle successfully but fail to commit. Throwing an exception when trying to write such a message directly would increase visibility as well as allowing users to catch and handle such exceptions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work started] (BEAM-7913) Add drain() to DataflowPipelineJob
[ https://issues.apache.org/jira/browse/BEAM-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on BEAM-7913 started by null. -- > Add drain() to DataflowPipelineJob > -- > > Key: BEAM-7913 > URL: https://issues.apache.org/jira/browse/BEAM-7913 > Project: Beam > Issue Type: Improvement > Components: runner-dataflow >Reporter: Sam Whittle >Priority: P3 > > Dataflow supports draining jobs but there is no easy programatic way to do it. > I propose adding a drain() method to DataflowPipelineJob similar to the > existing cancel() method (inherited from PipelineResult). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12740) Reduce and backoff GCS metadata operations when writing to GCS files
[ https://issues.apache.org/jira/browse/BEAM-12740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12740: --- Resolution: Fixed Status: Resolved (was: Open) > Reduce and backoff GCS metadata operations when writing to GCS files > > > Key: BEAM-12740 > URL: https://issues.apache.org/jira/browse/BEAM-12740 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 9h 20m > Remaining Estimate: 0h > > When issuing GCS operations affecting metadata (ie file-level operations not > read/write operations), GCS may return errors indicating backoff. See > https://cloud.google.com/storage/docs/request-rate#ramp-up > If such errors are encountered, currently the exception is not handled by > GcsUtil.java and is propagated, causing retries and backoff of all operations > at a higher level. Instead we should backoff and retry only such files that > require it. > Additionally FileBasedSink issues deletes for files that have been renamed. > The rename itself should take care of removing the original file and thus we > can reduce some metadata operations. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12740) Reduce and backoff GCS metadata operations when writing to GCS files
[ https://issues.apache.org/jira/browse/BEAM-12740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12740: --- Fix Version/s: 2.34.0 > Reduce and backoff GCS metadata operations when writing to GCS files > > > Key: BEAM-12740 > URL: https://issues.apache.org/jira/browse/BEAM-12740 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.34.0 > > Time Spent: 9h 20m > Remaining Estimate: 0h > > When issuing GCS operations affecting metadata (ie file-level operations not > read/write operations), GCS may return errors indicating backoff. See > https://cloud.google.com/storage/docs/request-rate#ramp-up > If such errors are encountered, currently the exception is not handled by > GcsUtil.java and is propagated, causing retries and backoff of all operations > at a higher level. Instead we should backoff and retry only such files that > require it. > Additionally FileBasedSink issues deletes for files that have been renamed. > The rename itself should take care of removing the original file and thus we > can reduce some metadata operations. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-12445) Move Python's BigQuery streaming insert sink to the new BigQuery api client
[ https://issues.apache.org/jira/browse/BEAM-12445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416247#comment-17416247 ] Sam Whittle commented on BEAM-12445: I believe the timeout needs to be set on the new insert_json_rows call in bigquery_tools. I think we observed stuck call due to this. The retry at the higher level has a timeout of 2 minutes so a minute or 2 seems like a reasonable deadline for the call. > Move Python's BigQuery streaming insert sink to the new BigQuery api client > --- > > Key: BEAM-12445 > URL: https://issues.apache.org/jira/browse/BEAM-12445 > Project: Beam > Issue Type: Bug > Components: io-py-gcp >Reporter: Pablo Estrada >Priority: P2 > Time Spent: 7h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12776) Improve parallelism of closing files in FileIO
[ https://issues.apache.org/jira/browse/BEAM-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12776: --- Resolution: Fixed Status: Resolved (was: Open) > Improve parallelism of closing files in FileIO > -- > > Key: BEAM-12776 > URL: https://issues.apache.org/jira/browse/BEAM-12776 > Project: Beam > Issue Type: Bug > Components: io-java-files >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 2h > Remaining Estimate: 0h > > Currently close happens in processElement which is per-window. > If there are many windows firing this can throttle throughput waiting for IO > instead of closing in parallel in finishBundle. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12856) Allow for configuration of unbounded reader max elements, read time etc in StreamingDataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-12856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12856: --- Description: Currently in WorkerCustomSources.java it is hard-coded to 10 seconds, 10k elements, with 1sec waiting for elements if none are available. There are cases where it would be beneficial to wait longer and process more data so it would be nice if this was controlled by pipeline option. > Allow for configuration of unbounded reader max elements, read time etc in > StreamingDataflowRunner > -- > > Key: BEAM-12856 > URL: https://issues.apache.org/jira/browse/BEAM-12856 > Project: Beam > Issue Type: Improvement > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > > Currently in WorkerCustomSources.java it is hard-coded to 10 seconds, 10k > elements, with 1sec waiting for elements if none are available. > There are cases where it would be beneficial to wait longer and process more > data so it would be nice if this was controlled by pipeline option. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12856) Allow for configuration of unbounded reader max elements, read time etc in StreamingDataflowRunner
Sam Whittle created BEAM-12856: -- Summary: Allow for configuration of unbounded reader max elements, read time etc in StreamingDataflowRunner Key: BEAM-12856 URL: https://issues.apache.org/jira/browse/BEAM-12856 Project: Beam Issue Type: Improvement Components: runner-dataflow Reporter: Sam Whittle Assignee: Sam Whittle -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-11994) Java BigQuery - Implement IO Request Count metrics
[ https://issues.apache.org/jira/browse/BEAM-11994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406716#comment-17406716 ] Sam Whittle commented on BEAM-11994: I observed the following in a test, it seems related to changes in ServiceCallMetric made in pull requests for this issue. java.lang.RuntimeException: java.util.ConcurrentModificationException at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:994) at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:1047) at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:387) at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$800(BatchedStreamingWrite.java:72) at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$InsertBatchedElements.processElement(BatchedStreamingWrite.java:350) Caused by: java.util.ConcurrentModificationException at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442) at java.util.HashMap$EntryIterator.next(HashMap.java:1476) at java.util.HashMap$EntryIterator.next(HashMap.java:1474) at org.apache.beam.runners.core.metrics.MonitoringInfoMetricName.(MonitoringInfoMetricName.java:50) at org.apache.beam.runners.core.metrics.MonitoringInfoMetricName.named(MonitoringInfoMetricName.java:95) at org.apache.beam.runners.core.metrics.ServiceCallMetric.call(ServiceCallMetric.java:82) at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll$1(BigQueryServicesImpl.java:905) at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$BoundedExecutorService$SemaphoreCallable.call(BigQueryServicesImpl.java:1560) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) > Java BigQuery - Implement IO Request Count metrics > -- > > Key: BEAM-11994 > URL: https://issues.apache.org/jira/browse/BEAM-11994 > Project: Beam > Issue Type: Test > Components: io-java-gcp >Reporter: Alex Amato >Assignee: Alex Amato >Priority: P2 > Labels: stale-assigned > Time Spent: 23.5h > Remaining Estimate: 0h > > Reference PRs (See BigQuery IO example) and detailed explanation of what's > needed to instrument this IO with Request Count metrics is found in this > handoff doc: > [https://docs.google.com/document/d/1lrz2wE5Dl4zlUfPAenjXIQyleZvqevqoxhyE85aj4sc/edit'|https://docs.google.com/document/d/1lrz2wE5Dl4zlUfPAenjXIQyleZvqevqoxhyE85aj4sc/edit'?authuser=0] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12818) When writing to GCS, spread prefix of temporary files and reuse autoscaling of the temporary directory
Sam Whittle created BEAM-12818: -- Summary: When writing to GCS, spread prefix of temporary files and reuse autoscaling of the temporary directory Key: BEAM-12818 URL: https://issues.apache.org/jira/browse/BEAM-12818 Project: Beam Issue Type: Bug Components: io-java-gcp Reporter: Sam Whittle Assignee: Sam Whittle When writing files using FileIO, the given temporary directory has a subdirectory created in it for each FileBasedSink. This is useful for non-windowed output where the temporary directory can be matched to delete leftover files that were lost during processing. However for windowed writes such subdirectories are unnecessary and cause a common prefix to be shared for the temporary files. Additionally this common prefix varies per job and thus the autoscaling for the previous prefix is no longer effective, see https://cloud.google.com/storage/docs/request-rate#randomness_after_sequential_prefixes_is_not_as_effective -- This message was sent by Atlassian Jira (v8.3.4#803005)
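An illustrative sketch of the naming idea in the description (the scheme below is hypothetical, not FileBasedSink's actual one): for windowed writes, temporary objects get a spread-out random leading component under the shared temporary location instead of a per-job subdirectory, so the prefix GCS has already ramped up for stays effective across jobs.

import java.util.UUID;

class TempNameSketch {
  static String tempObjectName(String tempLocation, String finalFileName) {
    // A random leading component spreads writes across GCS index ranges
    // rather than concentrating them under one new per-job prefix.
    String spread = UUID.randomUUID().toString().substring(0, 8);
    return tempLocation + "/" + spread + "-" + finalFileName;
  }
}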
[jira] [Updated] (BEAM-12780) StreamingDataflowWorker should limit local retries
[ https://issues.apache.org/jira/browse/BEAM-12780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12780: --- Fix Version/s: 2.32.0 Resolution: Fixed Status: Resolved (was: In Progress) > StreamingDataflowWorker should limit local retries > -- > > Key: BEAM-12780 > URL: https://issues.apache.org/jira/browse/BEAM-12780 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.32.0 > > Time Spent: 40m > Remaining Estimate: 0h > > Local retries are performed for most user exceptions > This causes things to get stuck unnecessarily if the work item itself was > somehow corrupted additionally such processing may no longer be relevant on > the worker and should be given up after a while. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work started] (BEAM-12780) StreamingDataflowWorker should limit local retries
[ https://issues.apache.org/jira/browse/BEAM-12780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on BEAM-12780 started by Sam Whittle. -- > StreamingDataflowWorker should limit local retries > -- > > Key: BEAM-12780 > URL: https://issues.apache.org/jira/browse/BEAM-12780 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > > Local retries are performed for most user exceptions > This causes things to get stuck unnecessarily if the work item itself was > somehow corrupted additionally such processing may no longer be relevant on > the worker and should be given up after a while. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12780) StreamingDataflowWorker should limit local retries
[ https://issues.apache.org/jira/browse/BEAM-12780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12780: --- Status: Open (was: Triage Needed) > StreamingDataflowWorker should limit local retries > -- > > Key: BEAM-12780 > URL: https://issues.apache.org/jira/browse/BEAM-12780 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > > Local retries are performed for most user exceptions > This causes things to get stuck unnecessarily if the work item itself was > somehow corrupted additionally such processing may no longer be relevant on > the worker and should be given up after a while. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-12780) StreamingDataflowWorker should limit local retries
[ https://issues.apache.org/jira/browse/BEAM-12780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle reassigned BEAM-12780: -- Assignee: Sam Whittle > StreamingDataflowWorker should limit local retries > -- > > Key: BEAM-12780 > URL: https://issues.apache.org/jira/browse/BEAM-12780 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > > Local retries are performed for most user exceptions > This causes things to get stuck unnecessarily if the work item itself was > somehow corrupted additionally such processing may no longer be relevant on > the worker and should be given up after a while. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12780) StreamingDataflowWorker should limit local retries
Sam Whittle created BEAM-12780: -- Summary: StreamingDataflowWorker should limit local retries Key: BEAM-12780 URL: https://issues.apache.org/jira/browse/BEAM-12780 Project: Beam Issue Type: Bug Components: runner-dataflow Reporter: Sam Whittle Local retries are performed for most user exceptions. This causes things to get stuck unnecessarily if the work item itself was somehow corrupted; additionally, such processing may no longer be relevant on the worker and should be given up after a while. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12776) Improve parallelism of closing files in FileIO
Sam Whittle created BEAM-12776: -- Summary: Improve parallelism of closing files in FileIO Key: BEAM-12776 URL: https://issues.apache.org/jira/browse/BEAM-12776 Project: Beam Issue Type: Bug Components: io-java-files Reporter: Sam Whittle Assignee: Sam Whittle Currently close happens in processElement which is per-window. If there are many windows firing this can throttle throughput waiting for IO instead of closing in parallel in finishBundle. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12740) Reduce and backoff GCS metadata operations when writing to GCS files
Sam Whittle created BEAM-12740: -- Summary: Reduce and backoff GCS metadata operations when writing to GCS files Key: BEAM-12740 URL: https://issues.apache.org/jira/browse/BEAM-12740 Project: Beam Issue Type: Bug Components: io-java-gcp Reporter: Sam Whittle Assignee: Sam Whittle When issuing GCS operations affecting metadata (ie file-level operations not read/write operations), GCS may return errors indicating backoff. See https://cloud.google.com/storage/docs/request-rate#ramp-up If such errors are encountered, currently the exception is not handled by GcsUtil.java and is propagated, causing retries and backoff of all operations at a higher level. Instead we should backoff and retry only such files that require it. Additionally FileBasedSink issues deletes for files that have been renamed. The rename itself should take care of removing the original file and thus we can reduce some metadata operations. -- This message was sent by Atlassian Jira (v8.3.4#803005)
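A sketch of the per-file backoff suggested above, in plain Java with a hypothetical retryability predicate (the real code would inspect the GoogleJsonResponseException status rather than the message): only the individual metadata operation that hit a ramp-up / rate-limit error is retried with jittered exponential backoff, instead of propagating the error and retrying everything at a higher level.

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

class PerFileBackoff {
  static <T> T withBackoff(Callable<T> op, int maxAttempts) throws Exception {
    long sleepMillis = 100;
    for (int attempt = 1; ; attempt++) {
      try {
        return op.call();
      } catch (IOException e) {
        if (attempt >= maxAttempts || !isRetryableRateLimitError(e)) {
          throw e; // Not a backoff signal, or out of attempts: surface the failure.
        }
        // Jittered exponential backoff for this file only.
        Thread.sleep(sleepMillis + ThreadLocalRandom.current().nextLong(sleepMillis));
        sleepMillis = Math.min(sleepMillis * 2, 60_000);
      }
    }
  }

  // Hypothetical check; shown here only to make the sketch self-contained.
  static boolean isRetryableRateLimitError(IOException e) {
    String msg = String.valueOf(e.getMessage());
    return msg.contains("429") || msg.contains("rateLimitExceeded") || msg.contains("503");
  }
}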
[jira] [Updated] (BEAM-12144) Dataflow streaming worker stuck and unable to get work from Streaming Engine
[ https://issues.apache.org/jira/browse/BEAM-12144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12144: --- Fix Version/s: 2.31.0 Resolution: Fixed Status: Resolved (was: Open) > Dataflow streaming worker stuck and unable to get work from Streaming Engine > > > Key: BEAM-12144 > URL: https://issues.apache.org/jira/browse/BEAM-12144 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Affects Versions: 2.26.0 >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.31.0 > > Time Spent: 2h > Remaining Estimate: 0h > > Observed in 2.26 but seems like it could affect later versions as well, as > previous issues addressing similar problems were before 2.26. This seems > similar to BEAM-9651 but not the deadlock observed there. > The thread getting work has the following stack: > --- Threads (1): [Thread[DispatchThread,1,main]] State: WAITING stack: --- > java.base@11.0.9/jdk.internal.misc.Unsafe.park(Native Method) > > java.base@11.0.9/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194) > java.base@11.0.9/java.util.concurrent.Phaser$QNode.block(Phaser.java:1127) > > java.base@11.0.9/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128) > > java.base@11.0.9/java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1057) > > java.base@11.0.9/java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:747) > > app//org.apache.beam.runners.dataflow.worker.windmill.DirectStreamObserver.onNext(DirectStreamObserver.java:49) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.send(GrpcWindmillServer.java:662) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.onNewStream(GrpcWindmillServer.java:868) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.startStream(GrpcWindmillServer.java:677) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:860) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:843) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer.getWorkStream(GrpcWindmillServer.java:543) > > app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.streamingDispatchLoop(StreamingDataflowWorker.java:1047) > > app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$1.run(StreamingDataflowWorker.java:670) > java.base@11.0.9/java.lang.Thread.run(Thread.java:834) > The status page shows: > GetWorkStream: 0 buffers, 400 inflight messages allowed, 67108864 inflight > bytes allowed, current stream is 61355396ms old, last send 61355396ms, last > response -1ms > Showing that the stream was created 17 hours ago, sent the header message but > never received a response. With the stack trace it appears that the header > was never sent but the stream also didn't terminate with a deadline exceed. > This seems like a grpc issue to not get an error for the stream, however it > would be safer to not block indefinitely on the Phaser waiting for the send > and instead throw an exception after 2x the stream deadline for example. -- This message was sent by Atlassian Jira (v8.3.4#803005)
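A sketch of the safeguard suggested at the end of the description above: bound the wait on the Phaser (the "2x the stream deadline" figure is taken from the description; the deadline constant here is illustrative) and fail the send instead of blocking the dispatch thread indefinitely.

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedReadyWait {
  private static final long STREAM_DEADLINE_SECONDS = 300; // assumed stream deadline

  static void awaitReady(Phaser isReadyNotifier, int observedPhase) throws InterruptedException {
    try {
      // Wait at most twice the stream deadline for the stream to become ready.
      isReadyNotifier.awaitAdvanceInterruptibly(
          observedPhase, 2 * STREAM_DEADLINE_SECONDS, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      // Surface the stuck stream so it can be torn down and retried.
      throw new RuntimeException("onNext timed out waiting for the stream to be ready", e);
    }
  }
}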
[jira] [Created] (BEAM-12516) StreamingDataflowWorker.ShardedKey.toString throws exception if key is less than 100 bytes
Sam Whittle created BEAM-12516: -- Summary: StreamingDataflowWorker.ShardedKey.toString throws exception if key is less than 100 bytes Key: BEAM-12516 URL: https://issues.apache.org/jira/browse/BEAM-12516 Project: Beam Issue Type: Bug Components: runner-dataflow Affects Versions: 2.28.0 Reporter: Sam Whittle Assignee: Sam Whittle ByteString.substring does not support an end index beyond the string length. example exception: SLF4J: Failed toString() invocation on an object of type [org.apache.beam.runners.dataflow.worker.AutoValue_StreamingDataflowWorker_ShardedKey] java.lang.IndexOutOfBoundsException: End index: 100 >= 3 org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.checkRange(ByteString.java:1272) org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString$LiteralByteString.substring(ByteString.java:1343) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$ShardedKey.toString(StreamingDataflowWorker.java:1144) at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:277) at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:249) at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:211) at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:161) at org.slf4j.helpers.MessageFormatter.format(MessageFormatter.java:151) at org.slf4j.impl.JDK14LoggerAdapter.error(JDK14LoggerAdapter.java:522) at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$ComputationState.invalidateStuckCommits(StreamingDataflowWorker.java:2326) -- This message was sent by Atlassian Jira (v8.3.4#803005)
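A sketch of the fix implied by the report: clamp the end index before calling ByteString.substring, since substring(0, 100) throws when the key is shorter than 100 bytes. The actual worker code uses Beam's vendored protobuf ByteString, but the behavior is the same; the method below is illustrative, not the real ShardedKey.toString.

import com.google.protobuf.ByteString;

class ShardedKeyToString {
  static String describeKey(ByteString key, long shardingKey) {
    // Clamp to the key size so short keys do not trigger IndexOutOfBoundsException.
    ByteString prefix = key.substring(0, Math.min(100, key.size()));
    return prefix.toStringUtf8() + "-" + shardingKey;
  }

  public static void main(String[] args) {
    System.out.println(describeKey(ByteString.copyFromUtf8("abc"), 42L)); // no exception for a 3-byte key
  }
}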
[jira] [Commented] (BEAM-12472) BigQuery streaming writes can be batched beyond request limit with BatchAndInsertElements
[ https://issues.apache.org/jira/browse/BEAM-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364810#comment-17364810 ] Sam Whittle commented on BEAM-12472: I was using withoutInsertIds, which perhaps triggers a different code path than without. I believe that is newer functionality but I'm not familiar with the code and have not investigated it further. > BigQuery streaming writes can be batched beyond request limit with > BatchAndInsertElements > - > > Key: BEAM-12472 > URL: https://issues.apache.org/jira/browse/BEAM-12472 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Sam Whittle >Priority: P2 > > BatchAndInsertElements accumulates all the input elements and flushes them in > finishBundle. > However if there is enough data the request limit for bigquery can be > exceeded causing an exception like the following. It seems that finishBundle > should limit the # of rows and bytes and possibly flush multiple times for a > destination. > Work around would be to use autosharding which uses state that has batching > limits or to increase the # of streaming keys to decrease the likelihood of > hitting this. > "Error while processing a work item: UNKNOWN: > org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: > com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad > Request > POST > https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/datasets/nexmark_06090820455271/tables/nexmark_simple/insertAll?prettyPrint=false > { > "code" : 400, > "errors" : [ { > "domain" : "global", > "message" : "Request payload size exceeds the limit: 10485760 bytes.", > "reason" : "badRequest" > } ], > "message" : "Request payload size exceeds the limit: 10485760 bytes.", > "status" : "INVALID_ARGUMENT" > } > at > org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39) > at > org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown > Source) > at > org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1661) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12472) BigQuery streaming writes can be batched beyond request limit with BatchAndInsertElements
Sam Whittle created BEAM-12472: -- Summary: BigQuery streaming writes can be batched beyond request limit with BatchAndInsertElements Key: BEAM-12472 URL: https://issues.apache.org/jira/browse/BEAM-12472 Project: Beam Issue Type: Bug Components: io-java-gcp Reporter: Sam Whittle BatchAndInsertElements accumulates all the input elements and flushes them in finishBundle. However if there is enough data the request limit for bigquery can be exceeded causing an exception like the following. It seems that finishBundle should limit the # of rows and bytes and possibly flush multiple times for a destination. Work around would be to use autosharding which uses state that has batching limits or to increase the # of streaming keys to decrease the likelihood of hitting this. "Error while processing a work item: UNKNOWN: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request POST https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/datasets/nexmark_06090820455271/tables/nexmark_simple/insertAll?prettyPrint=false { "code" : 400, "errors" : [ { "domain" : "global", "message" : "Request payload size exceeds the limit: 10485760 bytes.", "reason" : "badRequest" } ], "message" : "Request payload size exceeds the limit: 10485760 bytes.", "status" : "INVALID_ARGUMENT" } at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39) at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown Source) at org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1661) -- This message was sent by Atlassian Jira (v8.3.4#803005)
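A minimal sketch of the fix the description suggests (constants and helper are assumptions, not BigQueryIO's actual limits or code): instead of flushing every buffered row in a single insertAll call at finishBundle, split the buffered rows into chunks that respect per-request row and byte limits and flush each chunk separately.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LimitedFlush {
  private static final int MAX_ROWS_PER_REQUEST = 500;          // assumed row limit
  private static final long MAX_BYTES_PER_REQUEST = 9_000_000L; // stay under the 10485760-byte cap

  static void flushAll(List<String> bufferedRows) {
    List<String> chunk = new ArrayList<>();
    long chunkBytes = 0;
    for (String row : bufferedRows) {
      long rowBytes = row.getBytes(StandardCharsets.UTF_8).length;
      if (!chunk.isEmpty()
          && (chunk.size() >= MAX_ROWS_PER_REQUEST || chunkBytes + rowBytes > MAX_BYTES_PER_REQUEST)) {
        insert(chunk); // flush the current chunk before it exceeds the request limits
        chunk = new ArrayList<>();
        chunkBytes = 0;
      }
      chunk.add(row);
      chunkBytes += rowBytes;
    }
    if (!chunk.isEmpty()) {
      insert(chunk);
    }
  }

  private static void insert(List<String> rows) {
    // Placeholder for the real streaming insert request.
    System.out.println("flushing " + rows.size() + " rows");
  }
}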
[jira] [Updated] (BEAM-12402) Optimize PCollectionConsumerRegistry$MultiplexingMetricTrackingFnDataReceiver
[ https://issues.apache.org/jira/browse/BEAM-12402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12402: --- Resolution: Fixed Status: Resolved (was: Open) > Optimize PCollectionConsumerRegistry$MultiplexingMetricTrackingFnDataReceiver > - > > Key: BEAM-12402 > URL: https://issues.apache.org/jira/browse/BEAM-12402 > Project: Beam > Issue Type: Bug > Components: sdk-java-harness >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 1h 10m > Remaining Estimate: 0h > > In Nexmark benchmark profile this was using 2% of cpu on > AbstractMapBasedMultimap$WrappedCollection$WrappedIterator > methods. > I believe this is due to the list returned here being a wrapper around the > multiset. > https://github.com/apache/beam/blob/8463a054c1d7e2b7ee8d11e9569e065cb5e02196/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java#L172 > We iterate over this list many times for counters. It appears that we don't > need a wrapped list as it is documented that all consumers should be > registered first. So it seems we can just create a copy there to an > immutable list and trivially save that cpu. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12402) Optimize PCollectionConsumerRegistry$MultiplexingMetricTrackingFnDataReceiver
Sam Whittle created BEAM-12402: -- Summary: Optimize PCollectionConsumerRegistry$MultiplexingMetricTrackingFnDataReceiver Key: BEAM-12402 URL: https://issues.apache.org/jira/browse/BEAM-12402 Project: Beam Issue Type: Bug Components: sdk-java-harness Reporter: Sam Whittle Assignee: Sam Whittle In Nexmark benchmark profile this was using 2% of cpu on AbstractMapBasedMultimap$WrappedCollection$WrappedIterator methods. I believe this is due to the list returned here being a wrapper around the multiset. https://github.com/apache/beam/blob/8463a054c1d7e2b7ee8d11e9569e065cb5e02196/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java#L172 We iterate over this list many times for counters. It appears that we don't need a wrapped list as it is documented that all consumers should be registered first. So it seems we can just create a copy there to an immutable list and trivially save that cpu. -- This message was sent by Atlassian Jira (v8.3.4#803005)
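A sketch of the proposed optimization with illustrative names (not the actual PCollectionConsumerRegistry fields): once all consumers for a PCollection are registered, snapshot the multimap-backed view into an immutable copy so the hot path iterates a plain array-backed list instead of the AbstractMapBasedMultimap wrapper.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ConsumerSnapshot<T> {
  private final Map<String, List<T>> consumersByPCollection = new HashMap<>();

  void register(String pCollectionId, T consumer) {
    consumersByPCollection.computeIfAbsent(pCollectionId, k -> new ArrayList<>()).add(consumer);
  }

  // Called after registration is complete; iterating this copy repeatedly avoids the wrapper overhead.
  List<T> snapshot(String pCollectionId) {
    return List.copyOf(consumersByPCollection.getOrDefault(pCollectionId, List.of()));
  }
}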
[jira] [Updated] (BEAM-7717) PubsubIO watermark tracking hovers near start of epoch
[ https://issues.apache.org/jira/browse/BEAM-7717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-7717: -- Fix Version/s: 2.31.0 Resolution: Fixed Status: Resolved (was: Open) > PubsubIO watermark tracking hovers near start of epoch > -- > > Key: BEAM-7717 > URL: https://issues.apache.org/jira/browse/BEAM-7717 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Steve Niemitz >Assignee: Sam Whittle >Priority: P1 > Fix For: 2.31.0 > > Time Spent: 50m > Remaining Estimate: 0h > > {quote}The watermark tracking seems off. The dataflow UI was reporting my > watermark as around (but not exactly) the epoch (it was ~1970-01-19), which > makes me wonder if seconds/milliseconds got confused somewhere (ie, if you > take the time since epoch in milliseconds now and interpret it as seconds, > you'll get somewhere around 1970-01-18).{quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-7717) PubsubIO watermark tracking hovers near start of epoch
[ https://issues.apache.org/jira/browse/BEAM-7717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle reassigned BEAM-7717: - Assignee: Sam Whittle > PubsubIO watermark tracking hovers near start of epoch > -- > > Key: BEAM-7717 > URL: https://issues.apache.org/jira/browse/BEAM-7717 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Steve Niemitz >Assignee: Sam Whittle >Priority: P1 > > {quote}The watermark tracking seems off. The dataflow UI was reporting my > watermark as around (but not exactly) the epoch (it was ~1970-01-19), which > makes me wonder if seconds/milliseconds got confused somewhere (ie, if you > take the time since epoch in milliseconds now and interpret it as seconds, > you'll get somewhere around 1970-01-18).{quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-12144) Dataflow streaming worker stuck and unable to get work from Streaming Engine
[ https://issues.apache.org/jira/browse/BEAM-12144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17342140#comment-17342140 ] Sam Whittle commented on BEAM-12144: PR is being reviewed https://github.com/apache/beam/pull/14714 > Dataflow streaming worker stuck and unable to get work from Streaming Engine > > > Key: BEAM-12144 > URL: https://issues.apache.org/jira/browse/BEAM-12144 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Affects Versions: 2.26.0 >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 10m > Remaining Estimate: 0h > > Observed in 2.26 but seems like it could affect later versions as well, as > previous issues addressing similar problems were before 2.26. This seems > similar to BEAM-9651 but not the deadlock observed there. > The thread getting work has the following stack: > --- Threads (1): [Thread[DispatchThread,1,main]] State: WAITING stack: --- > java.base@11.0.9/jdk.internal.misc.Unsafe.park(Native Method) > > java.base@11.0.9/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194) > java.base@11.0.9/java.util.concurrent.Phaser$QNode.block(Phaser.java:1127) > > java.base@11.0.9/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128) > > java.base@11.0.9/java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1057) > > java.base@11.0.9/java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:747) > > app//org.apache.beam.runners.dataflow.worker.windmill.DirectStreamObserver.onNext(DirectStreamObserver.java:49) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.send(GrpcWindmillServer.java:662) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.onNewStream(GrpcWindmillServer.java:868) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.startStream(GrpcWindmillServer.java:677) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:860) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:843) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer.getWorkStream(GrpcWindmillServer.java:543) > > app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.streamingDispatchLoop(StreamingDataflowWorker.java:1047) > > app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$1.run(StreamingDataflowWorker.java:670) > java.base@11.0.9/java.lang.Thread.run(Thread.java:834) > The status page shows: > GetWorkStream: 0 buffers, 400 inflight messages allowed, 67108864 inflight > bytes allowed, current stream is 61355396ms old, last send 61355396ms, last > response -1ms > Showing that the stream was created 17 hours ago, sent the header message but > never received a response. With the stack trace it appears that the header > was never sent but the stream also didn't terminate with a deadline exceed. > This seems like a grpc issue to not get an error for the stream, however it > would be safer to not block indefinitely on the Phaser waiting for the send > and instead throw an exception after 2x the stream deadline for example. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12144) Dataflow streaming worker stuck and unable to get work from Streaming Engine
[ https://issues.apache.org/jira/browse/BEAM-12144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12144: --- Labels: (was: stale-assigned) > Dataflow streaming worker stuck and unable to get work from Streaming Engine > > > Key: BEAM-12144 > URL: https://issues.apache.org/jira/browse/BEAM-12144 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Affects Versions: 2.26.0 >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > > Observed in 2.26 but seems like it could affect later versions as well, as > previous issues addressing similar problems were before 2.26. This seems > similar to BEAM-9651 but not the deadlock observed there. > The thread getting work has the following stack: > --- Threads (1): [Thread[DispatchThread,1,main]] State: WAITING stack: --- > java.base@11.0.9/jdk.internal.misc.Unsafe.park(Native Method) > > java.base@11.0.9/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194) > java.base@11.0.9/java.util.concurrent.Phaser$QNode.block(Phaser.java:1127) > > java.base@11.0.9/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128) > > java.base@11.0.9/java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1057) > > java.base@11.0.9/java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:747) > > app//org.apache.beam.runners.dataflow.worker.windmill.DirectStreamObserver.onNext(DirectStreamObserver.java:49) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.send(GrpcWindmillServer.java:662) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.onNewStream(GrpcWindmillServer.java:868) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.startStream(GrpcWindmillServer.java:677) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:860) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:843) > > app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer.getWorkStream(GrpcWindmillServer.java:543) > > app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.streamingDispatchLoop(StreamingDataflowWorker.java:1047) > > app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$1.run(StreamingDataflowWorker.java:670) > java.base@11.0.9/java.lang.Thread.run(Thread.java:834) > The status page shows: > GetWorkStream: 0 buffers, 400 inflight messages allowed, 67108864 inflight > bytes allowed, current stream is 61355396ms old, last send 61355396ms, last > response -1ms > Showing that the stream was created 17 hours ago, sent the header message but > never received a response. With the stack trace it appears that the header > was never sent but the stream also didn't terminate with a deadline exceed. > This seems like a grpc issue to not get an error for the stream, however it > would be safer to not block indefinitely on the Phaser waiting for the send > and instead throw an exception after 2x the stream deadline for example. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12254) Nexmark UnboundedReader does not report backlog correctly
[ https://issues.apache.org/jira/browse/BEAM-12254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12254: --- Resolution: Fixed Status: Resolved (was: Open) > Nexmark UnboundedReader does not report backlog correctly > - > > Key: BEAM-12254 > URL: https://issues.apache.org/jira/browse/BEAM-12254 > Project: Beam > Issue Type: Bug > Components: testing-nexmark >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 0.5h > Remaining Estimate: 0h > > The reader only reports backlog if it has been initialized by reading > elements and only if rate limiting is on. > However if rate limiting is off and their is a bounded # of elements the > backlog can also be calcualted. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12118) QueuingBeamFnDataClient adds polling latency to completing bundle processing
[ https://issues.apache.org/jira/browse/BEAM-12118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12118: --- Resolution: Fixed Status: Resolved (was: Triage Needed) > QueuingBeamFnDataClient adds polling latency to completing bundle processing > > > Key: BEAM-12118 > URL: https://issues.apache.org/jira/browse/BEAM-12118 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.30.0 > > Time Spent: 8h 50m > Remaining Estimate: 0h > > Currently the inboundDataClients are registered with recieve, and they add > data to a queue. There is no explicit indication from the clients that they > are no longer going to add values to the queue. > Within QueueingBeamFnDataClient.drainAndBlock the queue is therefore polled > and if nothing is present all clients are polled to see if they are complete. > This design makes for unfortunate tradeoffs on poll timeout: > - cpu wasted with small timeout > - additional latency in noticing we have completed with larger timeout > With the existing InboundDataClient interface, we could have a separate > thread call awaitCompletion on all of the clients and then shutdown the queue > (adding a poison pill perhaps) > Or we could modify InboundDataClient interface to allow registering iterest > in when the client is done producing elements. The existing clients all seem > based upon futures which allow that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12209) DirectStreamObserver is not thread-safe as advertised due to racy integer operations
[ https://issues.apache.org/jira/browse/BEAM-12209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12209: --- Resolution: Fixed Status: Resolved (was: Open) > DirectStreamObserver is not thread-safe as advertised due to racy integer > operations > > > Key: BEAM-12209 > URL: https://issues.apache.org/jira/browse/BEAM-12209 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 1h 20m > Remaining Estimate: 0h > > An AtomicInteger should be used instead for the message counting for periodic > flow control checking. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12203) Reduce thread context switches in BeamFnControlClient
[ https://issues.apache.org/jira/browse/BEAM-12203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12203: --- Resolution: Fixed Status: Resolved (was: Open) > Reduce thread context switches in BeamFnControlClient > - > > Key: BEAM-12203 > URL: https://issues.apache.org/jira/browse/BEAM-12203 > Project: Beam > Issue Type: Bug > Components: sdk-java-harness >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 1.5h > Remaining Estimate: 0h > > This class has a grpc observer that queues recieved messages on a > LinkedBlockingQueue and then has a separate thread pulling from the the queue > to add to an executor. > Instead we could just add directly to the executor from the grpc stream, as > the executor by default has an unbounded queue size (and there are comments > indicating that a limited size executor can cause deadlocks otherwise) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work started] (BEAM-12253) Read.UnboundedSourceAsSDFRestrictionTracker doesn't use cache for readers in getProgress
[ https://issues.apache.org/jira/browse/BEAM-12253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on BEAM-12253 started by Sam Whittle. -- > Read.UnboundedSourceAsSDFRestrictionTracker doesn't use cache for readers in > getProgress > > > Key: BEAM-12253 > URL: https://issues.apache.org/jira/browse/BEAM-12253 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L963 > In getProgress, if currentReader == null a new reader is constructed from the > restriction checkpoint. I think that we should consult the reader cache as > creating readers is sometimes expensive. > I observed in Nexmark pipelines that readers were being created at this > location. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12254) Nexmark UnboundedReader does not report backlog correctly
Sam Whittle created BEAM-12254: -- Summary: Nexmark UnboundedReader does not report backlog correctly Key: BEAM-12254 URL: https://issues.apache.org/jira/browse/BEAM-12254 Project: Beam Issue Type: Bug Components: testing-nexmark Reporter: Sam Whittle Assignee: Sam Whittle The reader only reports backlog if it has been initialized by reading elements and only if rate limiting is on. However, if rate limiting is off and there is a bounded # of elements, the backlog can also be calculated. -- This message was sent by Atlassian Jira (v8.3.4#803005)
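A sketch of the backlog estimate the description asks for (the parameter names are illustrative, not the Nexmark generator's actual fields): when the source is configured with a bounded number of events and rate limiting is off, the remaining backlog can still be derived from the configured total rather than reported as unknown.

class BacklogEstimate {
  static long backlogBytes(long configuredNumEvents, long eventsEmitted, long avgEventSizeBytes) {
    long remaining = Math.max(0, configuredNumEvents - eventsEmitted);
    return remaining * avgEventSizeBytes;
  }
}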
[jira] [Created] (BEAM-12253) Read.UnboundedSourceAsSDFRestrictionTracker doesn't use cache for readers in getProgress
Sam Whittle created BEAM-12253: -- Summary: Read.UnboundedSourceAsSDFRestrictionTracker doesn't use cache for readers in getProgress Key: BEAM-12253 URL: https://issues.apache.org/jira/browse/BEAM-12253 Project: Beam Issue Type: Bug Components: sdk-java-core Reporter: Sam Whittle Assignee: Sam Whittle https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L963 In getProgress, if currentReader == null a new reader is constructed from the restriction checkpoint. I think that we should consult the reader cache as creating readers is sometimes expensive. I observed in Nexmark pipelines that readers were being created at this location. -- This message was sent by Atlassian Jira (v8.3.4#803005)
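An illustrative sketch of the suggestion (the cache and key types are hypothetical, not Read.java's actual structures): before constructing a new UnboundedReader from the restriction's checkpoint in getProgress, first consult the same cache used on the processing path, since constructing readers can be expensive.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class ReaderCacheSketch<R> {
  private final Map<String, R> cacheByRestriction = new ConcurrentHashMap<>();

  R readerForProgress(String restrictionKey, Supplier<R> createFromCheckpoint) {
    // Reuse a cached reader when one exists; only fall back to creating a new one from the checkpoint.
    return cacheByRestriction.computeIfAbsent(restrictionKey, k -> createFromCheckpoint.get());
  }
}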
[jira] [Reopened] (BEAM-12118) QueuingBeamFnDataClient adds polling latency to completing bundle processing
[ https://issues.apache.org/jira/browse/BEAM-12118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle reopened BEAM-12118: Reopening to track fixing race triggering precondition > QueuingBeamFnDataClient adds polling latency to completing bundle processing > > > Key: BEAM-12118 > URL: https://issues.apache.org/jira/browse/BEAM-12118 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.30.0 > > Time Spent: 7h 50m > Remaining Estimate: 0h > > Currently the inboundDataClients are registered with recieve, and they add > data to a queue. There is no explicit indication from the clients that they > are no longer going to add values to the queue. > Within QueueingBeamFnDataClient.drainAndBlock the queue is therefore polled > and if nothing is present all clients are polled to see if they are complete. > This design makes for unfortunate tradeoffs on poll timeout: > - cpu wasted with small timeout > - additional latency in noticing we have completed with larger timeout > With the existing InboundDataClient interface, we could have a separate > thread call awaitCompletion on all of the clients and then shutdown the queue > (adding a poison pill perhaps) > Or we could modify InboundDataClient interface to allow registering iterest > in when the client is done producing elements. The existing clients all seem > based upon futures which allow that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-12229) WindmillStateCache has a 0% hit rate in 2.29
[ https://issues.apache.org/jira/browse/BEAM-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle reassigned BEAM-12229: -- Assignee: Sam Whittle (was: Reuven Lax) > WindmillStateCache has a 0% hit rate in 2.29 > > > Key: BEAM-12229 > URL: https://issues.apache.org/jira/browse/BEAM-12229 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Affects Versions: 2.29.0 >Reporter: Steve Niemitz >Assignee: Sam Whittle >Priority: P1 > Fix For: 2.30.0 > > Time Spent: 40m > Remaining Estimate: 0h > > After upgrading to 2.29, I noticed that our jobs have a 0% state cache hit > rate. I see a very high eviction rate from the cache as well (it used to be > ~0, now its ~100,000+ evictions / second). > We never were on 2.28, so I can't say if it worked there, but it did work on > 2.27. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-12229) WindmillStateCache has a 0% hit rate in 2.29
[ https://issues.apache.org/jira/browse/BEAM-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17333208#comment-17333208 ] Sam Whittle commented on BEAM-12229: Ugh, I see the issue. I modified how tokens were validated after testing the change and broke it for any fused stages with more than 1 step. The unit test is at a lower level than that misuse, so it didn't catch it. I put together PR https://github.com/apache/beam/pull/14649 but was unable to modify StreamingDataflowWorkerTest to reproduce the issue yet due to its complexity. I'd like to do that before submitting. Maybe it is best to roll back the original PR in the interim though. The bug should only affect cache effectiveness, not corrupt data, AFAICT. Your combiner state could be different because the combiner does local combining differently before applying if it is cached. > WindmillStateCache has a 0% hit rate in 2.29 > > > Key: BEAM-12229 > URL: https://issues.apache.org/jira/browse/BEAM-12229 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Affects Versions: 2.29.0 >Reporter: Steve Niemitz >Assignee: Reuven Lax >Priority: P1 > Fix For: 2.30.0 > > Time Spent: 10m > Remaining Estimate: 0h > > After upgrading to 2.29, I noticed that our jobs have a 0% state cache hit > rate. I see a very high eviction rate from the cache as well (it used to be > ~0, now it's ~100,000+ evictions / second). > We never were on 2.28, so I can't say if it worked there, but it did work on > 2.27. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12209) DirectStreamObserver is not thread-safe as advertised due to racy integer operations
Sam Whittle created BEAM-12209: -- Summary: DirectStreamObserver is not thread-safe as advertised due to racy integer operations Key: BEAM-12209 URL: https://issues.apache.org/jira/browse/BEAM-12209 Project: Beam Issue Type: Bug Components: java-fn-execution Reporter: Sam Whittle Assignee: Sam Whittle An AtomicInteger should be used instead for the message counting that drives the periodic flow-control check. -- This message was sent by Atlassian Jira (v8.3.4#803005)
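A minimal sketch of the proposed fix, assuming a hypothetical check interval; this is not the actual DirectStreamObserver code.

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: race-free periodic flow-control check using AtomicInteger. */
class MessageCounter {
  private static final int FLOW_CONTROL_CHECK_PERIOD = 100; // assumed check interval
  private final AtomicInteger messagesSinceCheck = new AtomicInteger();

  /** Returns true when the caller should consult flow control before sending more messages. */
  boolean shouldCheckFlowControl() {
    // getAndIncrement is atomic, so concurrent onNext() calls cannot lose or double-count updates
    // the way an unsynchronized "count++" can.
    return messagesSinceCheck.getAndIncrement() % FLOW_CONTROL_CHECK_PERIOD == 0;
  }
}
{code}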
[jira] [Comment Edited] (BEAM-12203) Reduce thread context switches in BeamFnControlClient
[ https://issues.apache.org/jira/browse/BEAM-12203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326662#comment-17326662 ] Sam Whittle edited comment on BEAM-12203 at 4/21/21, 8:15 PM: -- Realized that this could further be simplified by just instructing grpc to use the desired executor so that we can execute requests inline in the observer. The previous path involved several thread switches: grpc executor -> direct observer which queued -> thread reading queue, queues on executor -> executor runs request [edited removing previous comment about buffered observer which was incorrect as that applies to output] was (Author: scwhittle): Realized that this could further be simplified by just instructing grpc to use the desired executor so that we can execute requests inline in the observer. The previous path involved several thread switches: grpc executor -> direct observer which queued -> thread reading queue, queues on executor -> executor runs request or even worse if the buffering observer was enabled grpc executor -> buffering observer queues -> buffering observer pulling thread pulls from buffer queues on unbounded buffer -> thread reads queue, queues on executor -> executor runs request > Reduce thread context switches in BeamFnControlClient > - > > Key: BEAM-12203 > URL: https://issues.apache.org/jira/browse/BEAM-12203 > Project: Beam > Issue Type: Bug > Components: sdk-java-harness >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 1h > Remaining Estimate: 0h > > This class has a grpc observer that queues received messages on a > LinkedBlockingQueue and then has a separate thread pulling from the queue > to add to an executor. > Instead we could just add directly to the executor from the grpc stream, as > the executor by default has an unbounded queue size (and there are comments > indicating that a limited size executor can cause deadlocks otherwise) -- This message was sent by Atlassian Jira (v8.3.4#803005)
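The simplification described in the comment above, letting grpc call back on an executor the harness already owns so requests can run inline in the observer, could be configured roughly as below. The channel target, executor sizing, and plaintext setting are placeholders, not what the SDK harness actually uses.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

/** Illustrative only: hand gRPC the executor so callbacks run there with no intermediate queue. */
class ControlChannelFactory {
  static ManagedChannel create(String target, ExecutorService controlExecutor) {
    return ManagedChannelBuilder.forTarget(target)
        // gRPC invokes StreamObserver callbacks on this executor, so the observer can execute
        // each instruction request directly instead of re-queuing it onto another thread.
        .executor(controlExecutor)
        .usePlaintext()
        .build();
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    ManagedChannel channel = create("localhost:8099", executor); // hypothetical target
    channel.shutdownNow();
    executor.shutdownNow();
  }
}
{code}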
[jira] [Commented] (BEAM-12203) Reduce thread context switches in BeamFnControlClient
[ https://issues.apache.org/jira/browse/BEAM-12203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326662#comment-17326662 ] Sam Whittle commented on BEAM-12203: Realized that this could further be simplified by just instructing grpc to use the desired executor so that we can execute requests inline in the observer. The previous path involved several thread switches: grpc executor -> direct observer which queued -> thread reading queue, queues on executor -> executor runs request or even worse if the buffering observer was enabled grpc executor -> buffering observer queues -> buffering observer pulling thread pulls from buffer queues on unbounded buffer -> thread reads queue, queues on executor -> executor runs request > Reduce thread context switches in BeamFnControlClient > - > > Key: BEAM-12203 > URL: https://issues.apache.org/jira/browse/BEAM-12203 > Project: Beam > Issue Type: Bug > Components: sdk-java-harness >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 0.5h > Remaining Estimate: 0h > > This class has a grpc observer that queues received messages on a > LinkedBlockingQueue and then has a separate thread pulling from the queue > to add to an executor. > Instead we could just add directly to the executor from the grpc stream, as > the executor by default has an unbounded queue size (and there are comments > indicating that a limited size executor can cause deadlocks otherwise) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12203) Reduce thread context switches in BeamFnControlClient
[ https://issues.apache.org/jira/browse/BEAM-12203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12203: --- Summary: Reduce thread context switches in BeamFnControlClient (was: Remove LinkedBlockingQueue from BeamFnControlClient) > Reduce thread context switches in BeamFnControlClient > - > > Key: BEAM-12203 > URL: https://issues.apache.org/jira/browse/BEAM-12203 > Project: Beam > Issue Type: Bug > Components: sdk-java-harness >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 0.5h > Remaining Estimate: 0h > > This class has a grpc observer that queues received messages on a > LinkedBlockingQueue and then has a separate thread pulling from the queue > to add to an executor. > Instead we could just add directly to the executor from the grpc stream, as > the executor by default has an unbounded queue size (and there are comments > indicating that a limited size executor can cause deadlocks otherwise) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-12203) Remove LinkedBlockingQueue from BeamFnControlClient
[ https://issues.apache.org/jira/browse/BEAM-12203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326606#comment-17326606 ] Sam Whittle commented on BEAM-12203: The put/take on this queue appears to be 2% of cpu on a benchmark. > Remove LinkedBlockingQueue from BeamFnControlClient > --- > > Key: BEAM-12203 > URL: https://issues.apache.org/jira/browse/BEAM-12203 > Project: Beam > Issue Type: Bug > Components: sdk-java-harness >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > > This class has a grpc observer that queues received messages on a > LinkedBlockingQueue and then has a separate thread pulling from the queue > to add to an executor. > Instead we could just add directly to the executor from the grpc stream, as > the executor by default has an unbounded queue size (and there are comments > indicating that a limited size executor can cause deadlocks otherwise) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12203) Remove LinkedBlockingQueue from BeamFnControlClient
Sam Whittle created BEAM-12203: -- Summary: Remove LinkedBlockingQueue from BeamFnControlClient Key: BEAM-12203 URL: https://issues.apache.org/jira/browse/BEAM-12203 Project: Beam Issue Type: Bug Components: sdk-java-harness Reporter: Sam Whittle Assignee: Sam Whittle This class has a grpc observer that queues received messages on a LinkedBlockingQueue and then has a separate thread pulling from the queue to add to an executor. Instead we could just add directly to the executor from the grpc stream, as the executor by default has an unbounded queue size (and there are comments indicating that a limited size executor can cause deadlocks otherwise) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-12118) QueuingBeamFnDataClient adds polling latency to completing bundle processing
[ https://issues.apache.org/jira/browse/BEAM-12118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12118: --- Fix Version/s: 2.30.0 Resolution: Fixed Status: Resolved (was: Open) > QueuingBeamFnDataClient adds polling latency to completing bundle processing > > > Key: BEAM-12118 > URL: https://issues.apache.org/jira/browse/BEAM-12118 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.30.0 > > Time Spent: 7h 50m > Remaining Estimate: 0h > > Currently the inboundDataClients are registered with receive, and they add > data to a queue. There is no explicit indication from the clients that they > are no longer going to add values to the queue. > Within QueueingBeamFnDataClient.drainAndBlock the queue is therefore polled > and if nothing is present all clients are polled to see if they are complete. > This design makes for unfortunate tradeoffs on poll timeout: > - cpu wasted with small timeout > - additional latency in noticing we have completed with larger timeout > With the existing InboundDataClient interface, we could have a separate > thread call awaitCompletion on all of the clients and then shut down the queue > (adding a poison pill perhaps) > Or we could modify the InboundDataClient interface to allow registering interest > in when the client is done producing elements. The existing clients all seem > based upon futures which allow that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-11910) Increase subsequent page size for bags after the first
[ https://issues.apache.org/jira/browse/BEAM-11910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-11910: --- Fix Version/s: 2.29.0 > Increase subsequent page size for bags after the first > -- > > Key: BEAM-11910 > URL: https://issues.apache.org/jira/browse/BEAM-11910 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Labels: stale-assigned > Fix For: 2.29.0 > > Time Spent: 1.5h > Remaining Estimate: 0h > > Currently the page size of bags requested from the streaming dataflow backend > is always 8MB. In pipelines with large bags this can limit throughput as it > results in more round-trips to the backend. In particular with Streaming > Engine this is noticeable due to increased latency. > I propose using 8MB for the first bag fetch and then doubling the limit for > subsequent paginations -- This message was sent by Atlassian Jira (v8.3.4#803005)
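A small sketch of the doubling policy proposed above; the 8MB starting point comes from the issue, while the upper cap is an assumed safety limit, not something stated there.

{code:java}
/** Illustrative only: grow the requested bag page size after each successful pagination. */
class BagPageSizePolicy {
  private static final long INITIAL_PAGE_BYTES = 8L << 20;  // 8MB for the first fetch
  private static final long MAX_PAGE_BYTES = 128L << 20;    // assumed cap, not from the issue

  private long nextPageBytes = INITIAL_PAGE_BYTES;

  /** Returns the byte limit to request for this page, doubling it for the following page. */
  long nextRequestBytes() {
    long current = nextPageBytes;
    nextPageBytes = Math.min(MAX_PAGE_BYTES, nextPageBytes * 2);
    return current;
  }
}
{code}

Doubling keeps the first response small and fast while large bags converge to far fewer round-trips to the backend.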
[jira] [Updated] (BEAM-12127) Reduce counter overhead in PCollectionConsumerRegistry.accept
[ https://issues.apache.org/jira/browse/BEAM-12127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12127: --- Resolution: Fixed Status: Resolved (was: Open) > Reduce counter overhead in PCollectionConsumerRegistry.accept > - > > Key: BEAM-12127 > URL: https://issues.apache.org/jira/browse/BEAM-12127 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 1h 40m > Remaining Estimate: 0h > > DelegatingCounter.inc shows up as 21% of cpu on nexmark query 2 benchmark > under PCollectionConsumerRegistry.accept > 2% is actual counter incrementing, but the majority is the delegation of > DelegatingCounter which involves looking up thread-local state and then > getting the counter for the name from the counter container. However in this > case the counter container is known and can just be bound when constructing > the counter instead of using DelegatingCounter. -- This message was sent by Atlassian Jira (v8.3.4#803005)
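To make the bound-counter idea concrete, here is a sketch with stand-in Counter and MetricsContainer interfaces (not the Beam classes): the delegating style resolves the container on every increment, while the bound style resolves it once when the consumer is constructed.

{code:java}
/** Illustrative only: bind the counter once instead of resolving it per increment. */
class BoundCounterExample {
  interface Counter {           // stand-in for the SDK counter
    void inc();
  }

  interface MetricsContainer {  // stand-in for the per-bundle counter container
    Counter getCounter(String name);
  }

  /** Delegating style: every inc() pays a container lookup plus a name resolution. */
  static void incDelegating(MetricsContainer currentContainer, String name) {
    currentContainer.getCounter(name).inc();
  }

  /** Bound style: resolve once at construction time, then inc() is a plain increment. */
  static final class BoundConsumer {
    private final Counter elementCount;

    BoundConsumer(MetricsContainer knownContainer, String name) {
      this.elementCount = knownContainer.getCounter(name); // the container is known up front
    }

    void accept(Object element) {
      elementCount.inc();
    }
  }
}
{code}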
[jira] [Updated] (BEAM-12142) Reduce overhead of MetricsEnvironment
[ https://issues.apache.org/jira/browse/BEAM-12142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12142: --- Fix Version/s: 2.30.0 Resolution: Fixed Status: Resolved (was: Open) > Reduce overhead of MetricsEnvironment > - > > Key: BEAM-12142 > URL: https://issues.apache.org/jira/browse/BEAM-12142 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.30.0 > > Time Spent: 1h 40m > Remaining Estimate: 0h > > Currently the MetricsContainer objects are stored in ThreadLocal state. This > means that scoping a new container involves a get and a set of thread-local > state. By instead putting a wrapper object in the thread-local state we can > use a single lookup in thread-local state to get/set and then reset. > This is showing up as a possible 7% cpu improvement in a nexmark query > benchmark. > Additionally I think that removing from the threadlocal state is causing > overhead in get calls by causing the linear probing within the implementation > of ThreadLocal state to become more expensive. -- This message was sent by Atlassian Jira (v8.3.4#803005)
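A sketch of the wrapper-object idea, with invented names rather than the MetricsEnvironment API: the ThreadLocal holds a mutable holder, so scoping a container costs one ThreadLocal get plus plain field writes, and nothing is ever removed from the ThreadLocal map.

{code:java}
/** Illustrative only: one ThreadLocal lookup per scope instead of a get plus set plus reset. */
class ScopedContainerHolder {
  /** Mutable wrapper that stays in the ThreadLocal; only its field changes. */
  static final class Holder {
    Object container; // stand-in for MetricsContainer
  }

  private static final ThreadLocal<Holder> HOLDER = ThreadLocal.withInitial(Holder::new);

  /** Scopes {@code container} for the duration of {@code work} with a single ThreadLocal get. */
  static void withContainer(Object container, Runnable work) {
    Holder holder = HOLDER.get();      // the only ThreadLocal access on this path
    Object previous = holder.container;
    holder.container = container;
    try {
      work.run();
    } finally {
      holder.container = previous;     // plain field write, no ThreadLocal set/remove
    }
  }

  static Object currentContainer() {
    return HOLDER.get().container;
  }
}
{code}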
[jira] [Updated] (BEAM-12117) QueuingBeamFnDataClient inbound client set grows with BundleProcessor reuse
[ https://issues.apache.org/jira/browse/BEAM-12117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-12117: --- Resolution: Fixed Status: Resolved (was: Open) > QueuingBeamFnDataClient inbound client set grows with BundleProcessor reuse > --- > > Key: BEAM-12117 > URL: https://issues.apache.org/jira/browse/BEAM-12117 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 2h 10m > Remaining Estimate: 0h > > It should have a reset method and be reset when the BundleProcessor is reset. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-11910) Increase subsequent page size for bags after the first
[ https://issues.apache.org/jira/browse/BEAM-11910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-11910: --- Resolution: Fixed Status: Resolved (was: Open) > Increase subsequent page size for bags after the first > -- > > Key: BEAM-11910 > URL: https://issues.apache.org/jira/browse/BEAM-11910 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Labels: stale-assigned > Time Spent: 1.5h > Remaining Estimate: 0h > > Currently the page size of bags requested from the streaming dataflow backend > is always 8MB. In pipelines with large bags this can limit throughput as it > results in more round-trips to the backend. In particular with Streaming > Engine this is noticeable due to increased latency. > I propose using 8MB for the first bag fetch and then doubling the limit for > subsequent paginations -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12144) Dataflow streaming worker stuck and unable to get work from Streaming Engine
Sam Whittle created BEAM-12144: -- Summary: Dataflow streaming worker stuck and unable to get work from Streaming Engine Key: BEAM-12144 URL: https://issues.apache.org/jira/browse/BEAM-12144 Project: Beam Issue Type: Bug Components: runner-dataflow Affects Versions: 2.26.0 Reporter: Sam Whittle Assignee: Sam Whittle Observed in 2.26 but seems like it could affect later versions as well, as previous issues addressing similar problems were before 2.26. This seems similar to BEAM-9651 but not the deadlock observed there. The thread getting work has the following stack: --- Threads (1): [Thread[DispatchThread,1,main]] State: WAITING stack: --- java.base@11.0.9/jdk.internal.misc.Unsafe.park(Native Method) java.base@11.0.9/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194) java.base@11.0.9/java.util.concurrent.Phaser$QNode.block(Phaser.java:1127) java.base@11.0.9/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128) java.base@11.0.9/java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1057) java.base@11.0.9/java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:747) app//org.apache.beam.runners.dataflow.worker.windmill.DirectStreamObserver.onNext(DirectStreamObserver.java:49) app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.send(GrpcWindmillServer.java:662) app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.onNewStream(GrpcWindmillServer.java:868) app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$AbstractWindmillStream.startStream(GrpcWindmillServer.java:677) app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:860) app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer$GrpcGetWorkStream.(GrpcWindmillServer.java:843) app//org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer.getWorkStream(GrpcWindmillServer.java:543) app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.streamingDispatchLoop(StreamingDataflowWorker.java:1047) app//org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$1.run(StreamingDataflowWorker.java:670) java.base@11.0.9/java.lang.Thread.run(Thread.java:834) The status page shows: GetWorkStream: 0 buffers, 400 inflight messages allowed, 67108864 inflight bytes allowed, current stream is 61355396ms old, last send 61355396ms, last response -1ms This shows that the stream was created 17 hours ago and sent the header message but never received a response. Given the stack trace, it appears that the header was never actually sent, but the stream also didn't terminate with a deadline exceeded error. This seems like a grpc issue, since we should get an error for the stream; however, it would be safer to not block indefinitely on the Phaser waiting for the send and instead throw an exception after, for example, 2x the stream deadline. -- This message was sent by Atlassian Jira (v8.3.4#803005)
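The suggested safety net, bounding the wait for the send slot instead of parking forever, might look roughly like this; the 2x multiplier comes from the description above, while the method names and the exception type are assumptions rather than the actual DirectStreamObserver change.

{code:java}
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative only: bound the wait on the flow-control Phaser instead of parking forever. */
class BoundedSendWait {
  private final Phaser isReadyPhaser = new Phaser(1);

  /** Another thread calls this when the stream becomes ready to accept a send. */
  void signalReady() {
    isReadyPhaser.arrive();
  }

  /** Waits for readiness, but gives up after twice the stream deadline. */
  void awaitReadyBeforeSend(long streamDeadlineSeconds) throws InterruptedException {
    int phase = isReadyPhaser.getPhase();
    try {
      isReadyPhaser.awaitAdvanceInterruptibly(phase, 2 * streamDeadlineSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      // Surface the stuck stream instead of hanging the dispatch thread for hours.
      throw new IllegalStateException(
          "Stream not ready after " + (2 * streamDeadlineSeconds) + "s; tearing down the stream", e);
    }
  }
}
{code}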
[jira] [Created] (BEAM-12142) Reduce overhead of MetricsEnvironment
Sam Whittle created BEAM-12142: -- Summary: Reduce overhead of MetricsEnvironment Key: BEAM-12142 URL: https://issues.apache.org/jira/browse/BEAM-12142 Project: Beam Issue Type: Bug Components: java-fn-execution Reporter: Sam Whittle Assignee: Sam Whittle Currently the MetricsContainer objects are stored in ThreadLocal state. This means that scoping a new container involves a get and a set of thread-local state. By instead putting a wrapper object in the thread-local state we can use a single lookup in thread-local state to get/set and then reset. This is showing up as a possible 7% cpu improvement in a nexmark query benchmark. Additionally I think that removing from the threadlocal state is causing overhead in get calls by causing the linear probing within the implementation of ThreadLocal state to become more expensive. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12127) Reduce counter overhead in PCollectionConsumerRegistry.accept
Sam Whittle created BEAM-12127: -- Summary: Reduce counter overhead in PCollectionConsumerRegistry.accept Key: BEAM-12127 URL: https://issues.apache.org/jira/browse/BEAM-12127 Project: Beam Issue Type: Bug Components: java-fn-execution Reporter: Sam Whittle Assignee: Sam Whittle DelegatingCounter.inc shows up as 21% of cpu on nexmark query 2 benchmark under PCollectionConsumerRegistry.accept 2% is actual counter incrementing, but the majority is the delegation of DelegatingCounter which involves looking up thread-local state and then getting the counter for the name from the counter container. However in this case the counter container is known and can just be bound when constructing the counter instead of using DelegatingCounter. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12118) QueuingBeamFnDataClient adds polling latency to completing bundle processing
Sam Whittle created BEAM-12118: -- Summary: QueuingBeamFnDataClient adds polling latency to completing bundle processing Key: BEAM-12118 URL: https://issues.apache.org/jira/browse/BEAM-12118 Project: Beam Issue Type: Bug Components: java-fn-execution Reporter: Sam Whittle Assignee: Sam Whittle Currently the inboundDataClients are registered with receive, and they add data to a queue. There is no explicit indication from the clients that they are no longer going to add values to the queue. Within QueueingBeamFnDataClient.drainAndBlock the queue is therefore polled and if nothing is present all clients are polled to see if they are complete. This design makes for unfortunate tradeoffs on poll timeout: - cpu wasted with small timeout - additional latency in noticing we have completed with larger timeout With the existing InboundDataClient interface, we could have a separate thread call awaitCompletion on all of the clients and then shut down the queue (adding a poison pill perhaps) Or we could modify the InboundDataClient interface to allow registering interest in when the client is done producing elements. The existing clients all seem based upon futures which allow that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-12117) QueuingBeamFnDataClient inbound client set grows with BundleProcessor reuse
Sam Whittle created BEAM-12117: -- Summary: QueuingBeamFnDataClient inbound client set grows with BundleProcessor reuse Key: BEAM-12117 URL: https://issues.apache.org/jira/browse/BEAM-12117 Project: Beam Issue Type: Bug Components: java-fn-execution Reporter: Sam Whittle Assignee: Sam Whittle It should have a reset method and be reset when the BundleProcessor is reset. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-11727) Optimize ExecutionStateSampler
[ https://issues.apache.org/jira/browse/BEAM-11727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-11727: --- Resolution: Fixed Status: Resolved (was: Triage Needed) > Optimize ExecutionStateSampler > -- > > Key: BEAM-11727 > URL: https://issues.apache.org/jira/browse/BEAM-11727 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 1h 40m > Remaining Estimate: 0h > > Showed up as 1.5% of CPU on Nexmark 11 streaming benchmark run. > It appears to be using a ConcurrentSkipListSet and most of the cpu is > related to inserts/removes in that, involving the system hash of the entries > as that is used for ordering. > The consistent ordering is unnecessary. Additionally for other reasons, > removal and iteration are already synchronized and so performance will likely > be better just using a synchronized HashMap and synchronizing in the add case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
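A sketch of the replacement described above. The issue suggests a synchronized HashMap; the equivalent synchronized HashSet is used here, and Tracker is a stand-in for the sampler's tracked state object.

{code:java}
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Illustrative only: unordered synchronized set instead of a ConcurrentSkipListSet. */
class ActiveTrackers<Tracker> {
  // Iteration order does not matter for sampling, so skip the skip-list's ordering cost.
  private final Set<Tracker> activeTrackers = Collections.synchronizedSet(new HashSet<>());

  void addTracker(Tracker tracker) {
    activeTrackers.add(tracker);
  }

  void removeTracker(Tracker tracker) {
    activeTrackers.remove(tracker);
  }

  /** Sampling pass: iteration over a synchronized set must hold its monitor. */
  void sampleAll(Consumer<Tracker> sampler) {
    synchronized (activeTrackers) {
      for (Tracker tracker : activeTrackers) {
        sampler.accept(tracker);
      }
    }
  }
}
{code}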
[jira] [Updated] (BEAM-11730) Reduce context switches for dataflow streaming appliance getdata reads
[ https://issues.apache.org/jira/browse/BEAM-11730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-11730: --- Resolution: Fixed Status: Resolved (was: Open) > Reduce context switches for dataflow streaming appliance getdata reads > -- > > Key: BEAM-11730 > URL: https://issues.apache.org/jira/browse/BEAM-11730 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 2h 10m > Remaining Estimate: 0h > > Currently reads are batched and issued from a separate thread pool of reading > threads. However in the case that there is no read queuing due to the max # > of parallel requests, switching threads is unnecessary and adds additional > context switching overhead. > In a benchmark I observed 5% cpu spent on LockSupport.park which some > googling indicates is due to context switching. -- This message was sent by Atlassian Jira (v8.3.4#803005)
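One way to picture the proposal, running the read inline when a parallelism slot is immediately free and only handing off to the pool when it would otherwise queue, is sketched below with a Semaphore as the gate; this is an illustration, not the worker's actual batching code.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

/** Illustrative only: skip the thread hand-off when the read would not have queued anyway. */
class InlineOrQueuedReads {
  private final Semaphore parallelReads;
  private final ExecutorService readPool;

  InlineOrQueuedReads(int maxParallelReads, ExecutorService readPool) {
    this.parallelReads = new Semaphore(maxParallelReads);
    this.readPool = readPool;
  }

  void issueRead(Runnable read) {
    if (parallelReads.tryAcquire()) {
      // A slot is free: run on the calling thread and avoid the context switch.
      try {
        read.run();
      } finally {
        parallelReads.release();
      }
    } else {
      // All slots are busy: fall back to the pool, which runs the read when a slot frees up.
      readPool.execute(() -> {
        parallelReads.acquireUninterruptibly();
        try {
          read.run();
        } finally {
          parallelReads.release();
        }
      });
    }
  }
}
{code}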
[jira] [Updated] (BEAM-11707) Optimize WindmillStateCache CPU usage
[ https://issues.apache.org/jira/browse/BEAM-11707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-11707: --- Resolution: Fixed Status: Resolved (was: Open) > Optimize WindmillStateCache CPU usage > - > > Key: BEAM-11707 > URL: https://issues.apache.org/jira/browse/BEAM-11707 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Time Spent: 2h 50m > Remaining Estimate: 0h > > From profiling nexmark Query11, which has many unique tags per key, I observed > that the WindmillStateCache accounted for 6% of CPU. > The usage appears to be due to the invalidation set maintenance as well as > many reads/inserts. > The invalidation set is maintained so that if a key encounters an error > processing or the cache token changes, we can invalidate all the entries for > a key. Currently this is done by removing all entries for the key from the > cache. Another alternative which appears much more CPU efficient is to > instead leave the entries in the cache but make them unreachable. This can > be done by having a per-key object that uses object equality as part of the > cache lookup. Then to discard entries for the key, we start using a new > per-key object. Cleanup of per-key objects can be done with a weak reference > map. > Another cost to the cache is that objects are grouped by window so that they > are kept/evicted all at once. However currently when reading items into the > cache, we fetch the window set and then look up each tag in it. This could be > cached for the key to avoid multiple cache lookups. Similarly for putting > objects we look up and insert each tag separately and then update the cache to > update the weight for the per-window set. This could be done once after all > updates for the window have been made. -- This message was sent by Atlassian Jira (v8.3.4#803005)
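The "leave entries in the cache but make them unreachable" idea in miniature, with simplified maps standing in for the actual weighted cache: rotating a per-key identity token invalidates every entry for that key in O(1), and the stale entries simply age out of the cache.

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: invalidate a key's entries by rotating an identity token, not by removal. */
class PerKeyInvalidation {
  /** Composite cache key whose equality includes the identity of the per-key token object. */
  static final class CacheKey {
    private final Object perKeyToken; // compared by reference
    private final String tag;

    CacheKey(Object perKeyToken, String tag) {
      this.perKeyToken = perKeyToken;
      this.tag = tag;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof CacheKey
          && ((CacheKey) o).perKeyToken == perKeyToken
          && ((CacheKey) o).tag.equals(tag);
    }

    @Override
    public int hashCode() {
      return System.identityHashCode(perKeyToken) * 31 + tag.hashCode();
    }
  }

  private final Map<String, Object> tokenPerKey = new HashMap<>();   // would be weak in practice
  private final Map<CacheKey, Object> cache = new HashMap<>();       // stand-in for the weighted cache

  void put(String key, String tag, Object value) {
    cache.put(new CacheKey(tokenPerKey.computeIfAbsent(key, k -> new Object()), tag), value);
  }

  Object get(String key, String tag) {
    Object token = tokenPerKey.get(key);
    return token == null ? null : cache.get(new CacheKey(token, tag));
  }

  /** O(1) invalidation: old entries become unreachable and age out of the weighted cache. */
  void invalidateKey(String key) {
    tokenPerKey.put(key, new Object());
  }
}
{code}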
[jira] [Updated] (BEAM-11729) Remove ReduceFnRunner eager class name evaluation for debug logging
[ https://issues.apache.org/jira/browse/BEAM-11729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-11729: --- Resolution: Fixed Status: Resolved (was: Open) > Remove ReduceFnRunner eager class name evaluation for debug logging > --- > > Key: BEAM-11729 > URL: https://issues.apache.org/jira/browse/BEAM-11729 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Labels: stale-assigned > Time Spent: 40m > Remaining Estimate: 0h > > Shows up on profiles; I forget the exact cost, but it was high enough to notice, and it > is for window tracing. We should just use a constant for the class name > instead of computing the simple class name each time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-11910) Increase subsequent page size for bags after the first
[ https://issues.apache.org/jira/browse/BEAM-11910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle reassigned BEAM-11910: -- Assignee: Sam Whittle > Increase subsequent page size for bags after the first > -- > > Key: BEAM-11910 > URL: https://issues.apache.org/jira/browse/BEAM-11910 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > > Currently the page size of bags requested from the streaming dataflow backend > is always 8MB. In pipelines with large bags this can limit throughput as it > results in more round-trips to the backend. In particular with Streaming > Engine this is noticeable due to increased latency. > I propose using 8MB for the first bag fetch and then doubling the limit for > subsequent paginations -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-11910) Increase subsequent page size for bags after the first
Sam Whittle created BEAM-11910: -- Summary: Increase subsequent page size for bags after the first Key: BEAM-11910 URL: https://issues.apache.org/jira/browse/BEAM-11910 Project: Beam Issue Type: Bug Components: runner-dataflow Reporter: Sam Whittle Currently the page size of bags requested from the streaming dataflow backend is always 8MB. In pipelines with large bags this can limit throughput as it results in more round-trips to the backend. In particular with Streaming Engine this is noticeable due to increased latency. I propose using 8MB for the first bag fetch and then doubling the limit for subsequent paginations -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-11144) TriggerStateMachine.prefetchOnElement and other prefetch methods use incorrect state for subtriggers
[ https://issues.apache.org/jira/browse/BEAM-11144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17289572#comment-17289572 ] Sam Whittle commented on BEAM-11144: Yes, thanks! > TriggerStateMachine.prefetchOnElement and other prefetch methods use > incorrect state for subtriggers > > > Key: BEAM-11144 > URL: https://issues.apache.org/jira/browse/BEAM-11144 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.26.0 > > Time Spent: 1h 10m > Remaining Estimate: 0h > > prefetchOnElement takes a single StateAccessor corresponding to the root > trigger. It uses that state accessor to perform prefetches for the > subtrigger state. However that will use the root trigger namespace for the > tags in the subtriggers. This makes prefetching ineffective, introducing > possibly an additional round-trip once the data is actually read. > https://github.com/apache/beam/blob/68d6c8e6243b1d8f392840273f886276e2a8baff/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java#L296 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-11730) Reduce context switches for dataflow streaming appliance getdata reads
Sam Whittle created BEAM-11730: -- Summary: Reduce context switches for dataflow streaming appliance getdata reads Key: BEAM-11730 URL: https://issues.apache.org/jira/browse/BEAM-11730 Project: Beam Issue Type: Bug Components: runner-dataflow Reporter: Sam Whittle Assignee: Sam Whittle Currently reads are batched and issued from a separate thread pool of reading threads. However in the case that there is no read queuing due to the max # of parallel requests, switching threads is unnecessary and adds additional context switching overhead. In a benchmark I observed 5% cpu spent on LockSupport.park which some googling indicates is due to context switching. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-11729) Remove ReduceFnRunner eager class name evaluation for debug logging
Sam Whittle created BEAM-11729: -- Summary: Remove ReduceFnRunner eager class name evaluation for debug logging Key: BEAM-11729 URL: https://issues.apache.org/jira/browse/BEAM-11729 Project: Beam Issue Type: Bug Components: runner-core Reporter: Sam Whittle Assignee: Sam Whittle Shows up on profiles; I forget the exact cost, but it was high enough to notice, and it is for window tracing. We should just use a constant for the class name instead of computing the simple class name each time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-11706) TriggerProto translation shows up as 1% cpu on some benchmarks
[ https://issues.apache.org/jira/browse/BEAM-11706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17276313#comment-17276313 ] Sam Whittle commented on BEAM-11706: I had a typo in the jira issue for the pull request. The fix has been submitted as https://github.com/apache/beam/pull/13831 > TriggerProto translation shows up as 1% cpu on some benchmarks > -- > > Key: BEAM-11706 > URL: https://issues.apache.org/jira/browse/BEAM-11706 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > Fix For: 2.29.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-11727) Optimize ExecutionStateSampler
[ https://issues.apache.org/jira/browse/BEAM-11727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-11727: --- Component/s: (was: runner-dataflow) runner-core > Optimize ExecutionStateSampler > -- > > Key: BEAM-11727 > URL: https://issues.apache.org/jira/browse/BEAM-11727 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > > Showed up as 1.5% of CPU on Nexmark 11 streaming benchmark run. > It appears to be using a ConcurrentSkipListSet and most of the cpu is > related to inserts/removes in that, involving the system hash of the entries > as that is used for ordering. > The consistent ordering is unnecessary. Additionally for other reasons, > removal and iteration are already synchronized and so performance will likely > be better just using a synchronized HashMap and synchronizing in the add case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-11727) Optimize ExecutionStateSampler
[ https://issues.apache.org/jira/browse/BEAM-11727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle updated BEAM-11727: --- Summary: Optimize ExecutionStateSampler (was: Optimize ExecutionStateTracker) > Optimize ExecutionStateSampler > -- > > Key: BEAM-11727 > URL: https://issues.apache.org/jira/browse/BEAM-11727 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > > Showed up as 1.5% of CPU on Nexmark 11 streaming benchmark run. > It appears to be using a ConcurrentSkipListSet and most of the cpu is > related to inserts/removes in that, involving the system hash of the entries > as that is used for ordering. > The consistent ordering is unnecessary. Additionally for other reasons, > removal and iteration are already synchronized and so performance will likely > be better just using a synchronized HashMap and synchronizing in the add case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-11727) Optimize ExecutionStateTracker
[ https://issues.apache.org/jira/browse/BEAM-11727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sam Whittle reassigned BEAM-11727: -- Assignee: Sam Whittle > Optimize ExecutionStateTracker > -- > > Key: BEAM-11727 > URL: https://issues.apache.org/jira/browse/BEAM-11727 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Sam Whittle >Assignee: Sam Whittle >Priority: P2 > > Showed up as 1.5% of CPU on Nexmark 11 streaming benchmark run. > It appears to be using a ConcurrentSkipListSet and most of the cpu is > related to inserts/removes in that, involving the system hash of the entries > as that is used for ordering. > The consistent ordering is unnecessary. Additionally for other reasons, > removal and iteration are already synchronized and so performance will likely > be better just using a synchronized HashMap and synchronizing in the add case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-11727) Optimize ExecutionStateTracker
Sam Whittle created BEAM-11727: -- Summary: Optimize ExecutionStateTracker Key: BEAM-11727 URL: https://issues.apache.org/jira/browse/BEAM-11727 Project: Beam Issue Type: Bug Components: runner-dataflow Reporter: Sam Whittle Showed up as 1.5% of CPU on Nexmark 11 streaming benchmark run. It appears to be using a ConcurrentSkipListSet and most of the cpu is related to inserts/removes in that, involving the system hash of the entries as that is used for ordering. The consistent ordering is unnecessary. Additionally for other reasons, removal and iteration are already synchronized and so performance will likely be better just using a synchronized HashMap and synchronizing in the add case. -- This message was sent by Atlassian Jira (v8.3.4#803005)