Here's the output I'm looking for (and getting):

2016-01-11T23:59:59.998Z low 682
2016-01-11T23:59:59.998Z medium 3
2016-01-12T23:59:59.998Z high 1
2016-01-12T23:59:59.998Z low 5533
2016-01-12T23:59:59.998Z medium 33
2016-01-13T23:59:59.998Z high 1
2016-01-13T23:59:59.998Z low 7001
2016-01-13T23:59:59.998Z medium 39
2016-01-14T23:59:59.998Z high 2
2016-01-14T23:59:59.998Z low 7664

It is a timestamp sorted report of processed data.

My reading of Top's JavaDoc is that there are ordering guarantees:

smallest
<https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/Top.html#smallest-int->
(int count)
Returns a PTransform that takes an input PCollection<T> and returns a
PCollection<List<T>> with a single element containing the smallest count
elements of the input PCollection<T>, in increasing order, sorted according
to their natural order.

It also says:

All the elements of the result's List must fit into the memory of a single
machine.

Am I misunderstanding it?

On Fri, May 20, 2016 at 1:54 PM Kenneth Knowles <[email protected]> wrote:

> Hi Jesse,
>
> A PCollection does not have a definite order, but is just a multiset/bag
> of elements. So any ordering you are seeing is a facet of a particular
> runner, sort of a coincidence. Can you tell me more about your use case?
>
> Kenn
>
> On Fri, May 20, 2016 at 1:46 PM, Jesse Anderson <[email protected]>
> wrote:
>
>> Kenn,
>>
>> The conversion to PCollection<String> doesn't work for me because I
>> wanted to maintain order. To keep the order, I need things in
>> PCollection<List<String>>. To create the ordered list, I did:
>>
>> PCollection<List<String>> orderedList =
>>     formattedCountsGlobal.apply(Top.smallest(200));
>>
>> Then tried to write it out with:
>>
>> orderedList.apply(TextIO.Write.withCoder(ListCoder.of(StringDelegateCoder.of(String.class))).to("output/result"));
>>
>> I'm using Top.smallest as a hack to order results, but that's a separate
>> topic.
>>
>> To answer my own question about DataOutputStream.writeUTF not working, a
>> short is written out before the string is written. This causes the same
>> issue as the VarInt. I should have used writeBytes(). That doesn't write
>> out a size first.
>>
>> Thanks,
>>
>> Jesse
>>
>> On Fri, May 20, 2016, 11:47 AM Kenneth Knowles <[email protected]> wrote:
>>
>>> Hi Jesse,
>>>
>>> I'm having trouble following exactly where the trouble is arising, but
>>> let me expand my main recommendation to be an edit of your code snippet
>>> (please forgive any typos or type errors).
>>>
>>> Original:
>>> ----------
>>> orderedList
>>>     .apply(TextIO.Write
>>>         .withCoder(ListCoder.of(StringDelegateCoder.of(String.class)))
>>>         .to("output/result"));
>>>
>>>
>>> My main recommendation
>>> ---------------------
>>> import static org.apache.beam.values.TypeDescriptors.strings;
>>>
>>> orderedList
>>>     .apply(MapElements.via(x -> x.toString()).withOutputType(strings()))
>>>     .apply(TextIO.Write.to("output/result"));
>>>
>>>
>>> Another approach, which I do not recommend
>>> --------------------------------------------------------------
>>> orderedList
>>>     .apply(TextIO.Write
>>>         .withCoder(StringDelegateCoder.of(List.class))
>>>         .to("output/result"));
>>>
>>> I don't recommend it because StringDelegateCoder is really intended
>>> for things like URI which have a canonical string representation for 1-1
>>> conversions, not for readable human output.
>>>
>>> If neither of these works for you, perhaps you could paste a larger
>>> snippet of your pipeline.
>>>
>>> Kenn
>>>
>>> On Thu, May 19, 2016 at 9:32 PM, Jesse Anderson <[email protected]>
>>> wrote:
>>>
>>>> I'm writing out a PCollection<List<String>>. My goal is to write out
>>>> each element in the list as a new line.
>>>>
>>>> The StringUtf8Coder also writes out a VarInt for the size of the bytes.
>>>> The StringDelegateCoder with the ListCoder doesn't actually write out
>>>> text.
>>>>
>>>> I think List<String> support should be added to TextIO.Write. Or maybe
>>>> a new coder needs to be added that outputs text, with support for Lists,
>>>> KVs, Sets, etc.
>>>>
>>>> On Thu, May 19, 2016 at 9:23 PM Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>> Hi Jesse,
>>>>>
>>>>> StringDelegateCoder does just what you have said: it encodes using
>>>>> #toString() and decodes assuming a single-arg constructor.
>>>>>
>>>>> But by analogy with what you have written, and if I understand your
>>>>> goals correctly, what you want here is
>>>>> TextIO.Write.withCoder(StringDelegateCoder.of(List.class))
>>>>> since you want to base it on List#toString() not String#toString().
>>>>>
>>>>> That said, probably the best way to write a reliable and/or readable
>>>>> format with TextIO.Write is to intentionally produce just the string you
>>>>> want for your output format - including escaping newlines, etc - and then
>>>>> use StringUtf8Coder.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Thu, May 19, 2016 at 9:00 PM, Jesse Anderson <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I'm trying to write out a List<String> with TextIO.Write. The only
>>>>>> supported type is String. I ended up writing an anonymous coder.
>>>>>>
>>>>>> I want to check if there is a coder that I couldn't find that would
>>>>>> just take an object and write out the .toString() of it.
>>>>>>
>>>>>> I tried this:
>>>>>>
>>>>>> orderedList.apply(TextIO.Write.withCoder(ListCoder.of(StringDelegateCoder.of(String.class))).to("output/result"));
>>>>>>
>>>>>> But a VarInt is encoded along with everything. I'm looking for a
>>>>>> coder that only writes out the UTF8.
>>>>>>
>>>>>> This functionality would be similar to Hadoop TextOutputFormat. It
>>>>>> just runs a .toString before writing it out.
>>>>>>
>>>>>> In the anonymous coder I wrote, I hit a weird issue. This code just
>>>>>> writes out a bunch of "\n". Yes, value is populated with data.
>>>>>>
>>>>>> dataOutputStream.writeUTF(value);
>>>>>> dataOutputStream.writeUTF("\n");
>>>>>>
>>>>>> This code works:
>>>>>>
>>>>>> byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
>>>>>> dataOutputStream.write(bytes);
>>>>>> dataOutputStream.writeUTF("\n");
>>>>>>
>>>>>> I took this from the string coder. What's odd is that DOS' writeUTF
>>>>>> should work too. Is there a reason why?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> jesse
>>>>>>
>>>>>
>>>>>
>>>
>
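
For anyone reading this thread later, here is a minimal standalone sketch of the writeUTF behaviour discussed above: DataOutputStream.writeUTF puts a 2-byte length in front of the string, while write(byte[]) emits only the bytes it is given. The class name WriteUtfDemo and its two helper methods are hypothetical and exist only for illustration; nothing here uses Beam or the Dataflow SDK, and it is not the anonymous coder from the thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class; not part of Beam or the Dataflow SDK.
final class WriteUtfDemo {

    // Encodes the value with DataOutputStream.writeUTF, which writes a
    // 2-byte big-endian length followed by the modified UTF-8 bytes.
    static byte[] viaWriteUTF(String value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(value);
        }
        return buffer.toByteArray();
    }

    // Encodes the value as plain UTF-8 bytes with no length prefix.
    static byte[] viaRawBytes(String value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.write(value.getBytes(StandardCharsets.UTF_8));
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        String line = "low 682";
        // prints [0, 7, 108, 111, 119, 32, 54, 56, 50]
        System.out.println(Arrays.toString(viaWriteUTF(line)));
        // prints [108, 111, 119, 32, 54, 56, 50]
        System.out.println(Arrays.toString(viaRawBytes(line)));
    }
}
```

The same prefix applies to writeUTF("\n"), which produces the three bytes 0, 1, 10 rather than a bare newline, so a text-only output would presumably want the newline written as raw bytes as well.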
