Thanks for the snippet, updated BEAM-4470 with the additional details.

On Mon, Jun 11, 2018 at 10:56 AM Carlos Alonso <car...@mrcalonso.com> wrote:
> Many thanks for your help. Actually, my use case emits the entire map
> every time, so I guess I'm good to go with discarding mode.
>
> This test reproduces the issue:
> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala#L19-L53
>
> Hope it helps
>
> On Mon, Jun 4, 2018 at 9:04 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> Carlos, can you provide a test/code snippet for the bug that shows the
>> issue?
>>
>> On Mon, Jun 4, 2018 at 11:57 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> +dev@beam.apache.org
>>> Note that this is likely a bug in the DirectRunner for accumulation
>>> mode, filed: https://issues.apache.org/jira/browse/BEAM-4470
>>>
>>> Discarding mode is meant to always reflect the latest firing; the catch,
>>> though, is that you need to emit the entire map every time. If you can do
>>> this, then it makes sense to use discarding mode. The issue with
>>> discarding mode is that if your first trigger firing produces (A, 1),
>>> (B, 1) and your second firing produces (B, 2), the multimap will only
>>> contain (B, 2) and (A, 1) will have been discarded.
>>>
>>> To my knowledge, there is no guarantee about the order in which the
>>> values are combined. You will need to use some piece of information about
>>> the element to figure out which is the latest (or encode some additional
>>> information along with each element to make this easy).
>>>
>>> On Thu, May 31, 2018 at 9:16 AM Carlos Alonso <car...@mrcalonso.com>
>>> wrote:
>>>
>>>> I've improved the example a little and added some tests
>>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala
>>>>
>>>> The behaviour is slightly different, possibly because of the different
>>>> runner (Dataflow/Direct) implementations, but it's still not working.
>>>>
>>>> Now what happens is that although the internal PCollection gets
>>>> updated, the view isn't. This is happening regardless of the accumulation
>>>> mode.
>>>>
>>>> Regarding the accumulation mode on Dataflow... That was it!! Now the
>>>> sets contain all the items. However, one more question: is the ordering
>>>> within the set deterministic? (i.e. can I assume that the latest will
>>>> always be in the last position of the Iterable object?)
>>>>
>>>> Also... given that for my particular case I only want the latest
>>>> version, would you advise me to go ahead with discarding mode?
>>>>
>>>> Regards
>>>>
>>>> On Thu, May 31, 2018 at 4:44 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> The trigger definition in the sample code you have is using discarding
>>>>> firing mode. Try swapping to accumulating mode.
>>>>>
>>>>> On Thu, May 31, 2018 at 1:42 AM Carlos Alonso <car...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> But I think what I'm experiencing is quite different. Basically the
>>>>>> side input is updated, but only one element is found in the Iterable
>>>>>> that is the value of any key of the multimap.
>>>>>>
>>>>>> I mean, no concatenation seems to be happening. On the linked thread,
>>>>>> Kenn suggests that every firing will add the new value to the set of
>>>>>> values for the emitted key, but what I'm experiencing is that the new
>>>>>> value is there, but just by itself (i.e. it is the only element in the
>>>>>> set).
>>>>>>
>>>>>> @Robert, I'm using
>>>>>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())
>>>>>>
>>>>>> On Wed, May 30, 2018 at 7:46 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> An alternative to the thread that Kenn linked (adding support for
>>>>>>> retractions) is to add explicit support for combiners into side inputs.
>>>>>>> The system currently works by using a hardcoded concatenating
>>>>>>> combiner, so maps, lists, iterables, singletons, and multimaps all work
>>>>>>> by concatenating the set of values emitted and then turning it into a
>>>>>>> view, which is why it is an error for a singleton or map view if the
>>>>>>> trigger fires multiple times.
>>>>>>>
>>>>>>> On Wed, May 30, 2018 at 10:01 AM Kenneth Knowles <k...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes, this is a known issue. Here's a prior discussion:
>>>>>>>> https://lists.apache.org/thread.html/e9518f5d5f4bcf7bab02de2cb9fe1bd5293d87aa12d46de1eac4600b@%3Cuser.beam.apache.org%3E
>>>>>>>>
>>>>>>>> It is actually long-standing and the solution is known but hard.
>>>>>>>>
>>>>>>>> On Wed, May 30, 2018 at 9:48 AM Carlos Alonso <car...@mrcalonso.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi everyone!!
>>>>>>>>>
>>>>>>>>> Working with multimap-based side inputs on the global window, I'm
>>>>>>>>> experiencing something unexpected (at least to me) that I'd like to
>>>>>>>>> share with you to clarify.
>>>>>>>>>
>>>>>>>>> The way I understand multimaps is that when one emits two values
>>>>>>>>> for the same key for the same window (obviously the case here, as I'm
>>>>>>>>> working on the global one), the newly emitted values are appended to
>>>>>>>>> the Iterable collection that is the value for that particular key in
>>>>>>>>> the map.
>>>>>>>>>
>>>>>>>>> Testing it in this job (it is using scio, but side inputs are
>>>>>>>>> implemented with PCollectionViews):
>>>>>>>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/main/scala/com/mrcalonso/RefreshingSideInput2.scala
>>>>>>>>>
>>>>>>>>> The steps to reproduce are:
>>>>>>>>> 1. Create one table on the target BQ
>>>>>>>>> 2. Run the job
>>>>>>>>> 3. Patch the table on BQ (add one field); this should generate a
>>>>>>>>>    new TableSchema for the corresponding TableReference
>>>>>>>>> 4. An updated value of the number of fields appears in the logs,
>>>>>>>>>    but there is only one element within the iterable, as if it had
>>>>>>>>>    been updated instead of appended!!
>>>>>>>>>
>>>>>>>>> Is that the expected behaviour? Is it a bug? Am I missing something?
>>>>>>>>>
>>>>>>>>> Thanks!
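
A plain-Java model of the side-input view semantics discussed in this thread may help. It does not use Beam at all: the class `SideInputModel`, its `fire`, `asMultimap` and `latest` methods, and the `version` field are all hypothetical. It assumes, per Lukasz's explanation, that the multimap view concatenates every value still held for the window, and that discarding mode keeps only the most recent firing. It replays the (A, 1), (B, 1) then (B, 2) example and picks the latest value by an explicit version instead of relying on Iterable position, since ordering is not guaranteed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SideInputModel {

    // Hypothetical element: key, value, and a version used to pick the latest
    // value (Beam does not attach such a field for you; you encode it yourself).
    static final class Entry {
        final String key;
        final int value;
        final long version;

        Entry(String key, int value, long version) {
            this.key = key;
            this.value = value;
            this.version = version;
        }
    }

    private final boolean accumulating;
    // Elements the modeled runner still holds for the global window.
    private final List<Entry> retained = new ArrayList<>();

    SideInputModel(boolean accumulating) {
        this.accumulating = accumulating;
    }

    // One trigger firing: in discarding mode, earlier panes are dropped first.
    void fire(List<Entry> pane) {
        if (!accumulating) {
            retained.clear();
        }
        retained.addAll(pane);
    }

    // Models the concatenating combiner: every retained value for a key is
    // appended to that key's list in the multimap view.
    Map<String, List<Entry>> asMultimap() {
        Map<String, List<Entry>> view = new LinkedHashMap<>();
        for (Entry e : retained) {
            view.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e);
        }
        return view;
    }

    // Order within the Iterable is not guaranteed by real runners, so select
    // the latest element by its version rather than by its position.
    static Entry latest(Iterable<Entry> values) {
        Entry best = null;
        for (Entry e : values) {
            if (best == null || e.version > best.version) {
                best = e;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Entry> first = Arrays.asList(new Entry("A", 1, 1), new Entry("B", 1, 1));
        List<Entry> second = Arrays.asList(new Entry("B", 2, 2));

        SideInputModel discarding = new SideInputModel(false);
        discarding.fire(first);
        discarding.fire(second);
        System.out.println("discarding: " + discarding.asMultimap().keySet());

        SideInputModel accumulating = new SideInputModel(true);
        accumulating.fire(first);
        accumulating.fire(second);
        Map<String, List<Entry>> view = accumulating.asMultimap();
        System.out.println("accumulating B values: " + view.get("B").size());
        System.out.println("latest B: " + latest(view.get("B")).value);
    }
}
```

Running `main` prints that discarding mode leaves only key B (so (A, 1) is gone unless the whole map is re-emitted), while accumulating mode holds two values for B, the latest of which by version is 2. In a real pipeline the version would be something carried with each element, such as a timestamp; that choice is an assumption of this sketch.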