Understanding these ordering guarantees is fundamental. Is my understanding
of the ordering guarantees for Top and List correct?

On Fri, May 20, 2016, 6:48 PM Jesse Anderson <[email protected]> wrote:

> Here's the output I'm looking for (and getting):
> 2016-01-11T23:59:59.998Z low 682
> 2016-01-11T23:59:59.998Z medium 3
> 2016-01-12T23:59:59.998Z high 1
> 2016-01-12T23:59:59.998Z low 5533
> 2016-01-12T23:59:59.998Z medium 33
> 2016-01-13T23:59:59.998Z high 1
> 2016-01-13T23:59:59.998Z low 7001
> 2016-01-13T23:59:59.998Z medium 39
> 2016-01-14T23:59:59.998Z high 2
> 2016-01-14T23:59:59.998Z low 7664
>
> It is a timestamp sorted report of processed data.
>
> My reading of Top's JavaDoc is that there are ordering guarantees:
> smallest
> <https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/Top.html#smallest-int->
> (int count)
> Returns a PTransform that takes an input PCollection<T> and returns a
> PCollection<List<T>> with a single element containing the smallest count
> elements of the input PCollection<T>, in increasing order, sorted
> according to their natural order.
>
> It also says:
>
> All the elements of the result's List must fit into the memory of a
> single machine.
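
To make the quoted contract concrete, here is a plain-Java sketch of what "the smallest count elements, in increasing order" means for an in-memory input. It is an illustration only and not how Beam implements Top; the class and method names (SmallestSketch, smallest) are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

/** Illustration only; this is NOT Beam's Top implementation. */
final class SmallestSketch {

  /** Returns the count smallest elements of input, in increasing natural order. */
  static <T extends Comparable<T>> List<T> smallest(Iterable<T> input, int count) {
    // Max-heap of the smallest elements seen so far, capped at count entries.
    PriorityQueue<T> heap = new PriorityQueue<>(Collections.reverseOrder());
    for (T element : input) {
      heap.add(element);
      if (heap.size() > count) {
        heap.poll(); // evict the largest of the retained elements
      }
    }
    List<T> result = new ArrayList<>(heap);
    Collections.sort(result); // the order lives in the List, not in the input
    return result;
  }

  public static void main(String[] args) {
    List<String> rows = Arrays.asList(
        "2016-01-12T23:59:59.998Z high 1",
        "2016-01-11T23:59:59.998Z medium 3",
        "2016-01-11T23:59:59.998Z low 682");
    // Input order is arbitrary; the returned List is sorted.
    smallest(rows, 2).forEach(System.out::println);
  }
}
```

The point of the sketch is that the ordering is a property of the single List value that comes back, which is also why that List has to fit in memory on one machine.
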
>
>
> Am I misunderstanding it?
>
> On Fri, May 20, 2016 at 1:54 PM Kenneth Knowles <[email protected]> wrote:
>
>> Hi Jesse,
>>
>> A PCollection does not have a definite order, but is just a multiset/bag
>> of elements. So any ordering you are seeing is a facet of a particular
>> runner, sort of a coincidence. Can you tell me more about your use case?
>>
>> Kenn
>>
>> On Fri, May 20, 2016 at 1:46 PM, Jesse Anderson <[email protected]>
>> wrote:
>>
>>> Kenn,
>>>
>>> The conversion to PCollection<String> doesn't work for me because I
>>> wanted to maintain order. To keep the order, I need things in
>>> PCollection<List<String>>. To create the ordered list, I did:
>>>
>>> PCollection<List<String>> orderedList =
>>> formattedCountsGlobal.apply(Top.smallest(200));
>>> Then tried to write it out with:
>>>
>>>
>>> orderedList.apply(TextIO.Write.withCoder(ListCoder.of(StringDelegateCoder.of(String.class))).to("output/result"));
>>>
>>> I'm using Top.smallest as a hack to order results, but that's a separate
>>> topic.
>>>
>>> To answer my own question about DataOutputStream.writeUTF not working, a
>>> short is written out before the string is written. This causes the same
>>> issue as the VarInt. I should have used writeBytes(). That doesn't
>>> write out a size first.
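
The length prefix described above can be seen with nothing but the JDK. The sketch below compares the bytes produced by DataOutputStream.writeUTF with the raw UTF-8 bytes of the same string; the class name WriteUtfDemo and its helper methods are made up for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class WriteUtfDemo {

  /** Bytes produced by writeUTF: a 2-byte length, then modified UTF-8 data. */
  static byte[] viaWriteUTF(String value) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      out.writeUTF(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  /** Bytes produced by writing the UTF-8 encoding directly, with no prefix. */
  static byte[] viaRawBytes(String value) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      out.write(value.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(viaWriteUTF("low")));  // length prefix first
    System.out.println(Arrays.toString(viaRawBytes("low")));  // text bytes only
  }
}
```

One caveat on writeBytes(): it writes only the low-order byte of each char, so it is safe for ASCII text but loses information for other characters. Writing value.getBytes(StandardCharsets.UTF_8) avoids both the prefix and that loss.
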
>>>
>>> Thanks,
>>>
>>> Jesse
>>>
>>> On Fri, May 20, 2016, 11:47 AM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> Hi Jesse,
>>>>
>>>> I'm having trouble following exactly where the trouble is arising, but
>>>> let me expand my main recommendation to be an edit of your code snippet
>>>> (please forgive any typos or type errors).
>>>>
>>>> Original:
>>>> ----------
>>>> orderedList
>>>>   .apply(TextIO.Write
>>>>     .withCoder(ListCoder.of(StringDelegateCoder.of(String.class)))
>>>>     .to("output/result"));
>>>>
>>>>
>>>> My main recommendation
>>>> ---------------------
>>>> import static org.apache.beam.sdk.values.TypeDescriptors.strings;
>>>>
>>>> orderedList
>>>>   .apply(MapElements.via(x -> x.toString()).withOutputType(strings()))
>>>>   .apply(TextIO.Write.to("output/result"));
>>>>
>>>>
>>>> Another approach, which I do not recommend
>>>> --------------------------------------------------------------
>>>> orderedList
>>>>   .apply(TextIO.Write
>>>>     .withCoder(StringDelegateCoder.of(List.class))
>>>>     .to("output/result"));
>>>>
>>>> I don't recommend it because StringDelegateCoder is really intended
>>>> for things like URI, which have a canonical string representation for
>>>> 1-1 conversions, not for readable human output.
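
A rough way to see the "1-1 conversion" requirement is to mimic the idea in plain Java: turn a value into text with toString(), then rebuild it through a constructor that takes a single String. This is only a sketch of the concept and not the actual StringDelegateCoder code; DelegateRoundTrip and roundTrips are hypothetical names.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DelegateRoundTrip {

  /**
   * Returns true if value survives toString() followed by a public
   * single-String-argument constructor of its own class.
   */
  static boolean roundTrips(Object value) {
    try {
      Object decoded =
          value.getClass().getConstructor(String.class).newInstance(value.toString());
      return decoded.equals(value);
    } catch (ReflectiveOperationException e) {
      // No public String constructor, or the constructor rejected the text.
      return false;
    }
  }

  public static void main(String[] args) {
    List<String> rows = new ArrayList<>(Arrays.asList("low 682", "medium 3"));
    System.out.println(roundTrips(URI.create("http://example.com/a"))); // URI has URI(String)
    System.out.println(roundTrips(rows)); // ArrayList has no String constructor
  }
}
```

A URI can be rebuilt from its own text, while a list's toString() output has no matching constructor to read it back, which is why this style of coder is a poor fit for List output.
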
>>>>
>>>> If neither of these works for you, perhaps you could paste a larger
>>>> snippet of your pipeline.
>>>>
>>>> Kenn
>>>>
>>>> On Thu, May 19, 2016 at 9:32 PM, Jesse Anderson <[email protected]>
>>>> wrote:
>>>>
>>>>> I'm writing out a PCollection<List<String>>. My goal is to write out
>>>>> each element in the list as a new line.
>>>>>
>>>>> The StringUtf8Coder also writes out a VarInt for the size of the
>>>>> bytes. The StringDelegateCoder with the ListCoder doesn't actually
>>>>> write out text.
>>>>>
>>>>> I think List<String> support should be added to TextIO.Write. Or maybe
>>>>> a new coder needs to be added that outputs text, with support for Lists,
>>>>> KVs, Sets, etc.
>>>>>
>>>>> On Thu, May 19, 2016 at 9:23 PM Kenneth Knowles <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Jesse,
>>>>>>
>>>>>> StringDelegateCoder does just what you have said: it encodes using
>>>>>> #toString() and decodes assuming a single-arg constructor.
>>>>>>
>>>>>> But by analogy with what you have written, and if I understand your
>>>>>> goals correctly, what you want here is 
>>>>>> TextIO.Write.withCoder(StringDelegateCoder.of(List.class))
>>>>>> since you want to base it on List#toString() not String#toString().
>>>>>>
>>>>>> That said, probably the best way to write a reliable and/or readable
>>>>>> format with TextIO.Write is to intentionally produce just the string you
>>>>>> want for your output format - including escaping newlines, etc - and then
>>>>>> use StringUtf8Coder.
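
As a sketch of "intentionally produce just the string you want", the helper below turns a List<String> into one newline-separated String and escapes any newlines inside an element. The escaping scheme and the names ReportFormatter, escape and toLines are assumptions made for this example, not part of any SDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class ReportFormatter {

  /** Escapes backslashes and newlines so each element stays on one line. */
  static String escape(String element) {
    return element.replace("\\", "\\\\").replace("\n", "\\n");
  }

  /** Joins the elements, one per line, preserving the List's order. */
  static String toLines(List<String> rows) {
    return rows.stream()
        .map(ReportFormatter::escape)
        .collect(Collectors.joining("\n"));
  }

  public static void main(String[] args) {
    List<String> rows = Arrays.asList(
        "2016-01-11T23:59:59.998Z low 682",
        "2016-01-11T23:59:59.998Z medium 3");
    System.out.println(toLines(rows));
  }
}
```

A function like toLines could serve as the per-element mapping applied to the PCollection<List<String>> before the text sink, so that the sink only ever sees plain Strings.
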
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Thu, May 19, 2016 at 9:00 PM, Jesse Anderson <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> I'm trying to write out a List<String> with TextIO.Write. The only
>>>>>>> supported type is String. I ended up writing an anonymous coder.
>>>>>>>
>>>>>>> I want to check whether there is a coder I couldn't find that
>>>>>>> would just take an object and write out the .toString() of it.
>>>>>>>
>>>>>>> I tried this:
>>>>>>>
>>>>>>> orderedList.apply(TextIO.Write.withCoder(ListCoder.of(StringDelegateCoder.of(String.class))).to("output/result"));
>>>>>>>
>>>>>>> But a VarInt is encoded along with everything. I'm looking for a
>>>>>>> coder that only writes out the UTF8.
>>>>>>>
>>>>>>> This functionality would be similar to Hadoop TextOutputFormat. It
>>>>>>> just runs a .toString before writing it out.
>>>>>>>
>>>>>>> In the anonymous coder I wrote, I hit a weird issue. This code just
>>>>>>> writes out a bunch of "\n". Yes, value is populated with data.
>>>>>>>           dataOutputStream.writeUTF(value);
>>>>>>>           dataOutputStream.writeUTF("\n");
>>>>>>>
>>>>>>> This code works:
>>>>>>>           byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
>>>>>>>           dataOutputStream.write(bytes);
>>>>>>>           dataOutputStream.writeUTF("\n");
>>>>>>>
>>>>>>> I took this from the string coder. What's odd is that
>>>>>>> DataOutputStream's writeUTF should work too. Is there a reason why
>>>>>>> it doesn't?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> jesse
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>
