Hi list,

I'm trying to make a SQL PTransform return the rows as the main output and
the ordered list of columns as a tagged output.

This is what my expand() function looks like:

    def expand(self, pcoll):
        # Wrap the query and the S3 prefix in single-element PCollections.
        pcoll_query = pcoll.pipeline | 'Query' >> beam.Create([self.query])
        pcoll_s3_prefix = (pcoll.pipeline
            | 'S3Prefix' >> beam.Create([self.s3_prefix]))
        pcoll_manifest = (pcoll_query
            | 'Unload' >> beam.ParDo(
                self.UnloadFromRedshift(self.dsn, self.aws_config,
                                        self.delimiter, self.null_string),
                s3_prefix=AsSingleton(pcoll_s3_prefix)))
        # Ordered list of columns; passed to ParseCSV as a side input below.
        pcoll_schema = (pcoll_query
            | 'Schema' >> ReadRedshiftQuerySchema(self.dsn))
        tuples = (pcoll_manifest
            | 'GetS3Files' >> beam.ParDo(
                self.ReadRedshiftUnloadedFiles(self.aws_config))
            | 'AntiFusion' >> beam.GroupByKey()
            | 'LoadDataFromS3' >> beam.ParDo(self.LoadFromS3(self.aws_config))
            | 'ParseCSV' >> beam.ParDo(
                self.CSVLineToTuple(self.delimiter, self.null_string),
                columns=AsList(pcoll_schema)))
        return tuples

Is there any way for me to add pcoll_schema as a tagged output here, inside
the CSVLineToTuple ParDo, or inside the expand() function itself?

Thanks!

-- 
Best regards,
Dmitry Demeshchuk.
