Hi list,
I'm trying to make a SQL PTransform return the rows as the main output and
the ordered list of columns as a tagged output.
This is what my expand() function looks like:
    def expand(self, pcoll):
        # Single-element PCollections holding the query and the S3 prefix.
        pcoll_query = pcoll.pipeline | 'Query' >> beam.Create([self.query])
        pcoll_s3_prefix = (pcoll.pipeline
                           | 'S3Prefix' >> beam.Create([self.s3_prefix]))

        # Run UNLOAD on Redshift; the S3 prefix is passed as a side input.
        pcoll_manifest = (pcoll_query
                          | 'Unload' >> beam.ParDo(
                              self.UnloadFromRedshift(
                                  self.dsn, self.aws_config,
                                  self.delimiter, self.null_string),
                              s3_prefix=AsSingleton(pcoll_s3_prefix)))

        # Ordered list of column names for the query.
        pcoll_schema = (pcoll_query
                        | 'Schema' >> ReadRedshiftQuerySchema(self.dsn))

        # Read the unloaded files and parse each CSV line into a tuple.
        tuples = (pcoll_manifest
                  | 'GetS3Files' >> beam.ParDo(
                      self.ReadRedshiftUnloadedFiles(self.aws_config))
                  | 'AntiFusion' >> beam.GroupByKey()
                  | 'LoadDataFromS3' >> beam.ParDo(
                      self.LoadFromS3(self.aws_config))
                  | 'ParseCSV' >> beam.ParDo(
                      self.CSVLineToTuple(self.delimiter, self.null_string),
                      columns=AsList(pcoll_schema)))

        return tuples
Is there any way for me to add pcoll_schema as a tagged output here, inside
the CSVLineToTuple ParDo, or inside the expand() function itself?
Thanks!
--
Best regards,
Dmitry Demeshchuk.