Re: Multiple triggers contained w/in side input?
Good questions,

On Wed, Nov 6, 2019 at 12:59 AM rahul patwari wrote:

> Hi Kenn,
>
> Does the side input have elements from the previous trigger firing even
> when used with .discardingFiredPanes(), as in
> https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-global-window-side-inputs

Yes, the elements from the previous trigger firing will be there. The elements will be different depending on the accumulation mode. Suppose:

 - you are doing a Sum and the inputs are 1, 2, 3, 4
 - you trigger after 1, 2 and then trigger again after 3, 4

There will always be two elements in the output, and two elements go into the side input. The elements will be:

 - discardingFiredPanes: 3, 7
 - accumulatingFiredPanes: 3, 10

> Does View.asSingleton() affect this behaviour?

View.asSingleton() will crash if you have multiple trigger firings on a Combine.globally() or Sum.globally(), etc., in the same way that View.asMap() will crash if you have multiple trigger firings on a per-key Combine/Sum/etc.

> Thanks,
> Rahul
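To make the Sum example above concrete, here is a minimal plain-Java sketch of how the two accumulation modes produce different pane values. It is a simulation only and does not use the Beam API; the class name PaneSimulation and the method firePanes are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of pane values for a Sum; this is not Beam API.
class PaneSimulation {

    /**
     * Produces one summed pane per firing. When accumulating, the sum of all
     * earlier firings is carried into the next pane; when discarding, each
     * pane only covers the inputs that arrived since the previous firing.
     */
    static List<Integer> firePanes(List<List<Integer>> firings, boolean accumulating) {
        List<Integer> panes = new ArrayList<>();
        int carried = 0;
        for (List<Integer> firing : firings) {
            int sum = accumulating ? carried : 0;
            for (int value : firing) {
                sum += value;
            }
            panes.add(sum);
            carried = sum;
        }
        return panes;
    }

    public static void main(String[] args) {
        // Inputs 1, 2, 3, 4 with a firing after 1, 2 and another after 3, 4.
        List<List<Integer>> firings = List.of(List.of(1, 2), List.of(3, 4));
        System.out.println("discarding:   " + firePanes(firings, false)); // [3, 7]
        System.out.println("accumulating: " + firePanes(firings, true));  // [3, 10]
    }
}
```

In both modes the result has two elements, which is why a singleton view over it cannot hold.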
Re: Multiple triggers contained w/in side input?
Hi Kenn,

Does the side input have elements from the previous trigger firing even when used with .discardingFiredPanes(), as in https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-global-window-side-inputs ?

Does View.asSingleton() affect this behaviour?

Thanks,
Rahul
Re: Multiple triggers contained w/in side input?
Oops -- you already did clarify. Thank you. I see the ticket you filed. (Sorry, it's late.)

Thanks again.
Re: Multiple triggers contained w/in side input?
I'm sorry for any alarms. My pseudocode should have read:

-> GlobalWindow (triggering *Repeatedly.forever* AfterProcessingTime.pastFirstElementInPane)

I suspect that this "last used trigger" behavior does not apply to my usage, i.e., View.asMap applied to a Combine.perKey.

Perhaps this "last used trigger" behavior only applies to views of Combine.globally. Could you confirm or clarify?

Thanks.
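The difference between the two versions of the pseudocode can be sketched in plain Java. Kenn's reply in this thread notes that the bare trigger should never fire more than once and drops all data after its first firing; wrapping it in Repeatedly.forever keeps it firing. The sketch below is a simulation, not Beam code: firing once per batch stands in for processing-time firing, and the names TriggerSimulation and emitPanes are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of a one-shot versus a repeated trigger; not Beam API.
class TriggerSimulation {

    /**
     * Emits one pane per batch. A one-shot trigger finishes after its first
     * firing, so every later batch is dropped; a repeated trigger keeps firing.
     */
    static List<List<Integer>> emitPanes(List<List<Integer>> batches, boolean repeatForever) {
        List<List<Integer>> panes = new ArrayList<>();
        boolean finished = false;
        for (List<Integer> batch : batches) {
            if (finished) {
                continue; // the trigger has finished, so this data is dropped
            }
            panes.add(batch);
            finished = !repeatForever;
        }
        return panes;
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = List.of(List.of(1, 2), List.of(3, 4));
        System.out.println("one-shot: " + emitPanes(batches, false)); // [[1, 2]]
        System.out.println("repeated: " + emitPanes(batches, true));  // [[1, 2], [3, 4]]
    }
}
```

Only the repeated variant yields more than one pane, which is consistent with the duplicate-key error appearing once Repeatedly.forever is in the pipeline.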
Re: Multiple triggers contained w/in side input?
On Tue, Nov 5, 2019 at 9:29 PM Aaron Dixon wrote:

> From https://beam.apache.org/documentation/programming-guide/#side-inputs
>
> > If the side input has multiple trigger firings, Beam uses the value
> > from the latest trigger firing. This is particularly useful if you use a
> > side input with a single global window and specify a trigger.

Sorry for this. The documentation is entirely wrong. If a side input has multiple firings, then all elements from all firings are included in the side input as though they are unrelated elements. I have filed https://issues.apache.org/jira/browse/BEAM-8563 to at least prevent the confusing result and give a good error message.

> I have this sub-pipeline:
>
> -> GlobalWindow (triggering AfterProcessingTime.pastFirstElementInPane)

Are you sure you want this? It will drop all data after the first firing. We are about to disable such triggers due to the data loss risk. See https://s.apache.org/finishing-triggers-drop-data. If your intent is to drop all subsequent data, I am interested in your use case. Can you share more?

> -> Combine.perKey (Max)
> -> View.asMap
> ...which I use as a side input.
>
> But I get a "Duplicate values for " error (DirectRunner). (Stack
> trace below.)
>
> But the only way for duplicate keys to come out of the global window is
> via multiple triggers.
>
> What am I missing?

This is surprising. Can you share the actual code of your pipeline? According to your pseudocode, this is impossible. The trigger you described should never fire multiple times. But as I mentioned above, the trigger is about to be forbidden. If we can learn about your usage, maybe that will help.

Kenn

> ===
> java.lang.IllegalArgumentException: Duplicate values for :ihop
>   at org.apache.beam.sdk.values.PCollectionViews$MapViewFn.apply(PCollectionViews.java:397)
>   at org.apache.beam.sdk.values.PCollectionViews$MapViewFn.apply(PCollectionViews.java:373)
>   at org.apache.beam.runners.direct.SideInputContainer$SideInputContainerSideInputReader.get(SideInputContainer.java:261)
>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:247)
>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:74)
>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:548)
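The failure in the stack trace follows from the behavior described above: if every firing's output lands in the side input as an unrelated element, a per-key Combine that fires twice contributes the same key twice, and a map view cannot hold it. Below is a minimal plain-Java sketch of that reasoning. It is a simulation rather than Beam's implementation; the class SideInputSimulation, its methods, and the values 5 and 8 are hypothetical. The multimap variant is included only as an illustrative assumption about one way to tolerate repeated keys (comparable in spirit to View.asMultimap(), which this thread does not discuss).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of side-input contents; this is not Beam's implementation.
class SideInputSimulation {

    /** Mimics a map view: each key may appear only once. */
    static Map<String, Integer> asMap(List<Map.Entry<String, Integer>> elements) {
        Map<String, Integer> view = new HashMap<>();
        for (Map.Entry<String, Integer> element : elements) {
            if (view.containsKey(element.getKey())) {
                throw new IllegalArgumentException("Duplicate values for " + element.getKey());
            }
            view.put(element.getKey(), element.getValue());
        }
        return view;
    }

    /** Mimics a multimap view: every value seen for a key is retained. */
    static Map<String, List<Integer>> asMultimap(List<Map.Entry<String, Integer>> elements) {
        Map<String, List<Integer>> view = new HashMap<>();
        for (Map.Entry<String, Integer> element : elements) {
            view.computeIfAbsent(element.getKey(), k -> new ArrayList<>()).add(element.getValue());
        }
        return view;
    }

    public static void main(String[] args) {
        // Two firings of a per-key Max, both present in the same side input.
        List<Map.Entry<String, Integer>> elements = List.of(
                Map.entry(":ihop", 5),  // pane from the first firing (hypothetical value)
                Map.entry(":ihop", 8)); // pane from the second firing (hypothetical value)

        System.out.println(asMultimap(elements));
        try {
            asMap(elements);
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

Note that the multimap only avoids the exception; it does not by itself identify which value came from the latest firing.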