My bad, I was under the incorrect impression that if I called something like

results = p | MyPTransform().with_outputs('label1', main='label2')

I could have then called

# "results | Something" would magically equal "results['label2'] | Something".
results | PTransformLabel2Items()
results['label1'] | PTransformLabel1Items()

Now, after re-reading the documentation, I see that I was wrong.
I guess the dictionary approach will have to do :)
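In case it helps anyone reading this later, here's a minimal plain-Python sketch of the pattern being discussed (stand-in lists in place of real PCollections; the class name, labels, and values are all illustrative, not actual Beam API):

```python
# Illustrative sketch of returning multiple labeled outputs from an
# expand()-style method. Plain Python stand-ins are used instead of
# real PCollections; everything here is hypothetical example data.

class MyPTransform:
    def __init__(self, return_schema=False):
        self._return_schema = return_schema

    def expand(self, pcoll):
        tuples = [('a', 1), ('b', 2)]   # stand-in for the rows output
        schema = ['col_a', 'col_b']     # stand-in for the schema output
        if self._return_schema:
            # Dictionary variant: each output is fetched explicitly by
            # label; there is no implicit "main" output here.
            return {'tuples': tuples, 'schema': schema}
        return tuples

# Default: a single output.
rows = MyPTransform().expand(None)

# Opt-in: a dict of explicitly labeled outputs.
outputs = MyPTransform(return_schema=True).expand(None)
rows_again = outputs['tuples']
schema = outputs['schema']
```

Unlike `with_outputs(..., main=...)`, the dict gives no default label, so every consumer has to name the output it wants.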

On Wed, Jul 12, 2017 at 12:26 PM, Robert Bradshaw <[email protected]>
wrote:

> If I understand correctly, you want to return something like
> ParDo(...).with_outputs(...)? That is, only return the schema if
> explicitly asked for?
>
> In this case, you could take a boolean parameter to your PTransform
> constructor and your expand method could look like
>
> def expand(self, p):
>     ...
>     if self._return_schema:
>         return tuples, schema
>     else:
>         return tuples
>
> You would use it like
>
>     result = pcoll | MyPTransform()
>
> or
>
>     result, schema = pcoll | MyPTransform(return_schema=True)
>
>
> On Wed, Jul 12, 2017 at 11:32 AM, Dmitry Demeshchuk
> <[email protected]> wrote:
> > Thanks, Robert!
> >
> > If I make it a dictionary, can I somehow specify the main (default) label?
> > Or, if it's a tuple, will the first element be the default one?
> >
> > On Tue, Jul 11, 2017 at 6:22 PM, Robert Bradshaw <[email protected]>
> > wrote:
> >>
> >> You could return a (tuples, pcoll_schema) tuple from your expand method.
> >> Make this a dictionary for more explicit labeling.
> >>
> >>
> >> On Tuesday, July 11, 2017, Dmitry Demeshchuk <[email protected]> wrote:
> >>>
> >>> Hi list,
> >>>
> >>> I'm trying to make a SQL PTransform return the rows as the main output
> >>> and the ordered list of columns as a tagged output.
> >>>
> >>> This is what my expand() function looks like:
> >>>
> >>>     def expand(self, pcoll):
> >>>         pcoll_query = (pcoll.pipeline
> >>>             | 'Query' >> beam.Create([self.query]))
> >>>         pcoll_s3_prefix = (pcoll.pipeline
> >>>             | 'S3Prefix' >> beam.Create([self.s3_prefix]))
> >>>         pcoll_manifest = (pcoll_query
> >>>             | 'Unload' >> beam.ParDo(
> >>>                 self.UnloadFromRedshift(self.dsn, self.aws_config,
> >>>                                         self.delimiter, self.null_string),
> >>>                 s3_prefix=AsSingleton(pcoll_s3_prefix)))
> >>>         pcoll_schema = (pcoll_query
> >>>             | 'Schema' >> ReadRedshiftQuerySchema(self.dsn))
> >>>         tuples = (pcoll_manifest
> >>>             | 'GetS3Files' >> beam.ParDo(
> >>>                 self.ReadRedshiftUnloadedFiles(self.aws_config))
> >>>             | 'AntiFusion' >> beam.GroupByKey()
> >>>             | 'LoadDataFromS3' >> beam.ParDo(
> >>>                 self.LoadFromS3(self.aws_config))
> >>>             | 'ParseCSV' >> beam.ParDo(
> >>>                 self.CSVLineToTuple(self.delimiter, self.null_string),
> >>>                 columns=AsList(pcoll_schema)))
> >>>         return tuples
> >>>
> >>> Is there any way for me to add pcoll_schema as a tagged output here,
> >>> inside the CSVLineToTuple ParDo, or inside the expand() function itself?
> >>>
> >>> Thanks!
> >>>
> >>> --
> >>> Best regards,
> >>> Dmitry Demeshchuk.
> >
> >
> >
> >
> > --
> > Best regards,
> > Dmitry Demeshchuk.
>



-- 
Best regards,
Dmitry Demeshchuk.
