Here's the output I'm looking for (and getting):
2016-01-11T23:59:59.998Z low 682
2016-01-11T23:59:59.998Z medium 3
2016-01-12T23:59:59.998Z high 1
2016-01-12T23:59:59.998Z low 5533
2016-01-12T23:59:59.998Z medium 33
2016-01-13T23:59:59.998Z high 1
2016-01-13T23:59:59.998Z low 7001
2016-01-13T23:59:59.998Z medium 39
2016-01-14T23:59:59.998Z high 2
2016-01-14T23:59:59.998Z low 7664

It is a timestamp-sorted report of processed data.

My reading of Top's JavaDoc is that there are ordering guarantees:
smallest(int count)
<https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/Top.html#smallest-int->
Returns a PTransform that takes an input PCollection<T> and returns a
PCollection<List<T>> with a single element containing the smallest count
elements of the input PCollection<T>, in increasing order, sorted according
to their natural order.

It also says:

All the elements of the result's List must fit into the memory of a single
machine.
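
To make that reading concrete, here is a minimal plain-Java sketch of the
contract as I understand it. It does not use the Dataflow SDK and is not
how Top is implemented; the class name SmallestSketch, the helper
smallest(), and the bounded-heap approach are my own assumptions for
illustration. It also shows why the report comes out timestamp-sorted:
the natural order of String is lexicographic, and ISO-8601 timestamps in
a single time zone sort lexicographically in chronological order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class SmallestSketch {

    // Hypothetical helper, not an SDK method: returns the `count` smallest
    // elements of `input` as one list, in increasing natural order.
    static <T extends Comparable<? super T>> List<T> smallest(Iterable<T> input, int count) {
        // Max-heap bounded at `count` entries, so only the result
        // (not the whole input) has to fit in memory.
        PriorityQueue<T> heap = new PriorityQueue<>(Collections.<T>reverseOrder());
        for (T element : input) {
            heap.add(element);
            if (heap.size() > count) {
                heap.poll(); // discard the current largest
            }
        }
        List<T> result = new ArrayList<>(heap);
        Collections.sort(result); // increasing natural order
        return result;
    }

    public static void main(String[] args) {
        // Input deliberately out of order, like an unordered PCollection.
        List<String> lines = Arrays.asList(
            "2016-01-12T23:59:59.998Z low 5533",
            "2016-01-11T23:59:59.998Z medium 3",
            "2016-01-12T23:59:59.998Z high 1",
            "2016-01-11T23:59:59.998Z low 682");
        for (String line : smallest(lines, 200)) {
            System.out.println(line);
        }
    }
}
```

The ordering here lives inside the single List element. Nothing in this
sketch says anything about the order of elements across a collection.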


Am I misunderstanding it?

On Fri, May 20, 2016 at 1:54 PM Kenneth Knowles <[email protected]> wrote:

> Hi Jesse,
>
> A PCollection does not have a definite order, but is just a multiset/bag
> of elements. So any ordering you are seeing is a facet of a particular
> runner, sort of a coincidence. Can you tell me more about your use case?
>
> Kenn
>
> On Fri, May 20, 2016 at 1:46 PM, Jesse Anderson <[email protected]>
> wrote:
>
>> Kenn,
>>
>> The conversion to PCollection<String> doesn't work for me because I
>> wanted to maintain order. To keep the order, I need things in a
>> PCollection<List<String>>. To create the ordered list, I did:
>>
>> PCollection<List<String>> orderedList =
>> formattedCountsGlobal.apply(Top.smallest(200));
>> Then tried to write it out with:
>>
>>
>> orderedList.apply(TextIO.Write.withCoder(ListCoder.of(StringDelegateCoder.of(String.class))).to("output/result"));
>>
>> I'm using Top.smallest as a hack to order results, but that's a separate
>> topic.
>>
>> To answer my own question about DataOutputStream.writeUTF not working, a
>> short is written out before the string is written. This causes the same
>> issue as the VarInt. I should have used writeBytes(). That doesn't write
>> out a size first.
>>
>> Thanks,
>>
>> Jesse
>>
>> On Fri, May 20, 2016, 11:47 AM Kenneth Knowles <[email protected]> wrote:
>>
>>> Hi Jesse,
>>>
>>> I'm having trouble following exactly where the trouble is arising, but
>>> let me expand my main recommendation to be an edit of your code snippet
>>> (please forgive any typos or type errors).
>>>
>>> Original:
>>> ----------
>>> orderedList
>>>   .apply(TextIO.Write
>>>     .withCoder(ListCoder.of(StringDelegateCoder.of(String.class)))
>>>     .to("output/result"));
>>>
>>>
>>> My main recommendation
>>> ---------------------
>>> import static org.apache.beam.values.TypeDescriptors.strings;
>>>
>>> orderedList
>>>   .apply(MapElements.via((List<String> x) -> x.toString()).withOutputType(strings()))
>>>   .apply(TextIO.Write.to("output/result"));
>>>
>>>
>>> Another approach, which I do not recommend
>>> --------------------------------------------------------------
>>> orderedList
>>>   .apply(TextIO.Write
>>>     .withCoder(StringDelegateCoder.of(List.class))
>>>     .to("output/result"));
>>>
>>> I don't recommend it because StringDelegateCoder is really intended
>>> for things like URI, which have a canonical string representation for
>>> 1-1 conversions, not for readable human output.
>>>
>>> If neither of these works for you, perhaps you could paste a larger
>>> snippet of your pipeline.
>>>
>>> Kenn
>>>
>>> On Thu, May 19, 2016 at 9:32 PM, Jesse Anderson <[email protected]>
>>> wrote:
>>>
>>>> I'm writing out a PCollection<List<String>>. My goal is to write out
>>>> each element in the list as a new line.
>>>>
>>>> The StringUtf8Coder also writes out a VarInt for the size of the bytes.
>>>> The StringDelegateCoder with the ListCoder doesn't actually write out
>>>> text.
>>>>
>>>> I think List<String> support should be added to TextIO.Write. Or maybe
>>>> a new coder needs to be added that outputs text, with support for Lists,
>>>> KVs, Sets, etc.
>>>>
>>>> On Thu, May 19, 2016 at 9:23 PM Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>> Hi Jesse,
>>>>>
>>>>> StringDelegateCoder does just what you have said: it encodes using
>>>>> #toString() and decodes assuming a single-arg constructor.
>>>>>
>>>>> But by analogy with what you have written, and if I understand your
>>>>> goals correctly, what you want here is 
>>>>> TextIO.Write.withCoder(StringDelegateCoder.of(List.class))
>>>>> since you want to base it on List#toString() not String#toString().
>>>>>
>>>>> That said, probably the best way to write a reliable and/or readable
>>>>> format with TextIO.Write is to intentionally produce just the string you
>>>>> want for your output format - including escaping newlines, etc - and then
>>>>> use StringUtf8Coder.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Thu, May 19, 2016 at 9:00 PM, Jesse Anderson <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I'm trying to write out a List<String> with TextIO.Write. The only
>>>>>> supported type is String. I ended up writing an anonymous coder.
>>>>>>
>>>>>> I want to check if there is a coder that I couldn't find that would
>>>>>> just take an object and write out the .toString() of it.
>>>>>>
>>>>>> I tried this:
>>>>>>
>>>>>> orderedList.apply(TextIO.Write.withCoder(ListCoder.of(StringDelegateCoder.of(String.class))).to("output/result"));
>>>>>>
>>>>>> But a VarInt is encoded along with everything. I'm looking for a
>>>>>> coder that only writes out the UTF8.
>>>>>>
>>>>>> This functionality would be similar to Hadoop TextOutputFormat. It
>>>>>> just runs a .toString before writing it out.
>>>>>>
>>>>>> In the anonymous coder I wrote, I hit a weird issue. This code just
>>>>>> writes out a bunch of "\n". Yes, value is populated with data.
>>>>>>           dataOutputStream.writeUTF(value);
>>>>>>           dataOutputStream.writeUTF("\n");
>>>>>>
>>>>>> This code works:
>>>>>>           byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
>>>>>>           dataOutputStream.write(bytes);
>>>>>>           dataOutputStream.writeUTF("\n");
>>>>>>
>>>>>> I took this from the string coder. What's odd is that
>>>>>> DataOutputStream's writeUTF should work too. Is there a reason why
>>>>>> it doesn't?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> jesse
>>>>>>
>>>>>
>>>>>
>>>
>
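
A note on the DataOutputStream.writeUTF question quoted above. The sketch
below is only an illustration, using nothing beyond the JDK; the class
and method names (WriteUtfSketch, viaWriteUtf, viaRawBytes) are
hypothetical. It shows that writeUTF puts a two-byte length in front of
the encoded string, while write(byte[]) emits only the bytes it is given.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class WriteUtfSketch {

    // Bytes produced by DataOutputStream.writeUTF: a two-byte length
    // followed by the string in modified UTF-8.
    static byte[] viaWriteUtf(String value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Bytes produced by encoding explicitly and calling write(byte[]):
    // just the UTF-8 bytes, with no length prefix.
    static byte[] viaRawBytes(String value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.write(value.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(viaWriteUtf("low")));  // [0, 3, 108, 111, 119]
        System.out.println(Arrays.toString(viaRawBytes("low")));  // [108, 111, 119]
        System.out.println(Arrays.toString(viaWriteUtf("\n")));   // [0, 1, 10]
    }
}
```

Two caveats follow from this. The "working" snippet in the thread still
calls writeUTF("\n"), so each line would still carry the two length
bytes before the newline. And writeBytes() discards the high eight bits
of every char, so it is only safe for text that fits in one byte per
character; encoding to UTF-8 and calling write() avoids that.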
