ValueProviderOptions and templates

2020-06-16 Thread Marco Mistroni
Hi all,
I am creating Dataflow jobs using the Python API by creating templates which I
then run on GCP.
Suppose my Dataflow job accepts 2 input parameters which I need to supply
at invocation time.
Do I need to specify these parameters when I create my template?
Here's a sample: suppose I need two parameters,
--year and --key.
Which of the two commands below is the correct syntax for creating a template for the job?

python -m my_main  --runner=dataflow --project=xxx-projects
 --template_location=gs://mm_dataflow_bucket/templates/mytemplate
--temp_location=gs://mm_dataflow_bucket/temp
--staging_location=gs://mm_dataflow_bucket/staging --setup_file ./setup.py

OR
python -m my_main  --runner=dataflow --project=xxx-projects
 --template_location=gs://mm_dataflow_bucket/templates/mytemplate
--temp_location=gs://mm_dataflow_bucket/temp
--staging_location=gs://mm_dataflow_bucket/staging --setup_file ./setup.py
--year  --key

My hope with the latter is that the template 'sees' the options and replaces
them with the correct values when I actually invoke the template.
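For reference, a minimal sketch of how such runtime parameters are usually
declared with ValueProvider in the Beam Python SDK (the option names just
mirror this question; the rest is an assumption, not code from this thread):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # ValueProvider arguments are resolved when the template is invoked,
        # so no concrete values are needed at template-creation time.
        parser.add_value_provider_argument('--year', type=int)
        parser.add_value_provider_argument('--key', type=str)


def run():
    options = PipelineOptions()
    my_options = options.view_as(MyOptions)
    with beam.Pipeline(options=options) as p:
        _ = (
            p
            | 'Start' >> beam.Create([None])
            # .get() is only valid at execution time, e.g. inside a Map or DoFn
            | 'Use params' >> beam.Map(
                lambda _, year, key: (year.get(), key.get()),
                year=my_options.year,
                key=my_options.key)
        )


if __name__ == '__main__':
    run()

Declared this way, the options would not need to appear on the
template-creation command line at all; they would be supplied when the
template is launched.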
regards
 Marco


Re: Beam pipeline parallel steps

2020-05-02 Thread Marco Mistroni
Hi,
Thanks. In the example above, what if I want to combine output1 and
output2 into a single data structure that I can then write to the same file in
a bucket?
Is there some sort of aggregator in Beam?
Thanks
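For what it's worth, a hedged sketch of merging the two branches with
beam.Flatten before a single write (the file paths and transforms below are
made-up placeholders, not code from this thread):

import apache_beam as beam

with beam.Pipeline() as p:
    data = p | 'Get data' >> beam.io.ReadFromText('gs://bucket/input.txt')

    output1 = data | 'Transform 1' >> beam.Map(lambda row: row.upper())
    output2 = data | 'Transform 2' >> beam.Map(lambda row: row.lower())

    # Flatten merges the two PCollections into one, which can then be
    # written to a single destination instead of two separate files.
    merged = (output1, output2) | 'Merge branches' >> beam.Flatten()
    merged | 'Write merged' >> beam.io.WriteToText('gs://bucket/output/merged')

If the two outputs need to be joined per key rather than simply concatenated,
beam.CoGroupByKey would be the other common option.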

On Wed, Apr 29, 2020 at 5:56 PM André Rocha Silva <
a.si...@portaltelemedicina.com.br> wrote:

> Hey
>
> You simply use the output PCollection from one to many pipes as you want.
> E.g.:
> p = beam.Pipeline(options=pipeline_options)
>
> data = (
>     p
>     | 'Get data' >> beam.io.ReadFromText(user_options.input_file)
> )
>
> output1 = (
>     data
>     | 'Transform 1' >> beam.ParDo(transf1())
>     | 'Write transform 1 results' >> beam.io.WriteToText(out1)
> )
>
> output2 = (
>     data
>     | 'Transform 2' >> beam.ParDo(transf2())
>     | 'Another one' >> beam.FlatMap(something, user_options.parameter)
>     | 'Write transform 2 results' >> beam.io.WriteToText(out2)
> )
>
> p.run()
>
>
> On Wed, Apr 29, 2020 at 1:19 PM Marco Mistroni 
> wrote:
>
>> Hi all
>>  Is it possible in beam to create a pipeline where two tasks can run in
>> parallel as opposed to sequential,?
>> Simple usecase would be step 3 will generate some data out of which I
>> generate eg 3 completely different outcomes. ( Eg 3 different files stored
>> in a bucket)
>> Thanks
>>  Marco
>>
>
>
> --
>
>*ANDRÉ ROCHA SILVA*
>   * DATA ENGINEER*
>   (48) 3181-0611
>
>
>


Re: Beam pipeline parallel steps

2020-04-29 Thread Marco Mistroni
Many thanks

On Wed, Apr 29, 2020, 5:56 PM André Rocha Silva <
a.si...@portaltelemedicina.com.br> wrote:

> Hey
>
> You simply use the output PCollection from one to many pipes as you want.
> E.g.:
> p = beam.Pipeline(options=pipeline_options)
>
> data = (
>     p
>     | 'Get data' >> beam.io.ReadFromText(user_options.input_file)
> )
>
> output1 = (
>     data
>     | 'Transform 1' >> beam.ParDo(transf1())
>     | 'Write transform 1 results' >> beam.io.WriteToText(out1)
> )
>
> output2 = (
>     data
>     | 'Transform 2' >> beam.ParDo(transf2())
>     | 'Another one' >> beam.FlatMap(something, user_options.parameter)
>     | 'Write transform 2 results' >> beam.io.WriteToText(out2)
> )
>
> p.run()
>
>
> On Wed, Apr 29, 2020 at 1:19 PM Marco Mistroni 
> wrote:
>
>> Hi all
>>  Is it possible in beam to create a pipeline where two tasks can run in
>> parallel as opposed to sequential,?
>> Simple usecase would be step 3 will generate some data out of which I
>> generate eg 3 completely different outcomes. ( Eg 3 different files stored
>> in a bucket)
>> Thanks
>>  Marco
>>
>
>
> --
>
>*ANDRÉ ROCHA SILVA*
>   * DATA ENGINEER*
>   (48) 3181-0611
>
>
>


Beam pipeline parallel steps

2020-04-29 Thread Marco Mistroni
Hi all,
Is it possible in Beam to create a pipeline where two tasks can run in
parallel as opposed to sequentially?
A simple use case: step 3 generates some data out of which I
produce, e.g., 3 completely different outcomes (e.g. 3 different files stored
in a bucket).
Thanks
 Marco


Sending email from Apache Beam

2020-04-15 Thread Marco Mistroni
Hi all,
I am planning to end my Apache Beam workflow by sending an email.
Basically I have N elements that get processed and transformed, and at
the end I would like to send an email whose content is the elements I have
processed.
Could anyone advise on how to do it, and whether it is bad practice?
At the moment I am stuck at the point where I want to combine all the
elements into one big text: I need a transform that, given N elements,
returns one big text, similar to Python's reduce function.

Is there such a Transform in Beam?
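For reference, a hedged sketch of the reduce-like step with CombineGlobally,
followed by a hypothetical DoFn that sends the mail (the SMTP host and
addresses are assumptions, not something from this thread):

import smtplib
from email.message import EmailMessage

import apache_beam as beam


class SendEmailFn(beam.DoFn):
    """Hypothetical DoFn that emails the single combined text element."""
    def process(self, body):
        msg = EmailMessage()
        msg['Subject'] = 'Beam pipeline results'
        msg['From'] = 'pipeline@example.com'   # placeholder address
        msg['To'] = 'me@example.com'           # placeholder address
        msg.set_content(body)
        with smtplib.SMTP('localhost') as smtp:  # placeholder SMTP host
            smtp.send_message(msg)


with beam.Pipeline() as p:
    _ = (
        p
        | 'Elements' >> beam.Create(['line 1', 'line 2', 'line 3'])
        # CombineGlobally acts like a reduce: N elements in, one element out.
        | 'Combine into one text' >> beam.CombineGlobally(lambda lines: '\n'.join(lines))
        | 'Send email' >> beam.ParDo(SendEmailFn())
    )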

thanks
 Marco


Re: Apache Dataflow Template (Python)/ partially OT

2020-04-08 Thread Marco Mistroni
Hi all,
Was wondering if anyone has had a similar experience.
I kicked off 3 Dataflow templates via a Cloud Function. This created 3 VMs
which are still alive after the jobs completed, and I cannot delete them.
Could anyone assist with this?
Kind regards

On Mon, Apr 6, 2020, 3:00 PM Marco Mistroni  wrote:

> Hey
>  Thanks I create template from CMD line...was having issues with CLF but I
> think I was not using Auth correctly
> Will try your sample and report back if I am stuck
> Thanks a lot!
>
> On Mon, Apr 6, 2020, 2:20 PM André Rocha Silva <
> a.si...@portaltelemedicina.com.br> wrote:
>
>> Could you create the template already?
>>
>> Have you read the article? There I write the cloud function in js. Here
>> is some example of a cloud function in python:
>>
>> import google.auth
>> import random
>> import logging
>>
>> from googleapiclient.discovery import build
>>
>> GCLOUD_PROJECT = 'project-id-123'
>>
>>
>> def RunDataflow(event, context):
>>
>>     credentials, _ = google.auth.default()
>>
>>     service = build('dataflow', 'v1b3', credentials=credentials)
>>
>>     uri = 'gs://bucket/input/file'
>>     output_file = 'gs://bucket/output/file'
>>
>>     template_path = 'gs://bucket/Dataflow_templates/template'
>>     template_body = {
>>         'jobName': ('cf-job-' + str(random.randint(1, 101000))),
>>         'parameters': {
>>             'input_file': uri,
>>             'output_file': output_file,
>>         },
>>     }
>>
>>     request = service.projects().templates().launch(
>>         projectId=GCLOUD_PROJECT,
>>         gcsPath=template_path,
>>         body=template_body)
>>     response = request.execute()
>>
>>     logging.info(f'RunDataflow: got this response {response}')
>>
>>
>> On Mon, Apr 6, 2020 at 10:13 AM Marco Mistroni 
>> wrote:
>>
>>> @andre sorry to hijack this. Are you able to send a working example of
>>> kicking off dataflow  template via cloud function?
>>>
>>> Kind regards
>>>
>>> On Mon, Apr 6, 2020, 1:51 PM André Rocha Silva <
>>> a.si...@portaltelemedicina.com.br> wrote:
>>>
>>>> Hey!
>>>>
>>>> Could you make it work? You can take a look in this post, is a
>>>> single file template, easy peasy to create a template from:
>>>>
>>>> https://towardsdatascience.com/my-first-etl-job-google-cloud-dataflow-1fd773afa955
>>>>
>>>> If you want, we can schedule a google hangout and I help you, step by
>>>> step.
>>>> It is the least I can do after having had so much help from the
>>>> community :)
>>>>
>>>> On Sat, Apr 4, 2020 at 4:52 PM Marco Mistroni 
>>>> wrote:
>>>>
>>>>> Hey
>>>>>  sure... it's  a crap script :).. just an ordinary dataflow script
>>>>>
>>>>>
>>>>> https://github.com/mmistroni/GCP_Experiments/tree/master/dataflow/edgar_flow
>>>>>
>>>>>
>>>>> What i meant to say , for your template question, is for you to write
>>>>> a basic script which run on bean... something as simple as this
>>>>>
>>>>>
>>>>> https://github.com/mmistroni/GCP_Experiments/blob/master/dataflow/beam_test.py
>>>>>
>>>>> and then you can create a template out of it by just running this
>>>>>
>>>>> python -m edgar_main  --runner=dataflow --project=datascience-projets
>>>>> --template_location=gs://mm_dataflow_bucket/templates/edgar_dataflow_template
>>>>> --temp_location=gs://mm_dataflow_bucket/temp
>>>>> --staging_location=gs://mm_dataflow_bucket/staging
>>>>>
>>>>> That will create a template 'edgar_dataflow_template' which you can
>>>>> use in GCP dataflow console to create your job.
>>>>>
>>>>> hth, i m sort of a noob to Beam, having started writing code just over
>>>>> a month ago. Feel free to ping me if u get stuck
>>>>>
>>>>> kind regards
>>>>>  Marco
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Apr 4, 2020 at 6:01 PM Xander Song 
>>>>> wrote:
>>>>>
>>>>>> Hi Marco,
>>>>>>
>>>>>> Thanks for your response. Would you mind sending the edgar_main script so I can take a look?

Re: Scheduling dataflow pipelines

2020-04-06 Thread Marco Mistroni
Thanks Andre. I have skipped Pub/Sub in favour of a simple Cloud Function
invoked directly via Cloud Scheduler.
Thanks for the assist.
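For reference, a hedged sketch of that setup on the command line; the job
name, schedule, region and function URL below are placeholders, and any
authentication flags are omitted:

gcloud scheduler jobs create http run-my-dataflow-function \
    --schedule="0 6 * * *" \
    --uri="https://europe-west1-my-project.cloudfunctions.net/my_function" \
    --http-method=POST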


On Mon, Apr 6, 2020, 5:52 PM André Rocha Silva <
a.si...@portaltelemedicina.com.br> wrote:

> Marco
>
> If I'd give a step by step I'd go:
> 1) test the template on dataflow
> 2) test the cloud function
> 3) call the cloud function from a Pub/sub
> 4) send a message to pub/sub from scheduler
>
> take a look on this tutorial about scheduler:
> https://www.youtube.com/watch?v=WUPEUjvSBW8
>
> I think cloud composer is way too expensive, if you wanna call the
> template twice a day e.g.
>
> kind regards
>
> On Mon, Apr 6, 2020 at 11:45 AM Marco Mistroni 
> wrote:
>
>> Thanks will give it a go
>>
>> On Mon, Apr 6, 2020, 3:39 PM Soliman ElSaber 
>> wrote:
>>
>>> We are using Composer (Airflow) to schedule and run the Dataflow jobs...
>>> Using the Python SDK, with small changes no the Composer (Airflow)
>>> DataFlowPythonOperator, to force it to use Python 3...
>>> It is working fine and creating a new Dataflow job every 30 minutes...
>>>
>>> On Mon, Apr 6, 2020 at 10:33 PM Marco Mistroni 
>>> wrote:
>>>
>>>> Right.. tx Andre. So presumably the flow of action will b
>>>> - create dflow template
>>>> -create CLF that invokes it
>>>> - create cold scheduler job that invokes function?
>>>>
>>>> Kind regards
>>>>
>>>> On Mon, Apr 6, 2020, 2:14 PM André Rocha Silva <
>>>> a.si...@portaltelemedicina.com.br> wrote:
>>>>
>>>>> Marco
>>>>>
>>>>> If you are already using GCP, I suggest you use the cloud scheduler.
>>>>> It is like a cron job completely serverless.
>>>>>
>>>>> If you need some extra help, let me know.
>>>>>
>>>>> On Mon, Apr 6, 2020 at 4:38 AM deepak kumar  wrote:
>>>>>
>>>>>> We have used composer (airlfow) successfully to schedule Dataflow
>>>>>> jobs.
>>>>>> Please let me know if you would need details around it.
>>>>>>
>>>>>> Thanks
>>>>>> Deepak
>>>>>>
>>>>>> On Sun, Apr 5, 2020 at 7:56 PM Joshua B. Harrison <
>>>>>> josh.harri...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Marco,
>>>>>>>
>>>>>>> I've ended using a VM running Luigi to schedule jobs. I use the data
>>>>>>> flow Python API to execute stored templates.
>>>>>>>
>>>>>>> I can give you more details if you’re interested.
>>>>>>>
>>>>>>> Best,
>>>>>>> Joshua
>>>>>>>
>>>>>>> On Sun, Apr 5, 2020 at 5:02 AM Marco Mistroni 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> HI all
>>>>>>>>  sorry for this partially OT but has anyone been successful in
>>>>>>>> scheduling dataflow job on GCP?
>>>>>>>> I have tried the CloudFunction approach (following few eamples on
>>>>>>>> the web) but it didnt work out for me - the cloud function keep on 
>>>>>>>> giving
>>>>>>>> me an INVALID ARGUMENT - which i could not debug
>>>>>>>>
>>>>>>>> So i was wondering if anyone has  been successful and can provide
>>>>>>>> me an example
>>>>>>>>
>>>>>>>> kind regards
>>>>>>>>  Marco
>>>>>>>>
>>>>>>>> --
>>>>>>> Joshua Harrison |  Software Engineer |  joshharri...@gmail.com
>>>>>>>  |  404-433-0242
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>>*ANDRÉ ROCHA SILVA*
>>>>>   * DATA ENGINEER*
>>>>>   (48) 3181-0611
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Soliman ElSaber
>>> Data Engineer
>>> www.mindvalley.com
>>>
>>
>
> --
>
>*ANDRÉ ROCHA SILVA*
>   * DATA ENGINEER*
>   (48) 3181-0611
>
>
>


Re: Scheduling dataflow pipelines

2020-04-06 Thread Marco Mistroni
Thanks will give it a go

On Mon, Apr 6, 2020, 3:39 PM Soliman ElSaber  wrote:

> We are using Composer (Airflow) to schedule and run the Dataflow jobs...
> Using the Python SDK, with small changes no the Composer (Airflow)
> DataFlowPythonOperator, to force it to use Python 3...
> It is working fine and creating a new Dataflow job every 30 minutes...
>
> On Mon, Apr 6, 2020 at 10:33 PM Marco Mistroni 
> wrote:
>
>> Right.. tx Andre. So presumably the flow of action will b
>> - create dflow template
>> -create CLF that invokes it
>> - create cold scheduler job that invokes function?
>>
>> Kind regards
>>
>> On Mon, Apr 6, 2020, 2:14 PM André Rocha Silva <
>> a.si...@portaltelemedicina.com.br> wrote:
>>
>>> Marco
>>>
>>> If you are already using GCP, I suggest you use the cloud scheduler. It
>>> is like a cron job completely serverless.
>>>
>>> If you need some extra help, let me know.
>>>
>>> On Mon, Apr 6, 2020 at 4:38 AM deepak kumar  wrote:
>>>
>>>> We have used composer (airlfow) successfully to schedule Dataflow jobs.
>>>> Please let me know if you would need details around it.
>>>>
>>>> Thanks
>>>> Deepak
>>>>
>>>> On Sun, Apr 5, 2020 at 7:56 PM Joshua B. Harrison <
>>>> josh.harri...@gmail.com> wrote:
>>>>
>>>>> Hi Marco,
>>>>>
>>>>> I've ended using a VM running Luigi to schedule jobs. I use the data
>>>>> flow Python API to execute stored templates.
>>>>>
>>>>> I can give you more details if you’re interested.
>>>>>
>>>>> Best,
>>>>> Joshua
>>>>>
>>>>> On Sun, Apr 5, 2020 at 5:02 AM Marco Mistroni 
>>>>> wrote:
>>>>>
>>>>>> HI all
>>>>>>  sorry for this partially OT but has anyone been successful in
>>>>>> scheduling dataflow job on GCP?
>>>>>> I have tried the CloudFunction approach (following few eamples on the
>>>>>> web) but it didnt work out for me - the cloud function keep on giving me 
>>>>>> an
>>>>>> INVALID ARGUMENT - which i could not debug
>>>>>>
>>>>>> So i was wondering if anyone has  been successful and can provide me
>>>>>> an example
>>>>>>
>>>>>> kind regards
>>>>>>  Marco
>>>>>>
>>>>>> --
>>>>> Joshua Harrison |  Software Engineer |  joshharri...@gmail.com
>>>>>  |  404-433-0242
>>>>>
>>>>
>>>
>>> --
>>>
>>>*ANDRÉ ROCHA SILVA*
>>>   * DATA ENGINEER*
>>>   (48) 3181-0611
>>>
>>>
>>>
>
> --
> Soliman ElSaber
> Data Engineer
> www.mindvalley.com
>


Re: Scheduling dataflow pipelines

2020-04-06 Thread Marco Mistroni
Right, thanks Andre. So presumably the flow of action will be:
- create the Dataflow template
- create a Cloud Function that invokes it
- create a Cloud Scheduler job that invokes the function?

Kind regards

On Mon, Apr 6, 2020, 2:14 PM André Rocha Silva <
a.si...@portaltelemedicina.com.br> wrote:

> Marco
>
> If you are already using GCP, I suggest you use the cloud scheduler. It is
> like a cron job completely serverless.
>
> If you need some extra help, let me know.
>
> On Mon, Apr 6, 2020 at 4:38 AM deepak kumar  wrote:
>
>> We have used composer (airlfow) successfully to schedule Dataflow jobs.
>> Please let me know if you would need details around it.
>>
>> Thanks
>> Deepak
>>
>> On Sun, Apr 5, 2020 at 7:56 PM Joshua B. Harrison <
>> josh.harri...@gmail.com> wrote:
>>
>>> Hi Marco,
>>>
>>> I've ended using a VM running Luigi to schedule jobs. I use the data
>>> flow Python API to execute stored templates.
>>>
>>> I can give you more details if you’re interested.
>>>
>>> Best,
>>> Joshua
>>>
>>> On Sun, Apr 5, 2020 at 5:02 AM Marco Mistroni 
>>> wrote:
>>>
>>>> HI all
>>>>  sorry for this partially OT but has anyone been successful in
>>>> scheduling dataflow job on GCP?
>>>> I have tried the CloudFunction approach (following few eamples on the
>>>> web) but it didnt work out for me - the cloud function keep on giving me an
>>>> INVALID ARGUMENT - which i could not debug
>>>>
>>>> So i was wondering if anyone has  been successful and can provide me an
>>>> example
>>>>
>>>> kind regards
>>>>  Marco
>>>>
>>>> --
>>> Joshua Harrison |  Software Engineer |  joshharri...@gmail.com
>>>  |  404-433-0242
>>>
>>
>
> --
>
>*ANDRÉ ROCHA SILVA*
>   * DATA ENGINEER*
>   (48) 3181-0611
>
>
>


Scheduling dataflow pipelines

2020-04-05 Thread Marco Mistroni
Hi all,
Sorry for this partially OT question, but has anyone been successful in scheduling a
Dataflow job on GCP?
I have tried the Cloud Function approach (following a few examples on the web)
but it didn't work out for me - the Cloud Function keeps giving me an
INVALID ARGUMENT error which I could not debug.

So I was wondering if anyone has been successful and can provide me with an
example.

kind regards
 Marco


Re: Apache Dataflow Template (Python)

2020-04-04 Thread Marco Mistroni
Hey
 sure... it's  a crap script :).. just an ordinary dataflow script

https://github.com/mmistroni/GCP_Experiments/tree/master/dataflow/edgar_flow


What I meant to say, for your template question, is: write a
basic script which runs on Beam... something as simple as this:

https://github.com/mmistroni/GCP_Experiments/blob/master/dataflow/beam_test.py

and then you can create a template out of it by just running this

python -m edgar_main  --runner=dataflow --project=datascience-projets
--template_location=gs://mm_dataflow_bucket/templates/edgar_dataflow_template
--temp_location=gs://mm_dataflow_bucket/temp
--staging_location=gs://mm_dataflow_bucket/staging

That will create a template 'edgar_dataflow_template' which you can use in
the GCP Dataflow console to create your job.

HTH - I'm sort of a noob to Beam, having started writing code just over a
month ago. Feel free to ping me if you get stuck.

kind regards
 Marco












On Sat, Apr 4, 2020 at 6:01 PM Xander Song  wrote:

> Hi Marco,
>
> Thanks for your response. Would you mind sending the edgar_main script so
> I can take a look?
>
> On Sat, Apr 4, 2020 at 2:25 AM Marco Mistroni  wrote:
>
>> Hey
>>  As far as I know you can generate a dataflow template out of your beam
>> code by specifying an option on command line?
>> I am running this CMD and once template is generated I kick off a dflow
>> job via console by pointing at it
>>
>> python -m edgar_main --runner=dataflow --project=datascience-projets
>> --template_location=gs:// Hth
>>
>>
>> On Sat, Apr 4, 2020, 9:52 AM Xander Song  wrote:
>>
>>> I am attempting to write a custom Dataflow Template using the Apache
>>> Beam Python SDK, but am finding the documentation difficult to follow. Does
>>> anyone have a minimal working example of how to write and deploy such a
>>> template?
>>>
>>> Thanks in advance.
>>>
>>


Re: Apache Dataflow Template (Python)

2020-04-04 Thread Marco Mistroni
Hey,
As far as I know you can generate a Dataflow template out of your Beam
code by specifying an option on the command line.
I am running this command, and once the template is generated I kick off a
Dataflow job via the console by pointing at it:

python -m edgar_main --runner=dataflow --project=datascience-projets
--template_location=gs://

Hth


On Sat, Apr 4, 2020, 9:52 AM Xander Song  wrote:

> I am attempting to write a custom Dataflow Template using the Apache Beam
> Python SDK, but am finding the documentation difficult to follow. Does
> anyone have a minimal working example of how to write and deploy such a
> template?
>
> Thanks in advance.
>


beam main file with dependencies

2020-01-16 Thread Marco Mistroni
Hello all,
I have written an Apache Beam workflow which I have split across two
files:
- main_file.py (contains the pipeline)
- utils.py (contains a few functions used in the pipeline)

I have created a template for this using the command below:

python -m main_file --runner=dataflow --project=myproject
--template_location=gs://mybucket/my_template
--temp_location=gs://mybucket/temp --staging_location=gs://mybucket/staging

and I have attempted to create a job using this template.
However, when I kick off the job I am getting exceptions such as:


Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 261, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
ImportError: No module named 'utils'
I am guessing I am missing some steps in packaging the application, or
perhaps some extra options to specify dependencies?
I would not imagine writing a whole workflow in one file, so this looks
like a standard use case?
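For reference, a hedged sketch of one common fix: ship utils.py to the workers
with a setup.py passed via --setup_file (the package name below is made up;
only main_file.py and utils.py come from this thread):

# setup.py, placed next to main_file.py and utils.py
import setuptools

setuptools.setup(
    name='my_beam_job',                    # hypothetical package name
    version='0.0.1',
    packages=setuptools.find_packages(),
    py_modules=['utils'],                  # make utils.py importable on the workers
)

and then create the template with the extra flag, e.g.:

python -m main_file --runner=dataflow --project=myproject
--template_location=gs://mybucket/my_template
--temp_location=gs://mybucket/temp --staging_location=gs://mybucket/staging
--setup_file ./setup.py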

kind regards


Missing permissions when running beam on Dataflow Runner

2020-01-16 Thread Marco Mistroni
Hi all,
Apologies for this basic question...
I have a simple workflow that I have been running successfully using the
DirectRunner.
I tried to switch to the DataflowRunner and I am getting back the exception below.
I kicked off the job from a Compute Engine instance that is part of my project.
Would anyone know which permission I need to add, and where?
Kind regards

"message": "(98fd51fd0ab0a40d): Could not create workflow; user does not
have write access to project: xxproject Causes: (98fd51fd0ab0a0f4):
Permission 'dataflow.jobs.create' denied on project: 'xxxproject'",
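For reference, a hedged sketch of the kind of IAM binding that usually
resolves this; the project ID and service-account email are placeholders, and
roles/dataflow.developer is one predefined role that includes
dataflow.jobs.create:

gcloud projects add-iam-policy-binding xxproject \
    --member="serviceAccount:my-vm-sa@xxproject.iam.gserviceaccount.com" \
    --role="roles/dataflow.developer"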


Re: Please assist; how do i use a Sample transform ?

2019-12-17 Thread Marco Mistroni
Many thanks for your help..

On Wed, Dec 18, 2019, 12:45 AM Kyle Weaver  wrote:

> We could make the Sample class uninstantiable to give a slightly more
> specific error here. Not sure how much that would help though.
>
> On Tue, Dec 17, 2019 at 4:40 PM Robert Bradshaw 
> wrote:
>
>> beam.transforms.combiners.Sample is a container class that hails back
>> to the days when folks more familiar with Java were just copying
>> things over, and is just an empty class containing actual transforms
>> (as Kyle indicates). These are shorthand for
>> beam.CombineGlobally(beam.transforms.combiners.SampleCombineFn(...)),
>> beam.CombinePerKey(beam.transforms.combiners.SampleCombineFn(...)), ,
>> etc.
>>
>> On Tue, Dec 17, 2019 at 2:10 PM Kyle Weaver  wrote:
>> >
>> > Looks like you need to choose a subclass of sample. Probably
>> FixedSizeGlobally in your case. For example,
>> >
>> > beam.transforms.combiners.Sample.FixedSizeGlobally(5)
>> >
>> > Source:
>> https://github.com/apache/beam/blob/df376164fee1a8f54f3ad00c45190b813ffbdd34/sdks/python/apache_beam/transforms/combiners.py#L619
>> >
>> > On Tue, Dec 17, 2019 at 2:01 PM Marco Mistroni 
>> wrote:
>> >>
>> >> HI all
>> >> beam noob.
>> >>
>> >>  i have written a beam app where i am processing content of a file
>> >> for dbeugging purposes, i wanted to get a samle of the lines in the
>> file..using
>> >> the Sample combiner, but i cannot find any examples in python
>> >> Here's my rough code
>> >>
>> >> ...
>> >> | 'Filter only row longer than 100 chars' >> beam.Filter(lambda row:
>> len(row) > 100)
>> >> | 'sampling lines' >> beam.transforms.combiners.Sample()
>> >>
>> >> but the code above gives me
>> >>
>> >> TypeError: unsupported operand type(s) for >>: 'str' and 'Sample'
>> >> Could anyone help?
>> >> kind regards
>> >> Marco
>> >>
>> >>
>> >>
>>
>


Please assist; how do i use a Sample transform ?

2019-12-17 Thread Marco Mistroni
Hi all,
Beam noob here.

I have written a Beam app where I am processing the content of a file.
For debugging purposes, I wanted to get a sample of the lines in the file using
the Sample combiner, but I cannot find any examples in Python.
Here's my rough code:

...
| 'Filter only row longer than 100 chars' >> beam.Filter(lambda row: len(row) > 100)
| 'sampling lines' >> beam.transforms.combiners.Sample()

but the code above gives me

TypeError: unsupported operand type(s) for >>: 'str' and 'Sample'
Could anyone help?
kind regards
Marco
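For reference, the replies archived above point to using one of Sample's
concrete subclasses; a minimal sketch of that fix (the input path is a
placeholder):

import apache_beam as beam

with beam.Pipeline() as p:
    _ = (
        p
        | 'Read' >> beam.io.ReadFromText('gs://bucket/input.txt')
        | 'Filter only row longer than 100 chars' >> beam.Filter(lambda row: len(row) > 100)
        # Sample itself is just a container class; pick a concrete transform,
        # e.g. FixedSizeGlobally(n) to sample n elements from the whole PCollection.
        | 'sampling lines' >> beam.transforms.combiners.Sample.FixedSizeGlobally(5)
        | 'Print sample' >> beam.Map(print)
    )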


Is the following workflow suitable for apache beam

2019-11-28 Thread Marco Mistroni
Hi all,
I am currently getting acquainted with Apache Beam to replace my
current workflow, and was wondering if Beam can handle it.
Currently, my workflow is based entirely on Python asyncio plus some
groupby operations, and it consists of the following:

- I have a list of remote directories from each of which I need to download a file -
the file has the same name across directories.
- For each of the files above, I need to scan the content (which is itself a
list of remote file paths).
- For each of the file paths above I need to extract the content into a list
of strings.
- I need to do a reduce-by-key operation on all the lists extracted above.

To me it seems suitable... the only thing that concerns me is that I
probably have to drop asyncio.
Could anyone advise?
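Purely as a sketch of how that shape could map onto Beam primitives (all
helper names and contents below are hypothetical stand-ins for the asyncio
code described above):

import apache_beam as beam


def list_remote_paths(directory):
    # Hypothetical stub: in the real job this would read the per-directory
    # file and yield the remote paths it lists.
    yield directory + '/paths.txt'


def extract_key_values(path):
    # Hypothetical stub: in the real job this would download the file and
    # yield (key, value) pairs extracted from its content.
    yield (path.split('/')[0], 1)


with beam.Pipeline() as p:
    _ = (
        p
        | 'Remote directories' >> beam.Create(['dir_a', 'dir_b'])
        | 'List file paths' >> beam.FlatMap(list_remote_paths)
        | 'Extract key/values' >> beam.FlatMap(extract_key_values)
        # The reduce-by-key step: combine all values that share a key.
        | 'Reduce by key' >> beam.CombinePerKey(sum)
        | 'Print results' >> beam.Map(print)
    )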

kind regards
 Marco