Hi Sofia,

Thanks for the response. For now, we have decided not to use Flex Templates.
Is there a way to pass environment variables to the workers without using any template?
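
To make the question concrete, here is a minimal sketch of one template-free approach, under stated assumptions: the values are captured when the pipeline is constructed and exported in the DoFn's setup() hook, which Beam calls on the worker before any element is processed. The mixin name EnvVarSetupMixin and its env_vars argument are hypothetical, not part of Beam or of our code. This would only help if the package reads the variable after setup() has run; if it reads it at import time, the import would presumably have to be deferred as well.

```python
import os


class EnvVarSetupMixin:
    """Hypothetical mixin that exports selected variables on the worker.

    Intended to be combined with apache_beam.DoFn, whose setup() hook runs
    on the worker once per DoFn instance, before elements are processed.
    """

    def __init__(self, env_vars=None):
        super().__init__()
        # A plain dict of strings, so it is pickled along with the DoFn.
        self._env_vars = dict(env_vars or {})

    def setup(self):
        # Executed on the worker; the values were captured at construction
        # time on the machine that submitted the job.
        for name, value in self._env_vars.items():
            os.environ[name] = str(value)
```

Assuming this works, a DoFn could be declared as class FetchBillInformation(EnvVarSetupMixin, beam.DoFn) and constructed with env_vars={"OTEL_SERVICE_NAME": otel_service_name}. I have not verified this on Dataflow.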

Thanks & Regards,
Sumit Desai

On Wed, Dec 20, 2023 at 3:16 PM Sofia’s World <[email protected]> wrote:

> Hi
> My 2 cents: have you ever considered using Flex Templates to run your
> pipeline? Then you can pass all your parameters at runtime.
> (Apologies in advance if it does not cover your use case...)
>
> On Wed, 20 Dec 2023, 09:35 Sumit Desai via user, <[email protected]>
> wrote:
>
>> Hi all,
>>
>> I have a Python application that uses Apache Beam with Dataflow as the
>> runner. The application uses a non-public Python package,
>> 'uplight-telemetry', which is configured using 'extra_packages' when
>> creating the pipeline_options object. This package expects an environment
>> variable named 'OTEL_SERVICE_NAME'. Since this variable is not present
>> on the Dataflow worker, it results in an error during application
>> startup.
>>
>> I am passing this variable using custom pipeline options. The code that
>> creates the pipeline options is as follows:
>>
>> pipeline_options = ProcessBillRequests.CustomOptions(
>>     project=gcp_project_id,
>>     region="us-east1",
>>     job_name=job_name,
>>     temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
>>     staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
>>     runner='DataflowRunner',
>>     save_main_session=True,
>>     service_account_email=service_account,
>>     subnetwork=os.environ.get(SUBNETWORK_URL),
>>     extra_packages=[uplight_telemetry_tar_file_path],
>>     setup_file=setup_file_path,
>>     OTEL_SERVICE_NAME=otel_service_name,
>>     OTEL_RESOURCE_ATTRIBUTES=otel_resource_attributes
>>     # Set values for additional custom variables as needed
>> )
>>
>>
>> The code that executes the pipeline is as follows:
>>
>>
>> result = (
>>         pipeline
>>         | "ReadPendingRecordsFromDB" >> read_from_db
>>         | "Parse input PCollection" >> beam.Map(ProcessBillRequests.parse_bill_data_requests)
>>         | "Fetch bills " >> beam.ParDo(ProcessBillRequests.FetchBillInformation())
>> )
>>
>> pipeline.run().wait_until_finish()
>>
>> Is there a way to make the environment variables set in the custom options
>> available on the worker?
>>
>> Thanks & Regards,
>> Sumit Desai
>>
>
