Thanks, Robert! If I make it a dictionary, can I somehow specify the main (default) label? Or, if it's a tuple, will the first element be the default one?
On Tue, Jul 11, 2017 at 6:22 PM, Robert Bradshaw <[email protected]> wrote:

> You could return a (tuples, pcoll_schema) tuple from your expand method.
> Make this a dictionary for more explicit labeling.
>
>
> On Tuesday, July 11, 2017, Dmitry Demeshchuk <[email protected]> wrote:
>
>> Hi list,
>>
>> I'm trying to make a SQL PTransform return the rows as the main output
>> and the ordered list of columns as a tagged output.
>>
>> This is what my expand() function looks like:
>>
>> def expand(self):
>>     pcoll_query = pcoll.pipeline | 'Query' >> beam.Create([self.query])
>>     pcoll_s3_prefix = pcoll.pipeline | 'S3Prefix' >> beam.Create([self.s3_prefix])
>>     pcoll_manifest = (pcoll_query
>>         | 'Unload' >> beam.ParDo(self.UnloadFromRedshift(self.dsn,
>>             self.aws_config, self.delimiter, self.null_string),
>>             s3_prefix=AsSingleton(pcoll_s3_prefix)))
>>     pcoll_schema = (pcoll_query
>>         | 'Schema' >> ReadRedshiftQuerySchema(self.dsn))
>>     tuples = (pcoll_manifest
>>         | 'GetS3Files' >> beam.ParDo(self.ReadRedshiftUnloadedFiles(self.aws_config))
>>         | 'AntiFusion' >> beam.GroupByKey()
>>         | 'LoadDataFromS3' >> beam.ParDo(self.LoadFromS3(self.aws_config))
>>         | 'ParseCSV' >> beam.ParDo(self.CSVLineToTuple(self.delimiter,
>>             self.null_string), columns=AsList(pcoll_schema)))
>>     return tuples
>>
>>
>> Is there any way for me to add pcoll_schema as a tagged output here,
>> inside the CSVLineToTuple ParDo, or inside the expand() function itself?
>>
>> Thanks!
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
>>
>

--
Best regards,
Dmitry Demeshchuk.
