To add some more color: if you do a GroupByKey on a PCollection<KV<K, V>>, you will get one KV<K, Iterable<V>> per key, per window, and per pane (a.k.a. trigger firing).
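A rough way to picture this: GroupByKey effectively groups on (key, window), not on the key alone. The sketch below is NOT the Beam/Dataflow API. It is a plain-Java model that assumes a single pane per window (e.g. the default trigger in batch), and the names Event, fixedWindowStart and groupByKeyAndWindow are hypothetical. With 30-second fixed windows, each count lands in its own group; collapsing everything into one global window puts all of a key's counts in a single group, which is the effect of re-windowing into GlobalWindows before the GroupByKey.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupByKeyModel {
    // Hypothetical element: a key, a value, and an event timestamp in millis.
    record Event(String key, long value, long timestampMillis) {}

    // Sentinel standing in for the single global window that covers all time.
    static final long GLOBAL_WINDOW = Long.MIN_VALUE;

    // Start of the fixed window that contains ts; assumes ts >= 0.
    static long fixedWindowStart(long ts, long sizeMillis) {
        return ts - (ts % sizeMillis);
    }

    // Models GroupByKey as one group per (key, window).
    // sizeMillis <= 0 means "everything is in the global window".
    static Map<String, List<Long>> groupByKeyAndWindow(List<Event> events, long sizeMillis) {
        Map<String, List<Long>> groups = new LinkedHashMap<>();
        for (Event e : events) {
            long window = sizeMillis > 0
                ? fixedWindowStart(e.timestampMillis(), sizeMillis)
                : GLOBAL_WINDOW;
            String groupKey = e.key() + "@" + (window == GLOBAL_WINDOW ? "global" : window);
            groups.computeIfAbsent(groupKey, k -> new ArrayList<>()).add(e.value());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Event> counts = List.of(
            new Event("54.148.33.jdj", 44, 10_000),
            new Event("54.148.33.jdj", 44, 40_000),
            new Event("54.148.33.jdj", 2, 70_000));
        // 30-second fixed windows: three groups, one count each.
        System.out.println(groupByKeyAndWindow(counts, 30_000));
        // Global window: one group holding all three counts for the key.
        System.out.println(groupByKeyAndWindow(counts, 0));
    }
}
```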
And regardless, every time you call context.output in a ParDo, you are adding a new element to the output PCollection. PCollections have no notion of ordering, so once you put multiple elements into a PCollection, any relationship between them is lost. So in your case, if you want to keep the relationship between all the values in a given key/window/pane, you do need to keep them in the same element for further processing.

But note that this also means any downstream writes will treat them as a single element. In this case, that works for you because you are presumably using TextIO.Write, which happens to be newline delimited, and you are also putting newlines within an element. But if you were to parse the output back in using TextIO.Read, each line would be an independent element, and you'd have to set timestamps and re-window to get the association back. And if you switch to a different sink that isn't newline delimited, you might need to format the element differently before writing.

On Tue, May 17, 2016 at 1:47 PM, Jesse Anderson wrote:
> Somewhat related, in case anyone hits this while Googling. If you want the
> results of your GroupByKey to stay grouped when written out, like:
> 54.148.33.jdj Hits:44 At:2015-03-31T04:00:29.999Z
> 54.148.33.jdj Hits:44 At:2015-03-31T04:00:59.999Z
> 54.148.33.jdj Hits:2 At:2015-03-31T04:01:29.999Z
> 107.22.225.dea Hits:18 At:2015-03-31T04:00:29.999Z
> 107.22.225.dea Hits:18 At:2015-03-31T04:00:59.999Z
> 107.22.225.dea Hits:1 At:2015-03-31T04:01:29.999Z
> 190.29.67.djc Hits:1 At:2015-03-31T04:00:29.999Z
> 190.29.67.djc Hits:1 At:2015-03-31T04:00:59.999Z
>
> Simply doing a "context.output" for each line will not write them out one
> after another. You'll need to do a single "context.output" call with
> multiple "\n" in the string.
>
> On Tue, May 17, 2016 at 11:57 AM Jesse Anderson wrote:
>
>> That's what I needed. Thanks Frances.
>>
>> On Tue, May 17, 2016 at 11:54 AM Frances Perry wrote:
>>
>>> Re-window into the GlobalWindow (which covers all time).
>>>
>>> pc.apply(Window.<T>into(new GlobalWindows()))
>>>
>>> The GlobalWindow is the default. It works for bounded collections (a.k.a.
>>> batch mode) because once all the data has been processed, the system
>>> fast-forwards to the end of time. And if you are processing an unbounded
>>> PCollection with aggregations (a.k.a. streaming mode), you have to either
>>> set a different windowing scheme or use triggers so that we aren't
>>> waiting until the end of time for results.
>>>
>>> Hope that helps!
>>>
>>> On Tue, May 17, 2016 at 11:44 AM, Jesse Anderson wrote:
>>>
>>>> Is there a way to remove windowing from a PCollection?
>>>>
>>>> Let's say I had the following code:
>>>>
>>>> PCollection<String> windowedWords = parsed
>>>>     .apply(Window.<String>into(
>>>>         FixedWindows.of(Duration.standardSeconds(30))));
>>>>
>>>> PCollection<KV<String, Long>> eventCounts =
>>>>     windowedWords.apply(Count.perElement());
>>>>
>>>> // Don't window anymore for the GroupByKey
>>>> PCollection<KV<String, Iterable<Long>>> grouped =
>>>>     eventCounts.apply(GroupByKey.<String, Long>create());
>>>>
>>>> PCollection<String> formattedCounts =
>>>>     grouped.apply(ParDo.of(new TimedFN()));
>>>>
>>>> I've added the window and performed the counts. In the GroupByKey, I no
>>>> longer want windowing to apply; I want to group by the key across
>>>> windows. The Iterable that comes back from the GroupByKey only has the
>>>> one Long from its window instead of all N Longs from all windows. How do
>>>> I remove the window so that GroupByKey groups across all time?
>>>>
>>>> Thanks,
>>>>
>>>> Jesse
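For anyone landing here from a search, here is a rough plain-Java sketch of the "one element with embedded newlines" approach described in this thread, and of why reading it back with a line-oriented reader loses the grouping. It deliberately avoids the Beam/Dataflow API: WindowCount, formatGroup and readBackAsLines are hypothetical names, and in a real DoFn the string built by formatGroup would be emitted with a single output call rather than printed.

```java
import java.util.List;
import java.util.stream.Collectors;

class NewlineFormatting {
    // Hypothetical per-window result: a hit count and the end of its window.
    record WindowCount(long hits, String windowEnd) {}

    // Builds ONE output element for a key: all of its lines joined with "\n",
    // so a newline-delimited sink writes them out one after another.
    static String formatGroup(String key, List<WindowCount> counts) {
        return counts.stream()
            .map(c -> key + " Hits:" + c.hits() + " At:" + c.windowEnd())
            .collect(Collectors.joining("\n"));
    }

    // Models a line-oriented reader: every line becomes its own element,
    // so the per-key grouping is no longer carried by the data itself.
    static List<String> readBackAsLines(String written) {
        return List.of(written.split("\n"));
    }

    public static void main(String[] args) {
        String element = formatGroup("54.148.33.jdj", List.of(
            new WindowCount(44, "2015-03-31T04:00:29.999Z"),
            new WindowCount(44, "2015-03-31T04:00:59.999Z"),
            new WindowCount(2, "2015-03-31T04:01:29.999Z")));
        System.out.println(element);
        // Reading it back yields 3 independent lines.
        System.out.println(readBackAsLines(element).size());
    }
}
```

The trade-off is the one pointed out at the top of the thread: keeping the lines inside one element preserves their relationship through the write, but any sink that isn't newline delimited, or any pipeline that reads the text back, will need its own formatting or re-windowing step.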
