Re: Apache Dataflow Template (Python)

2020-04-04 Thread Xander Song
Hi Marco,

Thanks for your response. Would you mind sending the edgar_main script so I
can take a look?

On Sat, Apr 4, 2020 at 2:25 AM Marco Mistroni  wrote:

> Hey
>  As far as I know, you can generate a Dataflow template out of your Beam
> code by specifying an option on the command line.
> I run this command, and once the template is generated I kick off a Dataflow
> job via the console by pointing at it:
>
> python -m edgar_main --runner=dataflow --project=datascience-projets
> --template_location=gs://
>
> Hth
>
>
> On Sat, Apr 4, 2020, 9:52 AM Xander Song  wrote:
>
>> I am attempting to write a custom Dataflow Template using the Apache Beam
>> Python SDK, but am finding the documentation difficult to follow. Does
>> anyone have a minimal working example of how to write and deploy such a
>> template?
>>
>> Thanks in advance.
>>
>
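
For reference, below is a minimal sketch of a templatable pipeline along the
lines of Marco's command. The module name, the --input option, and every
gs:// path in it are placeholders, not values from this thread; the
ValueProvider argument is the piece that lets a classic template accept
parameters at execution time rather than at template-generation time:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class TemplateOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Resolved when the template is run, not when it is generated.
        parser.add_value_provider_argument('--input', type=str)


def run(argv=None):
    options = PipelineOptions(argv)
    template_options = options.view_as(TemplateOptions)
    with beam.Pipeline(options=options) as p:
        (p
         | 'read' >> beam.io.ReadFromText(template_options.input)
         | 'line lengths' >> beam.Map(lambda line: str(len(line)))
         | 'write' >> beam.io.WriteToText('gs://YOUR_BUCKET/output/lengths'))


if __name__ == '__main__':
    run()

Generating the template then looks like the command above (all values here
are placeholders):

python my_template.py --runner=DataflowRunner --project=YOUR_PROJECT \
    --temp_location=gs://YOUR_BUCKET/temp \
    --template_location=gs://YOUR_BUCKET/templates/my_template

Once generated, the template can be launched from the Cloud Console by
pointing at the template_location path, which matches what Marco describes.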


Apache Dataflow Template (Python)

2020-04-04 Thread Xander Song
I am attempting to write a custom Dataflow Template using the Apache Beam
Python SDK, but am finding the documentation difficult to follow. Does
anyone have a minimal working example of how to write and deploy such a
template?

Thanks in advance.


Re: Extract Multiple Features in a Flexible and Extensible Way

2020-02-20 Thread Xander Song
I realized that you can just use normal Python control flow while
constructing your pipeline. No additional Beam functionality necessary.
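
Below is a minimal sketch of that approach. The two DoFns, the feature
registry, and the --features flag are illustrative placeholders rather than
anything from the original pipeline; any mapping from feature names to
extraction DoFns would work the same way:

import argparse

import apache_beam as beam


class ExtractLengthFn(beam.DoFn):
    # Placeholder feature: character count, keyed by the input line.
    def process(self, element):
        yield element, len(element)


class ExtractWordCountFn(beam.DoFn):
    # Placeholder feature: word count, keyed by the input line.
    def process(self, element):
        yield element, len(element.split())


# Registry of available extractors; a real pipeline would list its own DoFns.
FEATURE_FNS = {'length': ExtractLengthFn, 'word_count': ExtractWordCountFn}

parser = argparse.ArgumentParser()
parser.add_argument('--input', required=True)
parser.add_argument('--features', default='length,word_count',
                    help='Comma-separated subset of the registry keys.')
args, beam_args = parser.parse_known_args()
selected = [name.strip() for name in args.features.split(',')]

with beam.Pipeline(argv=beam_args) as p:
    lines = p | 'read input' >> beam.io.ReadFromText(args.input)

    # Ordinary Python dict comprehension: only the selected features
    # become steps in the pipeline graph.
    features = {
        name: lines | 'extract %s' % name >> beam.ParDo(FEATURE_FNS[name]())
        for name in selected
    }

    combined = features | 'combine' >> beam.CoGroupByKey()

Running with, say, --features=length builds only the length branch, and the
final CoGroupByKey simply groups whichever subset was selected.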

On Thu, Feb 20, 2020 at 2:14 PM Xander Song  wrote:

> I have written a feature extraction pipeline in which I extract two
> features using ParDos and combine the results with a CoGroupByKey.
>
> with beam.Pipeline() as p:
>
>     input = p | 'read input' >> beam.io.ReadFromText(input_path)
>
>     first_feature = input | 'extract first feature' >> beam.ParDo(ExtractFirstFeatureFn())
>
>     second_feature = input | 'extract second feature' >> beam.ParDo(ExtractSecondFeatureFn())
>
>     combined = ({'first feature': first_feature, 'second feature': second_feature}
>                 | 'combine' >> beam.CoGroupByKey())
>
>
>
> I'd like to extend the pipeline to extract an arbitrary number of features
> while still aggregating them at the end with a CoGroupByKey. I'd also like
> to be able to decide at runtime (via command line arguments or a
> configuration file) which features will be extracted (e.g., extract
> features 1 and 3, but not feature 2). How could I write such a pipeline?
>
>
> Thanks in advance,
>
> - Xander
>


Extract Multiple Features in a Flexible and Extensible Way

2020-02-20 Thread Xander Song
I have written a feature extraction pipeline in which I extract two
features using ParDos and combine the results with a CoGroupByKey.

with beam.Pipeline() as p:

    input = p | 'read input' >> beam.io.ReadFromText(input_path)

    first_feature = input | 'extract first feature' >> beam.ParDo(ExtractFirstFeatureFn())

    second_feature = input | 'extract second feature' >> beam.ParDo(ExtractSecondFeatureFn())

    combined = ({'first feature': first_feature, 'second feature': second_feature}
                | 'combine' >> beam.CoGroupByKey())



I'd like to extend the pipeline to extract an arbitrary number of features
while still aggregating them at the end with a CoGroupByKey. I'd also like
to be able to decide at runtime (via command line arguments or a
configuration file) which features will be extracted (e.g., extract
features 1 and 3, but not feature 2). How could I write such a pipeline?


Thanks in advance,

- Xander


Re: Running Beam on Flink

2020-02-08 Thread Xander Song
Do you have any suggestions for addressing this issue? I am unsure of what
to try next.

On Fri, Feb 7, 2020 at 5:55 PM Ankur Goenka  wrote:

> That seems to be a problem.
>
> When I try the command, I get
>
> $ telnet localhost 8099
> Trying ::1...
> Connected to localhost.
> Escape character is '^]'.
> ^CConnection closed by foreign host.
>
> On Fri, Feb 7, 2020 at 5:34 PM Xander Song  wrote:
>
>> Thanks for the response. After entering telnet localhost 8099, I receive
>>
>> Trying ::1...
>>
>> telnet: connect to address ::1: Connection refused
>>
>> Trying 127.0.0.1...
>>
>> telnet: connect to address 127.0.0.1: Connection refused
>>
>> telnet: Unable to connect to remote host
>>
>>
>> On Fri, Feb 7, 2020 at 11:41 AM Ankur Goenka  wrote:
>>
>>> It seems that pipeline submission from the SDK is not able to reach the
>>> job server that was started in Docker.
>>>
>>> Can you try running "telnet localhost 8099" to make sure that pipeline
>>> submission can reach the job server?
>>>
>>> On Thu, Feb 6, 2020 at 8:16 PM Xander Song 
>>> wrote:
>>>
>>>> I am having difficulty following the Python guide for running Beam on
>>>> Flink <https://beam.apache.org/documentation/runners/flink/>. I
>>>> created a virtual environment with Apache Beam installed, then I started up
>>>> the JobService Docker container with
>>>>
>>>> docker run --net=host apachebeam/flink1.9_job_server:latest
>>>>
>>>>
>>>> I receive the following message confirming that the container is
>>>> running.
>>>>
>>>>
>>>> [main] INFO
>>>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
>>>> ArtifactStagingService started on localhost:8098
>>>>
>>>> [main] INFO
>>>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java
>>>> ExpansionService started on localhost:8097
>>>>
>>>> [main] INFO
>>>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
>>>> JobService started on localhost:8099
>>>>
>>>>
>>>> In another terminal, I execute a Beam script called
>>>> test_beam_local_flink.py based on the example.
>>>>
>>>>
>>>> from __future__ import print_function
>>>> import apache_beam
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>
>>>> data = [1,2,3]
>>>>
>>>> options = PipelineOptions([
>>>> "--runner=PortableRunner",
>>>> "--job_endpoint=localhost:8099",
>>>> "--environment_type=LOOPBACK"])
>>>> with apache_beam.Pipeline(options=options) as p:
>>>>   video_collection = (
>>>> p | apache_beam.Create(data)
>>>>   | apache_beam.Map(lambda x: x + 1)
>>>>   | apache_beam.Map(lambda x: print(x))
>>>>   )
>>>> print('Done')
>>>>
>>>> After a wait, I get the following traceback.
>>>>
>>>> /Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/__init__.py:84:
>>>>  UserWarning: You are using Apache Beam with Python 2. New releases of 
>>>> Apache Beam will soon support Python 3 only.
>>>>
>>>>   'You are using Apache Beam with Python 2. '
>>>>
>>>> Traceback (most recent call last):
>>>>
>>>>   File "test_beam_local_flink.py", line 18, in 
>>>>
>>>> | apache_beam.Map(lambda x: print(x))
>>>>
>>>>   File 
>>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>>>>  line 481, in __exit__
>>>>
>>>> self.run().wait_until_finish()
>>>>
>>>>   File 
>>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>>>>  line 461, in run
>>>>
>>>> self._options).run(False)
>>>>
>>>>   File 
>>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>>>>  line 474, in run
>>>>
>>>> return self.runner.run_pipeline(self, self._options)
>>>>
>>>>   File 
>>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>>>>  line 220, in run_pipeline
>>>>
>>>> job_service = self.create_job_service(options)
>>>>
>>>>   File 
>>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>>>>  line 136, in create_job_service
>>>>
>>>> return server.start()
>>>>
>>>>   File 
>>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/job_server.py",
>>>>  line 59, in start
>>>>
>>>> grpc.channel_ready_future(channel).result(timeout=self._timeout)
>>>>
>>>>   File 
>>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py",
>>>>  line 140, in result
>>>>
>>>> self._block(timeout)
>>>>
>>>>   File 
>>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py",
>>>>  line 86, in _block
>>>>
>>>> raise grpc.FutureTimeoutError()
>>>>
>>>> grpc.FutureTimeoutError
>>>>
>>>>
>>>>
>>>> Any help is greatly appreciated.
>>>>
>>>>


Re: Running Beam on Flink

2020-02-07 Thread Xander Song
Thanks for the response. After entering telnet localhost 8099, I receive

Trying ::1...

telnet: connect to address ::1: Connection refused

Trying 127.0.0.1...

telnet: connect to address 127.0.0.1: Connection refused

telnet: Unable to connect to remote host


On Fri, Feb 7, 2020 at 11:41 AM Ankur Goenka  wrote:

> It seems that pipeline submission from the SDK is not able to reach the
> job server that was started in Docker.
>
> Can you try running "telnet localhost 8099" to make sure that pipeline
> submission can reach the job server?
>
> On Thu, Feb 6, 2020 at 8:16 PM Xander Song  wrote:
>
>> I am having difficulty following the Python guide for running Beam on
>> Flink <https://beam.apache.org/documentation/runners/flink/>. I created
>> a virtual environment with Apache Beam installed, then I started up the
>> JobService Docker container with
>>
>> docker run --net=host apachebeam/flink1.9_job_server:latest
>>
>>
>> I receive the following message confirming that the container is running.
>>
>>
>> [main] INFO
>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
>> ArtifactStagingService started on localhost:8098
>>
>> [main] INFO
>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java
>> ExpansionService started on localhost:8097
>>
>> [main] INFO
>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
>> JobService started on localhost:8099
>>
>>
>> In another terminal, I execute a Beam script called
>> test_beam_local_flink.py based on the example.
>>
>>
>> from __future__ import print_function
>> import apache_beam
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> data = [1,2,3]
>>
>> options = PipelineOptions([
>> "--runner=PortableRunner",
>> "--job_endpoint=localhost:8099",
>> "--environment_type=LOOPBACK"])
>> with apache_beam.Pipeline(options=options) as p:
>>   video_collection = (
>> p | apache_beam.Create(data)
>>   | apache_beam.Map(lambda x: x + 1)
>>   | apache_beam.Map(lambda x: print(x))
>>   )
>> print('Done')
>>
>> After a wait, I get the following traceback.
>>
>> /Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/__init__.py:84:
>>  UserWarning: You are using Apache Beam with Python 2. New releases of 
>> Apache Beam will soon support Python 3 only.
>>
>>   'You are using Apache Beam with Python 2. '
>>
>> Traceback (most recent call last):
>>
>>   File "test_beam_local_flink.py", line 18, in 
>>
>> | apache_beam.Map(lambda x: print(x))
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>>  line 481, in __exit__
>>
>> self.run().wait_until_finish()
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>>  line 461, in run
>>
>> self._options).run(False)
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>>  line 474, in run
>>
>> return self.runner.run_pipeline(self, self._options)
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>>  line 220, in run_pipeline
>>
>> job_service = self.create_job_service(options)
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>>  line 136, in create_job_service
>>
>> return server.start()
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/job_server.py",
>>  line 59, in start
>>
>> grpc.channel_ready_future(channel).result(timeout=self._timeout)
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py",
>>  line 140, in result
>>
>> self._block(timeout)
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py",
>>  line 86, in _block
>>
>> raise grpc.FutureTimeoutError()
>>
>> grpc.FutureTimeoutError
>>
>>
>>
>> Any help is greatly appreciated.
>>
>>


Running Beam on Flink

2020-02-06 Thread Xander Song
I am having difficulty following the Python guide for running Beam on Flink
<https://beam.apache.org/documentation/runners/flink/>. I created a virtual
environment with Apache Beam installed, then I started up the JobService
Docker container with

docker run --net=host apachebeam/flink1.9_job_server:latest


I receive the following message confirming that the container is running.


[main] INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
ArtifactStagingService started on localhost:8098

[main] INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java
ExpansionService started on localhost:8097

[main] INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
JobService started on localhost:8099


In another terminal, I execute a Beam script called
test_beam_local_flink.py based on the example.


from __future__ import print_function
import apache_beam
from apache_beam.options.pipeline_options import PipelineOptions

data = [1,2,3]

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"])
with apache_beam.Pipeline(options=options) as p:
  video_collection = (
      p | apache_beam.Create(data)
        | apache_beam.Map(lambda x: x + 1)
        | apache_beam.Map(lambda x: print(x))
  )
print('Done')

After a wait, I get the following traceback.

/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/__init__.py:84:
UserWarning: You are using Apache Beam with Python 2. New releases of
Apache Beam will soon support Python 3 only.

  'You are using Apache Beam with Python 2. '

Traceback (most recent call last):

  File "test_beam_local_flink.py", line 18, in 

| apache_beam.Map(lambda x: print(x))

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 481, in __exit__

self.run().wait_until_finish()

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 461, in run

self._options).run(False)

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 474, in run

return self.runner.run_pipeline(self, self._options)

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
line 220, in run_pipeline

job_service = self.create_job_service(options)

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
line 136, in create_job_service

return server.start()

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/job_server.py",
line 59, in start

grpc.channel_ready_future(channel).result(timeout=self._timeout)

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py",
line 140, in result

self._block(timeout)

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py",
line 86, in _block

raise grpc.FutureTimeoutError()

grpc.FutureTimeoutError



Any help is greatly appreciated.


Running Beam Pipelines with GPUs (and other questions)

2020-01-30 Thread Xander Song
Hello,

I am new to the Apache ecosystem and am attempting to use Beam to build a
horizontally scalable pipeline for feature extraction from video data. The
extraction process for certain features can be accelerated using GPUs,
while other features require only a CPU to compute. I have several
questions, listed in order of decreasing priority:

   1. Can I run a Beam pipeline with GPUs? (as far as I can tell, Google
   Cloud Dataflow does not currently support this option)
   2. Is it possible to achieve this functionality using Spark or Flink as
   a runner?
   3. Is it possible to mix hardware types in a Beam pipeline (e.g., to
   have certain features extracted by CPUs and others extracted by GPUs), or
   does this go against the Beam paradigm of abstracting away such details?
   4. Do the Spark and Flink runners have support for auto-scaling like
   Google Cloud Dataflow?
   5. What are the relevant considerations when selecting between Spark and
   Flink as a runner?

Any guidance, resources, or tips are appreciated. Thank you in advance!
-Xander