I made https://github.com/apache/beam/pull/9954 that explores this.

Thanks for the insight, Ning. Internally, we use a
different representation.
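
To make the idea concrete, here is a rough sketch of the up-conversion, not the code in the PR. The class name PTupleSketch is hypothetical, and plain lists stand in for PCollections so the snippet runs without Beam. It only covers dicts and tuples; converting a DoOutputsTuple is left out because that needs the real SDK.

```python
class PTupleSketch:
    """Hypothetical stand-in for the proposed type; not part of Beam."""

    def __init__(self, outputs):
        if isinstance(outputs, PTupleSketch):
            # Already converted: copy the underlying mapping.
            self._by_tag = dict(outputs._by_tag)
        elif isinstance(outputs, dict):
            # dict representation: tag --> value
            self._by_tag = dict(outputs)
        elif isinstance(outputs, (tuple, list)):
            # tuple representation: index --> value, so the index is the tag
            self._by_tag = dict(enumerate(outputs))
        else:
            raise TypeError('unsupported output type: %r' % type(outputs))

    def __getitem__(self, tag):
        # Access by tag (or by index for outputs that came from a tuple).
        return self._by_tag[tag]

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails, so real
        # attributes such as _by_tag never reach this method.
        if name.startswith('_'):
            raise AttributeError(name)
        try:
            return self._by_tag[name]
        except KeyError:
            raise AttributeError(name)


# Lists stand in for PCollections here.
from_dict = PTupleSketch({'number': [1, 2], 'string': ['a']})
from_tuple = PTupleSketch(([1, 2], ['a']))

numbers = from_dict.number      # access by field name
strings = from_dict['string']   # access by tag
second = from_tuple[1]          # access by index
```

With something like this, a caller uses the same two accessors no matter which representation the composite returned.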

On Thu, Nov 7, 2019 at 2:27 PM Ning Kang <ni...@google.com> wrote:

> Hi Sam,
>
> Thanks for clarifying the accessor to output when building a pipeline.
>
> Internally, we have AppliedPTransform, where the output is always a
> dictionary:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pipeline.py#L770
> It seems to me that the entry under the key None is the main
> output.
>
> Ning.
>
> On Thu, Nov 7, 2019 at 1:39 PM Sam Rohde <sro...@google.com> wrote:
>
>> Hi All,
>>
>> In the Python SDK there are three ways of representing the output of a
>> PTransform with multiple PCollections:
>>
>>    - dictionary: PCollection tag --> PCollection
>>    - tuple: index --> PCollection
>>    - DoOutputsTuple: tag, index, or field name --> PCollection
>>
>> I find this inconsistent way of accessing multiple outputs to be
>> confusing. Say that you have an arbitrary PTransform with multiple outputs.
>> How do you know how to access an individual output without looking at the
>> source code? *You can't!* Remember there are three representations of
>> multiple outputs. So, you need to look at the output type and determine
>> what the output actually is.
>>
>> What purpose does it serve to have three different ways of representing a
>> single concept of multiple output PCollections?
>>
>> My proposal is to have a single representation analogous to Java's
>> PCollectionTuple. With this new type you will be able to access PCollections
>> by tag with the "[ ]" operator or by field name. It should also up-convert
>> returned tuples, dicts, and DoOutputsTuples from composites into this new
>> type.
>>
>> Full example:
>>
>> class SomeCustomComposite(PTransform):
>>   def expand(self, pcoll):
>>     def my_multi_do_fn(x):
>>       if isinstance(x, int):
>>         yield pvalue.TaggedOutput('number', x)
>>       if isinstance(x, str):
>>         yield pvalue.TaggedOutput('string', x)
>>
>>     def printer(x):
>>       print(x)
>>       yield x
>>
>>     outputs = pcoll | beam.ParDo(my_multi_do_fn).with_outputs()
>>     return pvalue.PTuple({
>>         'number': outputs.number | beam.ParDo(printer),
>>         'string': outputs.string | beam.ParDo(printer)
>>     })
>>
>> p = beam.Pipeline()
>> main = p | SomeCustomComposite()
>>
>> # Access PCollection by field name.
>> numbers = main.number | beam.ParDo(...)
>>
>> # Access PCollection by tag.
>> strings = main['string'] | beam.ParDo(...)
>>
>> What do you think? Does this clear up the confusion of using multiple
>> output PCollections in Python?
>>
>> Regards,
>> Sam
>>
>
