I opened a PR that explores this: https://github.com/apache/beam/pull/9954
Thanks for the insight, Ning. Internally, we use a different representation.

On Thu, Nov 7, 2019 at 2:27 PM Ning Kang <ni...@google.com> wrote:
> Hi Sam,
>
> Thanks for clarifying how outputs are accessed when building a pipeline.
>
> Internally, we have AppliedPTransform, where the output is always a
> dictionary:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pipeline.py#L770
> And it seems to me that the entry with key None is the main output.
>
> Ning.
>
> On Thu, Nov 7, 2019 at 1:39 PM Sam Rohde <sro...@google.com> wrote:
>
>> Hi All,
>>
>> In the Python SDK there are three ways of representing the output of a
>> PTransform with multiple PCollections:
>>
>> - dictionary: PCollection tag --> PCollection
>> - tuple: index --> PCollection
>> - DoOutputsTuple: tag, index, or field name --> PCollection
>>
>> I find these inconsistent ways of accessing multiple outputs confusing.
>> Say that you have an arbitrary PTransform with multiple outputs. How do
>> you know how to access an individual output without looking at the
>> source code? *You can't!* Remember, there are three representations of
>> multiple outputs, so you need to inspect the output type to determine
>> what the output actually is.
>>
>> What purpose does it serve to have three different ways of representing a
>> single concept of multiple output PCollections?
>>
>> My proposal is to have a single representation analogous to Java's
>> PCollectionTuple. With this new type you will be able to access
>> PCollections by tag with the "[ ]" operator or by field name. It should
>> also up-convert tuples, dicts, and DoOutputsTuples returned from
>> composites into this new type.
>>
>> Full example:
>>
>> import apache_beam as beam
>> from apache_beam import pvalue
>> from apache_beam.transforms import PTransform
>>
>> class SomeCustomComposite(PTransform):
>>   def expand(self, pcoll):
>>     def my_multi_do_fn(x):
>>       if isinstance(x, int):
>>         yield pvalue.TaggedOutput('number', x)
>>       if isinstance(x, str):
>>         yield pvalue.TaggedOutput('string', x)
>>
>>     def printer(x):
>>       print(x)
>>       yield x
>>
>>     outputs = pcoll | beam.ParDo(my_multi_do_fn).with_outputs()
>>     # pvalue.PTuple is the proposed new type; it does not exist yet.
>>     return pvalue.PTuple({
>>         'number': outputs.number | beam.ParDo(printer),
>>         'string': outputs.string | beam.ParDo(printer)
>>     })
>>
>> p = beam.Pipeline()
>> main = p | beam.Create([1, 'two', 3]) | SomeCustomComposite()
>>
>> # Access PCollection by field name.
>> numbers = main.number | beam.ParDo(...)
>>
>> # Access PCollection by tag.
>> strings = main['string'] | beam.ParDo(...)
>>
>> What do you think? Does this clear up the confusion of using multiple
>> output PCollections in Python?
>>
>> Regards,
>> Sam
>>
>
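To make the up-conversion idea concrete, here is a minimal pure-Python sketch of a container along the lines of the proposed PTuple. It is hypothetical: `PTuple` and `from_result` are illustrative names, not an existing Beam API, plain strings stand in for PCollections so it runs without Beam, and the rule that tuple outputs are tagged by their index is an assumption. A real version would also need to wrap Beam's DoOutputsTuple; that branch is omitted here so the sketch stays Beam-free.

```python
# Hypothetical sketch of the proposed PTuple container (not a Beam API).
# Plain strings stand in for PCollections so this runs without Beam.
from collections import OrderedDict


class PTuple(object):
  """Outputs accessible by tag via [] or, for identifier tags, by field name."""

  def __init__(self, outputs):
    # Keep outputs keyed by tag, preserving insertion order.
    self._outputs = OrderedDict(outputs)

  @classmethod
  def from_result(cls, result):
    """Up-converts a composite's return value into a PTuple (assumed rules)."""
    if isinstance(result, PTuple):
      return result
    if isinstance(result, dict):
      # Tags are kept as-is, including None for a main output.
      return cls(result)
    if isinstance(result, tuple):
      # Assumption: positional outputs use their index as the tag.
      return cls(enumerate(result))
    # A real implementation would also handle Beam's DoOutputsTuple here.
    raise TypeError('Cannot convert %s to PTuple' % type(result).__name__)

  def __getitem__(self, tag):
    return self._outputs[tag]

  def __getattr__(self, name):
    # Called only when normal lookup fails; read __dict__ directly so a
    # half-initialized object (e.g. during unpickling) cannot recurse.
    outputs = self.__dict__.get('_outputs', {})
    try:
      return outputs[name]
    except KeyError:
      raise AttributeError(name)

  def __iter__(self):
    return iter(self._outputs.values())

  def __len__(self):
    return len(self._outputs)


from_tuple = PTuple.from_result(('evens', 'odds'))
from_dict = PTuple.from_result({'number': 'nums', 'string': 'strs'})
print(from_tuple[0], from_tuple[1])  # evens odds
print(from_dict.number, from_dict['string'])  # nums strs
```

In this sketch `[]` is the canonical accessor and field-name access is a fallback for tags that happen to be valid identifiers, so tags such as None or 0 are reachable only with `[]`.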