My bad, I was under the incorrect impression that if I called something like
results = p | MyPTransform().with_outputs('label1', main='label2')
I could then call
# "results | Something" would magically be the same as "results['label2'] | Something".
results | PTransformLabel2Items()
results['label1'] | PTransformLabel1Items()
Now, after re-reading the documentation, I see that I was wrong.
I guess the dictionary approach will have to do :)
On Wed, Jul 12, 2017 at 12:26 PM, Robert Bradshaw <[email protected]> wrote:
> If I understand correctly, you want to return something like
> ParDo(...).with_outputs(...)? That is, only return the schema if
> explicitly asked for?
>
> In this case, you could take a boolean parameter to your PTransform
> constructor, and your expand method could look like
>
> def expand(self, p):
>     ...
>     if self._return_schema:
>         return tuples, schema
>     else:
>         return tuples
>
> You would use it like
>
> result = p | MyPTransform()
>
> or
>
> result, schema = p | MyPTransform(return_schema=True)
>
>
> On Wed, Jul 12, 2017 at 11:32 AM, Dmitry Demeshchuk
> <[email protected]> wrote:
> > Thanks, Robert!
> >
> > If I make it a dictionary, can I somehow specify the main (default) label?
> > Or, if it's a tuple, will the first element be the default one?
> >
> > On Tue, Jul 11, 2017 at 6:22 PM, Robert Bradshaw <[email protected]>
> > wrote:
> >>
> >> You could return a (tuples, pcoll_schema) tuple from your expand method.
> >> Make this a dictionary for more explicit labeling.
> >>
> >>
> >> On Tuesday, July 11, 2017, Dmitry Demeshchuk <[email protected]> wrote:
> >>>
> >>> Hi list,
> >>>
> >>> I'm trying to make a SQL PTransform return the rows as the main output
> >>> and the ordered list of columns as a tagged output.
> >>>
> >>> This is what my expand() function looks like:
> >>>
> >>> import apache_beam as beam
> >>> from apache_beam.pvalue import AsList, AsSingleton
> >>>
> >>> def expand(self, pcoll):
> >>>     pcoll_query = pcoll.pipeline | 'Query' >> beam.Create([self.query])
> >>>     pcoll_s3_prefix = (pcoll.pipeline
> >>>                        | 'S3Prefix' >> beam.Create([self.s3_prefix]))
> >>>     # Unload the query results to S3; the prefix arrives as a side input.
> >>>     pcoll_manifest = (pcoll_query
> >>>                       | 'Unload' >> beam.ParDo(
> >>>                           self.UnloadFromRedshift(self.dsn, self.aws_config,
> >>>                                                   self.delimiter, self.null_string),
> >>>                           s3_prefix=AsSingleton(pcoll_s3_prefix)))
> >>>     # Column names of the query result, consumed by ParseCSV as a side input.
> >>>     pcoll_schema = (pcoll_query
> >>>                     | 'Schema' >> ReadRedshiftQuerySchema(self.dsn))
> >>>     tuples = (pcoll_manifest
> >>>               | 'GetS3Files' >> beam.ParDo(
> >>>                   self.ReadRedshiftUnloadedFiles(self.aws_config))
> >>>               # GroupByKey breaks fusion between file listing and loading.
> >>>               | 'AntiFusion' >> beam.GroupByKey()
> >>>               | 'LoadDataFromS3' >> beam.ParDo(self.LoadFromS3(self.aws_config))
> >>>               | 'ParseCSV' >> beam.ParDo(
> >>>                   self.CSVLineToTuple(self.delimiter, self.null_string),
> >>>                   columns=AsList(pcoll_schema)))
> >>>     return tuples
> >>>
> >>> Is there any way for me to add pcoll_schema as a tagged output here,
> >>> inside the CSVLineToTuple ParDo, or inside the expand() function itself?
> >>>
> >>> Thanks!
> >>>
> >>> --
> >>> Best regards,
> >>> Dmitry Demeshchuk.
>
--
Best regards,
Dmitry Demeshchuk.