Thanks, Robert!

If I make it a dictionary, can I somehow specify the main (default) label?
Or, if it's a tuple, will the first element be the default one?

On Tue, Jul 11, 2017 at 6:22 PM, Robert Bradshaw <[email protected]>
wrote:

> You could return a (tuples, pcoll_schema) tuple from your expand method.
> Return a dictionary instead if you want more explicit labeling.
>
>
> On Tuesday, July 11, 2017, Dmitry Demeshchuk <[email protected]> wrote:
>
>> Hi list,
>>
>> I'm trying to make a SQL PTransform return the rows as the main output
>> and the ordered list of columns as a tagged output.
>>
>> This is what my expand() function looks like:
>>
>>     # Assumes: import apache_beam as beam
>>     #          from apache_beam.pvalue import AsList, AsSingleton
>>     def expand(self, pcoll):
>>         pcoll_query = pcoll.pipeline | 'Query' >> beam.Create([self.query])
>>         pcoll_s3_prefix = (pcoll.pipeline
>>             | 'S3Prefix' >> beam.Create([self.s3_prefix]))
>>         pcoll_manifest = (pcoll_query
>>             | 'Unload' >> beam.ParDo(
>>                 self.UnloadFromRedshift(self.dsn, self.aws_config,
>>                                         self.delimiter, self.null_string),
>>                 s3_prefix=AsSingleton(pcoll_s3_prefix)))
>>         pcoll_schema = (pcoll_query
>>             | 'Schema' >> ReadRedshiftQuerySchema(self.dsn))
>>         tuples = (pcoll_manifest
>>             | 'GetS3Files' >> beam.ParDo(
>>                 self.ReadRedshiftUnloadedFiles(self.aws_config))
>>             | 'AntiFusion' >> beam.GroupByKey()
>>             | 'LoadDataFromS3' >> beam.ParDo(self.LoadFromS3(self.aws_config))
>>             | 'ParseCSV' >> beam.ParDo(
>>                 self.CSVLineToTuple(self.delimiter, self.null_string),
>>                 columns=AsList(pcoll_schema)))
>>         return tuples
>>
>> Is there any way for me to add pcoll_schema as a tagged output here,
>> inside the CSVLineToTuple ParDo, or inside the expand() function itself?
>>
>> Thanks!
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
>>
>


-- 
Best regards,
Dmitry Demeshchuk.
