Re: Using Dataflow with Pubsub input connector in batch mode

2024-01-21 Thread Sumit Desai via user
Thanks Reuven and Alex. Yes, we are considering specifying the max time to
read on the Pub/Sub input connector first. If that doesn't work out for some
reason, we will consider the approach with GCS. Thanks for your inputs.

Regards,
Sumit Desai



On Mon, Jan 22, 2024 at 4:13 AM Reuven Lax via user 
wrote:

> Cloud Storage subscriptions are a reasonable way to backup data to
> storage, and you can then run a batch pipeline over the GCS files. Keep in
> mind that these files might contain duplicates (the storage subscriptions
> do not guarantee exactly-once writes). If this is a problem, you should add
> a deduplication stage to the batch job that processes these files.
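A minimal sketch of such a deduplication stage in the Beam Python SDK (illustrative only, not code from this thread; the message_id field name assumes the subscription's "write metadata" option is enabled so each exported record carries its Pub/Sub message ID):

import apache_beam as beam

def deduplicate_by_message_id(records):
    # records: PCollection of dicts parsed from the exported files, each
    # carrying the Pub/Sub message_id (assumed field name, requires the
    # subscription's "write metadata" option).
    return (
        records
        | "KeyByMessageId" >> beam.Map(lambda rec: (rec["message_id"], rec))
        | "GroupByMessageId" >> beam.GroupByKey()
        # Keep a single record per message_id, dropping duplicate writes.
        | "TakeOnePerKey" >> beam.Map(lambda kv: next(iter(kv[1])))
    )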
>
> On Sun, Jan 21, 2024 at 2:45 AM Alex Van Boxel  wrote:
>
>> There are some valid use cases where you want to handle data that comes in
>> over Pub/Sub in batch. It's way too expensive to run a simple daily extract
>> over the Pub/Sub data as a streaming job; batch is a lot cheaper.
>>
>> What we do is backup the data to Cloud Storage; Pubsub has recently added
>> a nice feature that can help you:
>>
>>- https://cloud.google.com/pubsub/docs/cloudstorage
>>-
>>
>> https://cloud.google.com/pubsub/docs/create-cloudstorage-subscription#subscription_properties
>>
>> This reduced our cost dramatically. We had a Dataflow job doing the backup
>> to Cloud Storage, but the above feature is way cheaper. Use the export to
>> Avro (the schema is in the second link), and then your batch Beam pipeline's
>> input is a bounded input.
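As a concrete illustration (not code from this thread), a batch pipeline could consume such an export as a bounded input roughly like this; the bucket path and the "data" payload field are assumptions, with the real field names given by the Avro schema in the second link above:

import apache_beam as beam

with beam.Pipeline() as pipeline:
    messages = (
        pipeline
        # Hypothetical location of the files written by the Cloud Storage
        # subscription; ReadFromAvro is a bounded source.
        | "ReadExportedAvro" >> beam.io.ReadFromAvro(
            "gs://my-backup-bucket/pubsub-export/*.avro")
        # The payload field name depends on the chosen export schema;
        # "data" is assumed here.
        | "ExtractPayload" >> beam.Map(lambda rec: rec["data"])
    )
    # ...the rest of the batch pipeline consumes `messages`...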
>>
>>  _/
>> _/ Alex Van Boxel
>>
>>
>> On Fri, Jan 19, 2024 at 12:18 AM Reuven Lax via user <
>> user@beam.apache.org> wrote:
>>
>>> Some comments here:
>>>    1. "All messages in a Pub/Sub topic" is not a well-defined statement,
>>> as there can always be more messages published. You may know that nobody
>>> will publish any more messages, but the pipeline does not.
>>>2. While it's possible to read from Pub/Sub in batch, it's usually
>>> not recommended. For one thing I don't think that the batch runner can
>>> maintain exactly-once processing when reading from Pub/Sub.
>>>3. In Java you can turn an unbounded source (Pub/Sub) into a bounded
>>> source that can in theory be used for batch jobs. However this is done by
>>> specifying either the max time to read or the max number of messages. I
>>> don't think there's any way to automatically read the Pub/Sub topic until
>>> there are no more messages in it.
>>>
>>> Reuven
>>>
>>> On Thu, Jan 18, 2024 at 2:25 AM Sumit Desai via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I want to create a Dataflow pipeline using Pub/sub as an input
>>>> connector but I want to run it in batch mode and not streaming mode. I know
>>>> it's not possible in Python but how can I achieve this in Java? Basically,
>>>> I want my pipeline to read all messages in a Pubsub topic, process and
>>>> terminate. Please suggest.
>>>>
>>>> Thanks & Regards,
>>>> Sumit Desai
>>>>
>>>


Using Dataflow with Pubsub input connector in batch mode

2024-01-18 Thread Sumit Desai via user
Hi all,

I want to create a Dataflow pipeline using Pub/Sub as an input connector,
but I want to run it in batch mode and not streaming mode. I know it's not
possible in Python, but how can I achieve this in Java? Basically, I want my
pipeline to read all messages in a Pub/Sub topic, process them, and terminate.
Please suggest.

Thanks & Regards,
Sumit Desai


Re: Environmental variables not accessible in Dataflow pipeline

2024-01-01 Thread Sumit Desai via user
>>>>>>> not
>>>>>>> serve my purpose.
>>>>>>>
>>>>>>> Thanks & Regards,
>>>>>>> Sumit Desai
>>>>>>>
>>>>>>> On Thu, Dec 21, 2023 at 10:02 AM Sumit Desai <
>>>>>>> sumit.de...@uplight.com> wrote:
>>>>>>>
>>>>>>>> Thank you HQ. Will take a look at this.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Sumit Desai
>>>>>>>>
>>>>>>>> On Wed, Dec 20, 2023 at 8:13 PM XQ Hu  wrote:
>>>>>>>>
>>>>>>>>> Dataflow VMs cannot know your local env variable. I think you
>>>>>>>>> should use custom container:
>>>>>>>>> https://cloud.google.com/dataflow/docs/guides/using-custom-containers.
>>>>>>>>> Here is a sample project:
>>>>>>>>> https://github.com/google/dataflow-ml-starter
>>>>>>>>>
>>>>>>>>> On Wed, Dec 20, 2023 at 4:57 AM Sofia’s World 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Sumit
>>>>>>>>>>  Thanks. Sorry...I guess if the value of the env variable is
>>>>>>>>>> always the same u can pass it as job params?..though it doesn't 
>>>>>>>>>> sound like
>>>>>>>>>> a viable option...
>>>>>>>>>> Hth
>>>>>>>>>>
>>>>>>>>>> On Wed, 20 Dec 2023, 09:49 Sumit Desai, 
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Sofia,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the response. For now, we have decided not to use
>>>>>>>>>>> flex template. Is there a way to pass environmental variables 
>>>>>>>>>>> without using
>>>>>>>>>>> any template?
>>>>>>>>>>>
>>>>>>>>>>> Thanks & Regards,
>>>>>>>>>>> Sumit Desai
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Dec 20, 2023 at 3:16 PM Sofia’s World <
>>>>>>>>>>> mmistr...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi
>>>>>>>>>>>>  My 2 cents. .have u ever considered using flex templates to
>>>>>>>>>>>> run your pipeline? Then you can pass all your parameters at 
>>>>>>>>>>>> runtime..
>>>>>>>>>>>> (Apologies in advance if it does not cover your use case...)
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 20 Dec 2023, 09:35 Sumit Desai via user, <
>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a Python application which is using Apache beam and
>>>>>>>>>>>>> Dataflow as runner. The application uses a non-public Python 
>>>>>>>>>>>>> package
>>>>>>>>>>>>> 'uplight-telemetry' which is configured using 'extra_packages' 
>>>>>>>>>>>>> while
>>>>>>>>>>>>> creating pipeline_options object. This package expects an 
>>>>>>>>>>>>> environmental
>>>>>>>>>>>>> variable named 'OTEL_SERVICE_NAME' and since this variable is not 
>>>>>>>>>>>>> present
>>>>>>>>>>>>> in the Dataflow worker, it is resulting in an error during 
>>>>>>>>>>>>> application
>>>>>>>>>>>>> startup.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am passing this variable using custom pipeline options. Code
>>>>>>>>>>>>> to create pipeline options is as follows-
>>>>

Re: Environmental variables not accessible in Dataflow pipeline

2023-12-22 Thread Sumit Desai via user
Yes, I will have to try it out.

Regards
Sumit Desai

On Fri, Dec 22, 2023 at 3:53 PM Sofia’s World  wrote:

> I guess so, I am not an expert on using env variables in Dataflow
> pipelines, as any config dependencies I need, I pass them as job input
> params.
>
> But perhaps you can configure variables in your Docker file (I am not an
> expert in this either), as flex templates use Docker?
>
>
> https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates
>
> hth
>   Marco
>
>
>
>
> On Fri, Dec 22, 2023 at 10:17 AM Sumit Desai 
> wrote:
>
>> We are using an external non-public package which expects environmental
>> variables only. If environmental variables are not found, it will throw an
>> error. We can't change the source of this package.
>>
>> Does this mean we will face the same problem with flex templates also?
>>
>> On Fri, 22 Dec 2023, 3:39 pm Sofia’s World,  wrote:
>>
>>> The flex template will allow you to pass input params with dynamic
>>> values to your Dataflow job, so you could replace the env variable with
>>> that input? That is, unless you have to have env vars..but from your
>>> snippets it appears you are just using them to configure one of your
>>> components?
>>> Hth
>>>
>>> On Fri, 22 Dec 2023, 10:01 Sumit Desai,  wrote:
>>>
>>>> Hi Sofia and XQ,
>>>>
>>>> The application is failing because I have loggers defined in every file
>>>> and the method to create a logger tries to create an object of
>>>> UplightTelemetry. If I use flex templates, will the environmental variables
>>>> I supply be loaded before the application gets loaded? If not, it would not
>>>> serve my purpose.
>>>>
>>>> Thanks & Regards,
>>>> Sumit Desai
>>>>
>>>> On Thu, Dec 21, 2023 at 10:02 AM Sumit Desai 
>>>> wrote:
>>>>
>>>>> Thank you HQ. Will take a look at this.
>>>>>
>>>>> Regards,
>>>>> Sumit Desai
>>>>>
>>>>> On Wed, Dec 20, 2023 at 8:13 PM XQ Hu  wrote:
>>>>>
>>>>>> Dataflow VMs cannot know your local env variable. I think you should
>>>>>> use custom container:
>>>>>> https://cloud.google.com/dataflow/docs/guides/using-custom-containers.
>>>>>> Here is a sample project:
>>>>>> https://github.com/google/dataflow-ml-starter
>>>>>>
>>>>>> On Wed, Dec 20, 2023 at 4:57 AM Sofia’s World 
>>>>>> wrote:
>>>>>>
>>>>>>> Hello Sumit
>>>>>>>  Thanks. Sorry...I guess if the value of the env variable is always
>>>>>>> the same u can pass it as job params?..though it doesn't sound like a
>>>>>>> viable option...
>>>>>>> Hth
>>>>>>>
>>>>>>> On Wed, 20 Dec 2023, 09:49 Sumit Desai, 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Sofia,
>>>>>>>>
>>>>>>>> Thanks for the response. For now, we have decided not to use flex
>>>>>>>> template. Is there a way to pass environmental variables without using 
>>>>>>>> any
>>>>>>>> template?
>>>>>>>>
>>>>>>>> Thanks & Regards,
>>>>>>>> Sumit Desai
>>>>>>>>
>>>>>>>> On Wed, Dec 20, 2023 at 3:16 PM Sofia’s World 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>>  My 2 cents. .have u ever considered using flex templates to run
>>>>>>>>> your pipeline? Then you can pass all your parameters at runtime..
>>>>>>>>> (Apologies in advance if it does not cover your use case...)
>>>>>>>>>
>>>>>>>>> On Wed, 20 Dec 2023, 09:35 Sumit Desai via user, <
>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> I have a Python application which is using Apache beam and
>>>>>>>>>> Dataflow as runner. The application uses a non-public Python package
>>>>>>>>>> 'uplight-telem

Re: Environmental variables not accessible in Dataflow pipeline

2023-12-22 Thread Sumit Desai via user
We are using an external non-public package which expects environmental
variables only. If environmental variables are not found, it will throw an
error. We can't change the source of this package.

Does this mean we will face the same problem with flex templates also?

On Fri, 22 Dec 2023, 3:39 pm Sofia’s World,  wrote:

> The flex template will allow you to pass input params with dynamic values
> to your Dataflow job, so you could replace the env variable with that
> input? That is, unless you have to have env vars..but from your snippets it
> appears you are just using them to configure one of your components?
> Hth
>
> On Fri, 22 Dec 2023, 10:01 Sumit Desai,  wrote:
>
>> Hi Sofia and XQ,
>>
>> The application is failing because I have loggers defined in every file
>> and the method to create a logger tries to create an object of
>> UplightTelemetry. If I use flex templates, will the environmental variables
>> I supply be loaded before the application gets loaded? If not, it would not
>> serve my purpose.
>>
>> Thanks & Regards,
>> Sumit Desai
>>
>> On Thu, Dec 21, 2023 at 10:02 AM Sumit Desai 
>> wrote:
>>
>>> Thank you HQ. Will take a look at this.
>>>
>>> Regards,
>>> Sumit Desai
>>>
>>> On Wed, Dec 20, 2023 at 8:13 PM XQ Hu  wrote:
>>>
>>>> Dataflow VMs cannot know your local env variable. I think you should
>>>> use custom container:
>>>> https://cloud.google.com/dataflow/docs/guides/using-custom-containers.
>>>> Here is a sample project: https://github.com/google/dataflow-ml-starter
>>>>
>>>> On Wed, Dec 20, 2023 at 4:57 AM Sofia’s World 
>>>> wrote:
>>>>
>>>>> Hello Sumit
>>>>>  Thanks. Sorry...I guess if the value of the env variable is always
>>>>> the same u can pass it as job params?..though it doesn't sound like a
>>>>> viable option...
>>>>> Hth
>>>>>
>>>>> On Wed, 20 Dec 2023, 09:49 Sumit Desai, 
>>>>> wrote:
>>>>>
>>>>>> Hi Sofia,
>>>>>>
>>>>>> Thanks for the response. For now, we have decided not to use flex
>>>>>> template. Is there a way to pass environmental variables without using 
>>>>>> any
>>>>>> template?
>>>>>>
>>>>>> Thanks & Regards,
>>>>>> Sumit Desai
>>>>>>
>>>>>> On Wed, Dec 20, 2023 at 3:16 PM Sofia’s World 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>>  My 2 cents. .have u ever considered using flex templates to run
>>>>>>> your pipeline? Then you can pass all your parameters at runtime..
>>>>>>> (Apologies in advance if it does not cover your use case...)
>>>>>>>
>>>>>>> On Wed, 20 Dec 2023, 09:35 Sumit Desai via user, <
>>>>>>> user@beam.apache.org> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I have a Python application which is using Apache beam and Dataflow
>>>>>>>> as runner. The application uses a non-public Python package
>>>>>>>> 'uplight-telemetry' which is configured using 'extra_packages' while
>>>>>>>> creating pipeline_options object. This package expects an environmental
>>>>>>>> variable named 'OTEL_SERVICE_NAME' and since this variable is not 
>>>>>>>> present
>>>>>>>> in the Dataflow worker, it is resulting in an error during application
>>>>>>>> startup.
>>>>>>>>
>>>>>>>> I am passing this variable using custom pipeline options. Code to
>>>>>>>> create pipeline options is as follows-
>>>>>>>>
>>>>>>>> pipeline_options = ProcessBillRequests.CustomOptions(
>>>>>>>> project=gcp_project_id,
>>>>>>>> region="us-east1",
>>>>>>>> job_name=job_name,
>>>>>>>> 
>>>>>>>> temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
>>>>>>>> 
>>>>>>>> staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
>>>>>>>>  

Re: Environmental variables not accessible in Dataflow pipeline

2023-12-22 Thread Sumit Desai via user
Hi Sofia and XQ,

The application is failing because I have loggers defined in every file and
the method to create a logger tries to create an object of
UplightTelemetry. If I use flex templates, will the environmental variables
I supply be loaded before the application gets loaded? If not, it would not
serve my purpose.

Thanks & Regards,
Sumit Desai

On Thu, Dec 21, 2023 at 10:02 AM Sumit Desai 
wrote:

> Thank you HQ. Will take a look at this.
>
> Regards,
> Sumit Desai
>
> On Wed, Dec 20, 2023 at 8:13 PM XQ Hu  wrote:
>
>> Dataflow VMs cannot know your local env variable. I think you should use
>> custom container:
>> https://cloud.google.com/dataflow/docs/guides/using-custom-containers.
>> Here is a sample project: https://github.com/google/dataflow-ml-starter
>>
>> On Wed, Dec 20, 2023 at 4:57 AM Sofia’s World 
>> wrote:
>>
>>> Hello Sumit
>>>  Thanks. Sorry...I guess if the value of the env variable is always the
>>> same u can pass it as job params?..though it doesn't sound like a
>>> viable option...
>>> Hth
>>>
>>> On Wed, 20 Dec 2023, 09:49 Sumit Desai,  wrote:
>>>
>>>> Hi Sofia,
>>>>
>>>> Thanks for the response. For now, we have decided not to use flex
>>>> template. Is there a way to pass environmental variables without using any
>>>> template?
>>>>
>>>> Thanks & Regards,
>>>> Sumit Desai
>>>>
>>>> On Wed, Dec 20, 2023 at 3:16 PM Sofia’s World 
>>>> wrote:
>>>>
>>>>> Hi
>>>>>  My 2 cents. .have u ever considered using flex templates to run your
>>>>> pipeline? Then you can pass all your parameters at runtime..
>>>>> (Apologies in advance if it does not cover your use case...)
>>>>>
>>>>> On Wed, 20 Dec 2023, 09:35 Sumit Desai via user, 
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have a Python application which is using Apache beam and Dataflow
>>>>>> as runner. The application uses a non-public Python package
>>>>>> 'uplight-telemetry' which is configured using 'extra_packages' while
>>>>>> creating pipeline_options object. This package expects an environmental
>>>>>> variable named 'OTEL_SERVICE_NAME' and since this variable is not present
>>>>>> in the Dataflow worker, it is resulting in an error during application
>>>>>> startup.
>>>>>>
>>>>>> I am passing this variable using custom pipeline options. Code to
>>>>>> create pipeline options is as follows-
>>>>>>
>>>>>> pipeline_options = ProcessBillRequests.CustomOptions(
>>>>>> project=gcp_project_id,
>>>>>> region="us-east1",
>>>>>> job_name=job_name,
>>>>>> 
>>>>>> temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
>>>>>> 
>>>>>> staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
>>>>>> runner='DataflowRunner',
>>>>>> save_main_session=True,
>>>>>> service_account_email= service_account,
>>>>>> subnetwork=os.environ.get(SUBNETWORK_URL),
>>>>>> extra_packages=[uplight_telemetry_tar_file_path],
>>>>>> setup_file=setup_file_path,
>>>>>> OTEL_SERVICE_NAME=otel_service_name,
>>>>>> OTEL_RESOURCE_ATTRIBUTES=otel_resource_attributes
>>>>>> # Set values for additional custom variables as needed
>>>>>> )
>>>>>>
>>>>>>
>>>>>> And the code that executes the pipeline is as follows-
>>>>>>
>>>>>>
>>>>>> result = (
>>>>>> pipeline
>>>>>> | "ReadPendingRecordsFromDB" >> read_from_db
>>>>>> | "Parse input PCollection" >> 
>>>>>> beam.Map(ProcessBillRequests.parse_bill_data_requests)
>>>>>> | "Fetch bills " >> 
>>>>>> beam.ParDo(ProcessBillRequests.FetchBillInformation())
>>>>>> )
>>>>>>
>>>>>> pipeline.run().wait_until_finish()
>>>>>>
>>>>>> I

Database IAM authentication failing from Google Dataflow instance

2023-12-20 Thread Sumit Desai via user
Hi all,
I have a Python-based application that is using Apache Beam in batch mode
and Google Dataflow as the runner. Yesterday, I was facing an issue passing an
environmental variable to Dataflow workers. I have temporarily commented out
uses of the non-public Python package which requires environmental variables
to function.

The first step of my pipeline is to read data from a database table as the
input PCollection. The library that I have used as the input connector requires
a DB built-in user and password, and this first step is getting executed
successfully.

Now, in the second step, I want to update the DB rows (just 1 right now for
testing) to IN_PROGRESS. Here, I am using an IAM user which I am also using
outside of Dataflow. But I am getting an error in the Dataflow pipeline -

*sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to
server at "xx.xx.xxx.xx", port 5432 failed: FATAL: AlloyDB IAM user
authentication failed for user "{iam_user}". *

I also tried creating a new IAM user corresponding to the service account I
am using for workers and provided it with the same permissions as the IAM
user outside of Dataflow. But I am still seeing the same error. From the logs,
I can see the DB IP, DB name and IAM users are correctly being passed.

Is there anything additional that I should be doing for an IAM user to
successfully connect to DB?

Thanks & Regards,
Sumit Desai


Re: Environmental variables not accessible in Dataflow pipeline

2023-12-20 Thread Sumit Desai via user
Hi Sofia,

Thanks for the response. For now, we have decided not to use a flex template.
Is there a way to pass environmental variables without using any template?

Thanks & Regards,
Sumit Desai

On Wed, Dec 20, 2023 at 3:16 PM Sofia’s World  wrote:

> Hi
>  My 2 cents. .have u ever considered using flex templates to run your
> pipeline? Then you can pass all your parameters at runtime..
> (Apologies in advance if it does not cover your use case...)
>
> On Wed, 20 Dec 2023, 09:35 Sumit Desai via user, 
> wrote:
>
>> Hi all,
>>
>> I have a Python application which is using Apache beam and Dataflow as
>> runner. The application uses a non-public Python package
>> 'uplight-telemetry' which is configured using 'extra_packages' while
>> creating pipeline_options object. This package expects an environmental
>> variable named 'OTEL_SERVICE_NAME' and since this variable is not present
>> in the Dataflow worker, it is resulting in an error during application
>> startup.
>>
>> I am passing this variable using custom pipeline options. Code to create
>> pipeline options is as follows-
>>
>> pipeline_options = ProcessBillRequests.CustomOptions(
>> project=gcp_project_id,
>> region="us-east1",
>> job_name=job_name,
>> 
>> temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
>> 
>> staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
>> runner='DataflowRunner',
>> save_main_session=True,
>> service_account_email= service_account,
>> subnetwork=os.environ.get(SUBNETWORK_URL),
>> extra_packages=[uplight_telemetry_tar_file_path],
>> setup_file=setup_file_path,
>> OTEL_SERVICE_NAME=otel_service_name,
>> OTEL_RESOURCE_ATTRIBUTES=otel_resource_attributes
>> # Set values for additional custom variables as needed
>> )
>>
>>
>> And the code that executes the pipeline is as follows-
>>
>>
>> result = (
>> pipeline
>> | "ReadPendingRecordsFromDB" >> read_from_db
>> | "Parse input PCollection" >> 
>> beam.Map(ProcessBillRequests.parse_bill_data_requests)
>> | "Fetch bills " >> 
>> beam.ParDo(ProcessBillRequests.FetchBillInformation())
>> )
>>
>> pipeline.run().wait_until_finish()
>>
>> Is there a way I can set the environmental variables in custom options
>> available in the worker?
>>
>> Thanks & Regards,
>> Sumit Desai
>>
>


Environmental variables not accessible in Dataflow pipeline

2023-12-20 Thread Sumit Desai via user
Hi all,

I have a Python application which is using Apache beam and Dataflow as
runner. The application uses a non-public Python package
'uplight-telemetry' which is configured using 'extra_packages' while
creating pipeline_options object. This package expects an environmental
variable named 'OTEL_SERVICE_NAME' and since this variable is not present
in the Dataflow worker, it is resulting in an error during application
startup.

I am passing this variable using custom pipeline options. Code to create
pipeline options is as follows-

pipeline_options = ProcessBillRequests.CustomOptions(
    project=gcp_project_id,
    region="us-east1",
    job_name=job_name,
    temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
    staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
    runner='DataflowRunner',
    save_main_session=True,
    service_account_email=service_account,
    subnetwork=os.environ.get(SUBNETWORK_URL),
    extra_packages=[uplight_telemetry_tar_file_path],
    setup_file=setup_file_path,
    OTEL_SERVICE_NAME=otel_service_name,
    OTEL_RESOURCE_ATTRIBUTES=otel_resource_attributes
    # Set values for additional custom variables as needed
)


And the code that executes the pipeline is as follows-


result = (
    pipeline
    | "ReadPendingRecordsFromDB" >> read_from_db
    | "Parse input PCollection" >> beam.Map(ProcessBillRequests.parse_bill_data_requests)
    | "Fetch bills " >> beam.ParDo(ProcessBillRequests.FetchBillInformation())
)

pipeline.run().wait_until_finish()

Is there a way I can set the environmental variables in custom options
available in the worker?

Thanks & Regards,
Sumit Desai
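One workaround that is sometimes used (a sketch only, not something proposed in this thread): carry the value as a custom pipeline option and export it into os.environ inside a DoFn's setup(), which runs on each worker before elements are processed. Whether this is early enough depends on when 'uplight-telemetry' actually reads OTEL_SERVICE_NAME; if it reads it at import time on the worker, a custom container image with the variable baked in is the more reliable route. The class and option names below are illustrative.

import os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Custom option carrying the value that the package normally reads
        # from the OTEL_SERVICE_NAME environment variable.
        parser.add_argument("--otel_service_name", default="")

class FetchBillsWithEnv(beam.DoFn):
    def __init__(self, otel_service_name):
        self._otel_service_name = otel_service_name

    def setup(self):
        # Runs on each worker instance before any elements are processed.
        os.environ.setdefault("OTEL_SERVICE_NAME", self._otel_service_name)

    def process(self, element):
        # ...the real per-element work (e.g. fetching bills) goes here...
        yield element

The DoFn would be constructed with the option value, e.g. beam.ParDo(FetchBillsWithEnv(pipeline_options.otel_service_name)), in place of the FetchBillInformation ParDo above.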


Re: Dataflow not able to find a module specified using extra_package

2023-12-19 Thread Sumit Desai via user
Thanks Anand and Robert. Using extra_packages and specifying it as a list
worked.

Regards,
Sumit Desai

On Tue, Dec 19, 2023 at 11:45 PM Robert Bradshaw via user <
user@beam.apache.org> wrote:

> And should it be a list of strings, rather than a string?
>
> On Tue, Dec 19, 2023 at 10:10 AM Anand Inguva via user <
> user@beam.apache.org> wrote:
>
>> Can you try passing `extra_packages` instead of `extra_package` when
>> passing pipeline options as a dict?
>>
>> On Tue, Dec 19, 2023 at 12:26 PM Sumit Desai via user <
>> user@beam.apache.org> wrote:
>>
>>> Hi all,
>>> I have created a Dataflow pipeline in batch mode using Apache beam
>>> Python SDK. I am using one non-public dependency 'uplight-telemetry'. I
>>> have specified it using parameter extra_package while creating
>>> pipeline_options object. However, the pipeline loading is failing with an
>>> error *No module named 'uplight_telemetry'*.
>>> The code to create pipeline_options is as following-
>>>
>>> def __create_pipeline_options_dataflow(job_name):
>>> # Set up the Dataflow runner options
>>> gcp_project_id = os.environ.get(GCP_PROJECT_ID)
>>> current_dir = os.path.dirname(os.path.abspath(__file__))
>>> print("current_dir=", current_dir)
>>> setup_file_path = os.path.join(current_dir, '..', '..', 'setup.py')
>>> print("Set-up file path=", setup_file_path)
>>> #TODO:Move file to proper location
>>> uplight_telemetry_tar_file_path=os.path.join(current_dir, '..', 
>>> '..','..','non-public-dependencies', 'uplight-telemetry-1.0.0.tar.gz')
>>> # TODO:Move to environmental variables
>>> pipeline_options = {
>>> 'project': gcp_project_id,
>>> 'region': "us-east1",
>>> 'job_name': job_name,  # Provide a unique job name
>>> 'temp_location': 
>>> f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
>>> 'staging_location': 
>>> f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
>>> 'runner': 'DataflowRunner',
>>> 'save_main_session': True,
>>> 'service_account_email': os.environ.get(SERVICE_ACCOUNT),
>>> # 'network': f'projects/{gcp_project_id}/global/networks/default',
>>> 'subnetwork': os.environ.get(SUBNETWORK_URL),
>>> 'setup_file': setup_file_path,
>>> 'extra_package': uplight_telemetry_tar_file_path
>>> # 'template_location': 
>>> 'gcr.io/dataflow-templates-base/python310-template-launcher-base'
>>> }
>>> print("Pipeline created for job-name", job_name)
>>> logger.debug(f"pipeline_options created as {pipeline_options}")
>>> return pipeline_options
>>>
>>> Why is it not trying to install this package from extra_package?
>>>
>>
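For reference, the fix reported at the top of this thread comes down to using the plural key and passing a list (a sketch reusing the names from the snippet above):

pipeline_options = {
    # ...other options unchanged...
    'setup_file': setup_file_path,
    # Plural key, and a list of paths (even for a single package).
    'extra_packages': [uplight_telemetry_tar_file_path],
}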


Dataflow not able to find a module specified using extra_package

2023-12-19 Thread Sumit Desai via user
Hi all,
I have created a Dataflow pipeline in batch mode using Apache beam Python
SDK. I am using one non-public dependency 'uplight-telemetry'. I have
specified it using parameter extra_package while creating pipeline_options
object. However, the pipeline loading is failing with an error *No module
named 'uplight_telemetry'*.
The code to create pipeline_options is as following-

def __create_pipeline_options_dataflow(job_name):
    # Set up the Dataflow runner options
    gcp_project_id = os.environ.get(GCP_PROJECT_ID)
    current_dir = os.path.dirname(os.path.abspath(__file__))
    print("current_dir=", current_dir)
    setup_file_path = os.path.join(current_dir, '..', '..', 'setup.py')
    print("Set-up file path=", setup_file_path)
    # TODO: Move file to proper location
    uplight_telemetry_tar_file_path = os.path.join(
        current_dir, '..', '..', '..', 'non-public-dependencies',
        'uplight-telemetry-1.0.0.tar.gz')
    # TODO: Move to environmental variables
    pipeline_options = {
        'project': gcp_project_id,
        'region': "us-east1",
        'job_name': job_name,  # Provide a unique job name
        'temp_location': f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
        'staging_location': f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
        'runner': 'DataflowRunner',
        'save_main_session': True,
        'service_account_email': os.environ.get(SERVICE_ACCOUNT),
        # 'network': f'projects/{gcp_project_id}/global/networks/default',
        'subnetwork': os.environ.get(SUBNETWORK_URL),
        'setup_file': setup_file_path,
        'extra_package': uplight_telemetry_tar_file_path
        # 'template_location': 'gcr.io/dataflow-templates-base/python310-template-launcher-base'
    }
    print("Pipeline created for job-name", job_name)
    logger.debug(f"pipeline_options created as {pipeline_options}")
    return pipeline_options

Why is it not trying to install this package from extra_package?


Re: Specifying dataflow template location with Apache beam Python SDK

2023-12-18 Thread Sumit Desai via user
Thanks all. Yes, I was under the misunderstanding that we could directly use one
of these templates as a base without creating a custom template. Thanks for
clarifying it for me.

Regards,
Sumit Desai

On Mon, 18 Dec 2023, 10:34 pm Bruno Volpato via user, 
wrote:

> Right, there's some misunderstanding here, so Bartosz and XQ's inputs are
> correct.
>
> Just want to add that the template_location parameter is the GCS path that
> you want to store your template on, and not the image reference of the base
> image.
> The GCR path that you are trying to use is used in the Dockerfile in case
> you are trying to use a Flex Template (see here:
> https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates#use_custom_container_images
> ).
>
> Best,
> Bruno
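In other words, when generating a template the option points at a GCS object rather than a container image; a minimal sketch with a hypothetical bucket path:

pipeline_options = {
    # ...other options unchanged...
    # GCS path where the generated template file is written, not a
    # container image reference (the bucket and object name are made up).
    'template_location': 'gs://my-dataflow-bucket/templates/process_bill_requests',
}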
>
>
>
>
> On Mon, Dec 18, 2023 at 11:39 AM XQ Hu via user 
> wrote:
>
>>
>> https://github.com/google/dataflow-ml-starter/tree/main?tab=readme-ov-file#run-the-beam-pipeline-with-dataflow-flex-templates
>> has a full example about how to create your own flex template. FYI.
>>
>> On Mon, Dec 18, 2023 at 5:01 AM Bartosz Zabłocki via user <
>> user@beam.apache.org> wrote:
>>
>>> Hi Sumit,
>>> could you elaborate a little bit more on what you are trying to achieve
>>> with the templates?
>>>
>>> As far as I know, these base Docker images serve as base images for your
>>> own custom templates.
>>> If you want to use an existing template, you can use one of these:
>>> https://cloud.google.com/dataflow/docs/guides/templates/provided-templates
>>> .
>>> To run it, you just need to invoke `gcloud dataflow jobs run... ` or
>>> equivalent command (
>>> https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-pubsub#gcloud).
>>> Or just use the UI to launch it (Cloud Console -> Dataflow -> Jobs ->
>>> Create Job From Template).
>>>
>>> If you want to create your own template (ie a reusable Dataflow
>>> pipeline) take a look at this page:
>>> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates#create_a_flex_template.
>>> This will let you package your own pipeline as a template. You'll be able
>>> to launch it with the `gcloud dataflow jobs run...` command.
>>> If you want to create a custom container image, which gives you more
>>> control over the environment and dependencies, you can create your own,
>>> custom Docker image. That's where you'll use the base image you mentioned.
>>> See this page for an example:
>>> https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates#use_a_custom_container_for_dependencies
>>> .
>>>
>>> I hope this helps, let me know if you have any other questions.
>>>
>>> Cheers,
>>> Bartosz Zablocki
>>>
>>> On Mon, Dec 18, 2023 at 8:36 AM Sumit Desai via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> I am creating an Apache beam pipeline using the Python SDK. I want to use
>>>> some standard template of dataflow (this one
>>>> <https://console.cloud.google.com/gcr/images/dataflow-templates-base/global/python310-template-launcher-base?tab=info>).
>>>> But when I am specifying it using 'template_location' key while creating
>>>> pipeline_options object, I am getting an error `FileNotFoundError: [Errno
>>>> 2] No such file or directory: '
>>>> gcr.io/dataflow-templates-base/python310-template-launcher-base'`
>>>>
>>>> I also tried to specify the complete version `
>>>> gcr.io/dataflow-templates-base/python310-template-launcher-base::flex_templates_base_image_release_20231127_RC00`
>>>> but got the same error. Can someone suggest what I might be doing wrong?
>>>> The code snippet to create pipeline_options is as follows-
>>>>
>>>> def __create_pipeline_options_dataflow(job_name):
>>>>
>>>>
>>>> # Set up the Dataflow runner options
>>>> gcp_project_id = os.environ.get(GCP_PROJECT_ID)
>>>> # TODO:Move to environmental variables
>>>> pipeline_options = {
>>>> 'project': gcp_project_id,
>>>> 'region': "us-east1",
>>>> 'job_name': job_name,  # Provide a unique job name
>>>> 'temp_location':
>>>> f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
>>>> 'staging_location':
>>>> f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
>>>> 'runner': 'DataflowRunner',
>>>> 'save_main_session': True,
>>>> 'service_account_email': service_account,
>>>> # 'network':
>>>> f'projects/{gcp_project_id}/global/networks/default',
>>>> # 'subnetwork':
>>>> f'projects/{gcp_project_id}/regions/us-east1/subnetworks/default'
>>>> 'template_location': '
>>>> gcr.io/dataflow-templates-base/python310-template-launcher-base'
>>>>
>>>> }
>>>> logger.debug(f"pipeline_options created as {pipeline_options}")
>>>> return pipeline_options
>>>>
>>>>
>>>>


Specifying dataflow template location with Apache beam Python SDK

2023-12-17 Thread Sumit Desai via user
I am creating an Apache Beam pipeline using the Python SDK. I want to use some
standard template of Dataflow (this one:
gcr.io/dataflow-templates-base/python310-template-launcher-base).
But when I am specifying it using 'template_location' key while creating
pipeline_options object, I am getting an error `FileNotFoundError: [Errno
2] No such file or directory: '
gcr.io/dataflow-templates-base/python310-template-launcher-base'`

I also tried to specify the complete version `
gcr.io/dataflow-templates-base/python310-template-launcher-base::flex_templates_base_image_release_20231127_RC00`
but got the same error. Can someone suggest what I might be doing wrong?
The code snippet to create pipeline_options is as follows-

def __create_pipeline_options_dataflow(job_name):
    # Set up the Dataflow runner options
    gcp_project_id = os.environ.get(GCP_PROJECT_ID)
    # TODO: Move to environmental variables
    pipeline_options = {
        'project': gcp_project_id,
        'region': "us-east1",
        'job_name': job_name,  # Provide a unique job name
        'temp_location': f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
        'staging_location': f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
        'runner': 'DataflowRunner',
        'save_main_session': True,
        'service_account_email': service_account,
        # 'network': f'projects/{gcp_project_id}/global/networks/default',
        # 'subnetwork': f'projects/{gcp_project_id}/regions/us-east1/subnetworks/default'
        'template_location': 'gcr.io/dataflow-templates-base/python310-template-launcher-base'
    }
    logger.debug(f"pipeline_options created as {pipeline_options}")
    return pipeline_options