Understanding these ordering guarantees is fundamental. Is my understanding of the ordering guarantees for Top and List correct?

On Fri, May 20, 2016, 6:48 PM Jesse Anderson wrote:

> Here's the output I'm looking for (and getting):
> 2016-01-11T23:59:59.998Z low 682
> 2016-01-11T23:59:59.998Z medium 3
> 2016-01-12T23:59:59.998Z high 1
> 2016-01-12T23:59:59.998Z low 5533
> 2016-01-12T23:59:59.998Z medium 33
> 2016-01-13T23:59:59.998Z high 1
> 2016-01-13T23:59:59.998Z low 7001
> 2016-01-13T23:59:59.998Z medium 39
> 2016-01-14T23:59:59.998Z high 2
> 2016-01-14T23:59:59.998Z low 7664
>
> It is a timestamp-sorted report of processed data.
>
> My reading of Top's JavaDoc is that there are ordering guarantees:
>
> smallest(int count)
> <https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/Top.html#smallest-int->
> Returns a PTransform that takes an input PCollection<T> and returns a
> PCollection<List<T>> with a single element containing the smallest count
> elements of the input PCollection<T>, in increasing order, sorted
> according to their natural order.
>
> It also says:
>
> All the elements of the result's List must fit into the memory of a
> single machine.
>
> Am I misunderstanding it?
>
> On Fri, May 20, 2016 at 1:54 PM Kenneth Knowles wrote:
>
>> Hi Jesse,
>>
>> A PCollection does not have a definite order, but is just a multiset/bag
>> of elements. So any ordering you are seeing is a facet of a particular
>> runner, sort of a coincidence. Can you tell me more about your use case?
>>
>> Kenn
>>
>> On Fri, May 20, 2016 at 1:46 PM, Jesse Anderson wrote:
>>
>>> Kenn,
>>>
>>> The conversion to PCollection<String> doesn't work for me because I
>>> wanted to maintain order. To keep the order, I need things in a
>>> PCollection<List<String>>. To create the ordered list, I did:
>>>
>>>   PCollection<List<String>> orderedList =
>>>       formattedCountsGlobal.apply(Top.smallest(200));
>>>
>>> Then tried to write it out with:
>>>
>>>   orderedList.apply(TextIO.Write.withCoder(ListCoder.of(StringDelegateCoder.of(String.class))).to("output/result"));
>>>
>>> I'm using Top.smallest as a hack to order results, but that's a separate
>>> topic.
>>>
>>> To answer my own question about DataOutputStream.writeUTF not working, a
>>> short is written out before the string is written. This causes the same
>>> issue as the VarInt. I should have used writeBytes(). That doesn't
>>> write out a size first.
>>>
>>> Thanks,
>>>
>>> Jesse
>>>
>>> On Fri, May 20, 2016, 11:47 AM Kenneth Knowles wrote:
>>>
>>>> Hi Jesse,
>>>>
>>>> I'm having trouble following exactly where the trouble is arising, but
>>>> let me expand my main recommendation to be an edit of your code snippet
>>>> (please forgive any typos or type errors).
>>>>
>>>> Original:
>>>> ----------
>>>> orderedList
>>>>   .apply(TextIO.Write
>>>>     .withCoder(ListCoder.of(StringDelegateCoder.of(String.class)))
>>>>     .to("output/result"));
>>>>
>>>>
>>>> My main recommendation
>>>> ---------------------
>>>> import static org.apache.beam.sdk.values.TypeDescriptors.strings;
>>>>
>>>> orderedList
>>>>   .apply(MapElements.via((List<String> x) -> x.toString()).withOutputType(strings()))
>>>>   .apply(TextIO.Write.to("output/result"));
>>>>
>>>>
>>>> Another approach, which I do not recommend
>>>> --------------------------------------------------------------
>>>> orderedList
>>>>   .apply(TextIO.Write
>>>>     .withCoder(StringDelegateCoder.of(List.class))
>>>>     .to("output/result"));
>>>>
>>>> I don't recommend it because StringDelegateCoder is really intended
>>>> for things like URI which have a canonical string representation
>>>> for 1-1 conversions, not for readable human output.
>>>>
>>>> If neither of these works for you, perhaps you could paste a larger
>>>> snippet of your pipeline.
>>>>
>>>> Kenn
>>>>
>>>> On Thu, May 19, 2016 at 9:32 PM, Jesse Anderson wrote:
>>>>
>>>>> I'm writing out a PCollection<List<String>>. My goal is to write out
>>>>> each element in the list as a new line.
>>>>>
>>>>> The StringUtf8Coder also writes out a VarInt for the size of the
>>>>> bytes. The StringDelegateCoder with the ListCoder doesn't actually
>>>>> write out text.
>>>>>
>>>>> I think List<String> support should be added to TextIO.Write. Or maybe
>>>>> a new coder needs to be added that outputs text, with support for Lists,
>>>>> KVs, Sets, etc.
>>>>>
>>>>> On Thu, May 19, 2016 at 9:23 PM Kenneth Knowles wrote:
>>>>>
>>>>>> Hi Jesse,
>>>>>>
>>>>>> StringDelegateCoder does just what you have said: it encodes using
>>>>>> #toString() and decodes assuming a single-arg constructor.
>>>>>>
>>>>>> But by analogy with what you have written, and if I understand your
>>>>>> goals correctly, what you want here is
>>>>>> TextIO.Write.withCoder(StringDelegateCoder.of(List.class))
>>>>>> since you want to base it on List#toString() not String#toString().
>>>>>>
>>>>>> That said, probably the best way to write a reliable and/or readable
>>>>>> format with TextIO.Write is to intentionally produce just the string you
>>>>>> want for your output format - including escaping newlines, etc - and then
>>>>>> use StringUtf8Coder.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Thu, May 19, 2016 at 9:00 PM, Jesse Anderson wrote:
>>>>>>
>>>>>>> I'm trying to write out a List<String> with TextIO.Write. The only
>>>>>>> supported type is String. I ended up writing an anonymous coder.
>>>>>>>
>>>>>>> I want to check if there is a coder that I couldn't find that
>>>>>>> would just take an object and write out the .toString() of it.
>>>>>>>
>>>>>>> I tried this:
>>>>>>>
>>>>>>>   orderedList.apply(TextIO.Write.withCoder(ListCoder.of(StringDelegateCoder.of(String.class))).to("output/result"));
>>>>>>>
>>>>>>> But a VarInt is encoded along with everything. I'm looking for a
>>>>>>> coder that only writes out the UTF8.
>>>>>>>
>>>>>>> This functionality would be similar to Hadoop TextOutputFormat. It
>>>>>>> just runs a .toString before writing it out.
>>>>>>>
>>>>>>> In the anonymous coder I wrote, I hit a weird issue. This code just
>>>>>>> writes out a bunch of "\n". Yes, value is populated with data.
>>>>>>>
>>>>>>>   dataOutputStream.writeUTF(value);
>>>>>>>   dataOutputStream.writeUTF("\n");
>>>>>>>
>>>>>>> This code works:
>>>>>>>
>>>>>>>   byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
>>>>>>>   dataOutputStream.write(bytes);
>>>>>>>   dataOutputStream.writeUTF("\n");
>>>>>>>
>>>>>>> I took this from the string coder. What's odd is that DOS' writeUTF
>>>>>>> should work too. Is there a reason why?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> jesse
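
For anyone reading the archive later: the writeUTF behaviour asked about at the bottom of the thread can be reproduced with the JDK alone. The sketch below is only an illustration, not code from the thread; the class name WriteUtfDemo and its two helper methods are made up for the example. It shows that DataOutputStream.writeUTF puts a two-byte length in front of the encoded string, while writing the result of getBytes(StandardCharsets.UTF_8) emits only the string's bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class; not part of any SDK discussed in the thread.
class WriteUtfDemo {

    /** Encodes the value with writeUTF, which prepends a 2-byte length. */
    static byte[] withWriteUtf(String value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(value);
        } catch (IOException e) {
            // An in-memory stream is not expected to fail; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Writes only the UTF-8 bytes of the value, with no length prefix. */
    static byte[] withRawBytes(String value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.write(value.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        // prints [0, 3, 108, 111, 119]: two length bytes, then "low"
        System.out.println(Arrays.toString(withWriteUtf("low")));
        // prints [108, 111, 119]: just "low"
        System.out.println(Arrays.toString(withRawBytes("low")));
    }
}
```

The two leading bytes are what a text file reader would see as stray characters in front of each record, which matches the "short is written out before the string" observation in the follow-up message.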

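
The recommendation in the thread is to build the exact output string first and then write it with the default string handling. For the stated goal (each list element on its own line), List#toString() would give a single bracketed line, so a join is probably closer to what is wanted. Here is a minimal plain-Java sketch of such a formatting function; ListLines and toLines are hypothetical names, and wiring it into a MapElements step is left out because that depends on the SDK version in use. It assumes the elements contain no embedded newlines and that the sink terminates each record itself, so no trailing newline is added.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; the name and shape are assumptions for illustration.
class ListLines {

    /** Joins the elements with '\n' so each one lands on its own line. */
    static String toLines(List<String> elements) {
        return String.join("\n", elements);
    }

    public static void main(String[] args) {
        List<String> report = Arrays.asList(
                "2016-01-11T23:59:59.998Z low 682",
                "2016-01-11T23:59:59.998Z medium 3");
        // The list order is preserved in the joined text.
        System.out.println(toLines(report));
    }
}
```

If elements could contain newlines, they would need escaping before the join, as mentioned earlier in the thread.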