You could write a DoFn that "consumes" the output of write as a side input and touches the file manually. E.g.

    write_result = ... | beam.io.WriteToText(...)
    p | beam.Create([None]) | beam.Map(
        lambda unused_none, unused_side: create_file(),
        unused_side=write_result)

where create_file() actually creates the file in question. Though write_result is unused, this will cause the Map to block until it is computed.

On Wed, Feb 3, 2021 at 9:15 AM radhika sharma <radhika7...@gmail.com> wrote:

> I have created a data flow template as below
>
> from __future__ import absolute_import
> import apache_beam as beam
> import argparse
> import logging
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.io.gcp.internal.clients import bigquery
> from datetime import date
> today = date.today()
> current_date = today.strftime("%Y%m%d")
> def run(argv=None):
> parser = argparse.ArgumentParser()
> known_args, pipeline_args = parser.parse_known_args(argv)
> p = beam.Pipeline(options=PipelineOptions(pipeline_args))
> (p | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(query="SELECT
> DISTINCT(IF(LENGTH(MOBILE)=10, CONCAT('91',MOBILE),REPLACE(MOBILE,'+91
> ','91'))) FROM `whr-asia-datalake-nonprod.WHR_DATALAKE.C4C_CONSUMER_RAW`
> WHERE REGEXP_CONTAINS(REGEXP_REPLACE(Mobile, ' ',
> ''),r'^(?:(?:\+|0{0,2})91(\s*[\-]\s*)?|[0]?)?[6789]\d{9}$')",use_standard_sql=True))
> | 'read values' >> beam.Map(lambda x: x.values())
> | 'CSV format' >> beam.Map(lambda row:'|'.join ("WRPOOL|5667788|"+
> str(column) +'|"'+"Hi, This msg is from Whirlpool DL" + '"' for column in
> row))
> | 'Write_to_GCS' >>
> beam.io.WriteToText('gs://whr-asia-datalake-dev-standard/outbound/Valuefirst/WHR_MOBILE_CNSNT_REQ'+''+
> str(current_date),file_name_suffix='.csv',header='SENDER_ID|SHORTCODE|MOBILE_NUM|CONSENT_MSG')
> p.run().wait_until_finish()
> if __name__ == '__main__':
> logging.getLogger().setLevel(logging.INFO)
> run()
>
> I need to create an empty file after the CSV file is created. I am not sure
> which option to use. Can someone help?
>
> Please help. It's urgent.
>
> I have tried
> beam.Create('gs://whr-asia-datalake-dev-standard/outbound/Valuefirst/Valuefirst.done')
> to create an empty file.
> It doesn't work.
>
>