How to debug dataflow locally

2019-09-12 Thread deepak kumar
Hi All,
I am trying to come up with a framework that would help debug a Dataflow
job end to end locally.
Dataflow can work with hundreds of sources and sinks.
Is there already a framework to set up these sources and sinks locally, e.g.
if my Dataflow job reads from BQ and writes to Bigtable?
Please let me know if someone is already doing this.
If yes, then how.

Thanks
Deepak
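
One lightweight way to exercise such a pipeline end to end on the DirectRunner
is to swap the BQ source and Bigtable sink for in-memory stand-ins and assert
on the output. A minimal sketch, with hypothetical record tuples and business
logic:

```python
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

# Stand-in for the rows a BigQuery read would produce (hypothetical schema).
fake_bq_rows = [(1, 'foo'), (2, 'bar')]

with TestPipeline() as p:  # uses the DirectRunner, runs entirely locally
    output = (p
              | 'FakeBQRead' >> beam.Create(fake_bq_rows)
              | 'BusinessLogic' >> beam.Map(lambda kv: (kv[0], kv[1].upper())))
    # Stand-in for the Bigtable write: assert on the PCollection instead.
    assert_that(output, equal_to([(1, 'FOO'), (2, 'BAR')]))
```

The actual source and sink can then be covered separately (e.g. with the
Bigtable emulator, or a small integration test against real services).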


Re: How to buffer events using spark portable runner ?

2019-09-12 Thread Yu Watanabe
Lukasz,

Thank you for the reply.

I will try Apache Flink.

Thanks,
Yu

On Sun, Sep 8, 2019 at 11:59 PM Lukasz Cwik  wrote:

> Try using Apache Flink.
>
> On Sun, Sep 8, 2019 at 6:23 AM Yu Watanabe  wrote:
>
>> Hello.
>>
>> I would like to ask a question related to timely processing, as described
>> on the page below.
>>
>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>
>> Python version: 3.7.4
>> apache beam version: 2.15.0
>>
>> I currently use timely processing to first buffer events and then send
>> *bulk requests* to Elasticsearch. The source of data is a bounded source
>> and I use DirectRunner as the runner.
>>
>> To get more memory resources, I am considering moving the pipeline to
>> Apache Spark using the portable runner. However, according to the
>> compatibility matrix, *Timers* are not supported on Apache Spark.
>>
>>
>> https://beam.apache.org/documentation/runners/capability-matrix/#cap-summary-when
>>
>> Is there any way in the portable runner to do processing similar to *timely
>> processing*?
>> This is my first time using the portable runner, and I would appreciate any
>> help with this.
>>
>> Best Regards,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> Weekend Freelancer who loves to challenge building data platform
>> yu.w.ten...@gmail.com
>>
>

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
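
For reference, the buffering pattern described above can be sketched with the
Python SDK's state and timer API roughly as follows. The batching key scheme
and the `send_bulk_to_elasticsearch` helper are hypothetical, and whether this
runs on a given runner still depends on its state/timer support in the
capability matrix:

```python
import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer


def send_bulk_to_elasticsearch(batch):
    """Placeholder for the actual Elasticsearch bulk-API call."""
    print('would send {0} documents'.format(len(batch)))


class BufferForBulkRequest(beam.DoFn):
    """Buffers keyed string elements and flushes them when the window closes."""
    BUFFER = BagStateSpec('buffer', StrUtf8Coder())
    FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

    def process(self, element,
                buffer=beam.DoFn.StateParam(BUFFER),
                flush=beam.DoFn.TimerParam(FLUSH),
                window=beam.DoFn.WindowParam):
        _, value = element                 # expects (key, value) pairs
        buffer.add(value)
        flush.set(window.end)              # flush once the watermark passes the window

    @on_timer(FLUSH)
    def flush_buffer(self, buffer=beam.DoFn.StateParam(BUFFER)):
        batch = list(buffer.read())
        if batch:
            send_bulk_to_elasticsearch(batch)
        buffer.clear()


# The DoFn needs keyed input, e.g.:
#   docs | beam.Map(lambda d: (hash(d) % 8, d)) | beam.ParDo(BufferForBulkRequest())
```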


Re: How do you write portable runner pipeline on separate python code ?

2019-09-12 Thread Kyle Weaver
I prefer loopback because a) it writes output files to the local
filesystem, as the user expects, and b) you don't have to pull or build
docker images, or even have docker installed on your system -- which is one
less point of failure.

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
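
For anyone following along, a minimal sketch of what the loopback environment
looks like from the submitting script (assuming a Flink or Spark job server is
already listening on localhost:8099; LOOPBACK makes the runner call back into
an SDK worker started inside this Python process, so no container image is
needed):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",   # address of the running job server
    "--environment_type=LOOPBACK",     # SDK worker runs in this process, no Docker
])

with beam.Pipeline(options=options) as p:
    p | beam.Create(["a", "b", "c"]) | beam.Map(print)
```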


On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise  wrote:

> This should become much better with 2.16 when we have the Docker images
> prebuilt.
>
> Docker is probably still the best option for Python on a JVM based runner
> in a local environment that does not have a development setup.
>
>
> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver  wrote:
>
>> +dev  I think we should probably point new users of
>> the portable Flink/Spark runners to use loopback or some other non-docker
>> environment, as Docker adds some operational complexity that isn't really
>> needed to run a word count example. For example, Yu's pipeline errored here
>> because the expected Docker container wasn't built before running.
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>
>>
>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw 
>> wrote:
>>
>>> On this note, making local files easy to read is something we'd
>>> definitely like to improve, as the current behavior is quite surprising.
>>> This could be useful not just for running with docker and the portable
>>> runner locally, but more generally when running on a distributed system
>>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>>> could automatically stage local files to be read as artifacts that could be
>>> consumed by any worker (possibly via external directory mounting in the
>>> local docker case rather than an actual copy), and conversely copy small
>>> outputs back to the local machine (with the similar optimization for local
>>> docker).
>>>
>>> At the very least, however, obvious messaging should be added when the
>>> local filesystem is used from within docker, which is often a (non-obvious
>>> and hard to debug) mistake.
>>>
>>>
>>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik  wrote:
>>>
 When you use a local filesystem path and a docker environment, "/tmp"
 is written inside the container. You can solve this issue by:
 * Using a "remote" filesystem such as HDFS/S3/GCS/...
 * Mounting an external directory into the container so that any "local"
 writes appear outside the container
 * Using a non-docker environment such as external or process.

 On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe 
 wrote:

> Hello.
>
> I would like to ask for help with my sample code using portable runner
> using apache flink.
> I was able to work out the wordcount.py using this page.
>
> https://beam.apache.org/roadmap/portability/
>
> I got below two files under /tmp.
>
> -rw-r--r-- 1 ywatanabe ywatanabe185 Sep 12 19:56
> py-wordcount-direct-1-of-2
> -rw-r--r-- 1 ywatanabe ywatanabe190 Sep 12 19:56
> py-wordcount-direct-0-of-2
>
> Then I wrote sample code with below steps.
>
> 1.Install apache_beam using pip3 separate from source code directory.
> 2. Wrote sample code as below and named it "test-protable-runner.py".
> Placed it separate directory from source code.
>
> ---
> (python) ywatanabe@debian-09-00:~$ ls -ltr
> total 16
> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source
> code directory)
> -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25
> test-portable-runner.py
>
> ---
> 3. Executed the code with "python3 test-protable-ruuner.py"
>
>
> ==
> #!/usr/bin/env
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.io import WriteToText
>
>
> def printMsg(line):
>
> print("OUTPUT: {0}".format(line))
>
> return line
>
> options = PipelineOptions(["--runner=PortableRunner",
> "--job_endpoint=localhost:8099", "--shutdown_sources_on_final_watermark"])
>
> p = beam.Pipeline(options=options)
>
> output = ( p | 'create' >> beam.Create(["a", "b", "c"])
>  | beam.Map(printMsg)
>  )
>
> output | 'write' >> WriteToText('/tmp/sample.txt')
>
> ===
>
> Job seemed to went all the way to "FINISHED" state.
>
> ---
> [DataSource (Impulse) (1/1)] INFO

Re: How do you write portable runner pipeline on separate python code ?

2019-09-12 Thread Thomas Weise
This should become much better with 2.16 when we have the Docker images
prebuilt.

Docker is probably still the best option for Python on a JVM based runner
in a local environment that does not have a development setup.


On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver  wrote:

> +dev  I think we should probably point new users of
> the portable Flink/Spark runners to use loopback or some other non-docker
> environment, as Docker adds some operational complexity that isn't really
> needed to run a word count example. For example, Yu's pipeline errored here
> because the expected Docker container wasn't built before running.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
>
> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw 
> wrote:
>
>> On this note, making local files easy to read is something we'd
>> definitely like to improve, as the current behavior is quite surprising.
>> This could be useful not just for running with docker and the portable
>> runner locally, but more generally when running on a distributed system
>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>> could automatically stage local files to be read as artifacts that could be
>> consumed by any worker (possibly via external directory mounting in the
>> local docker case rather than an actual copy), and conversely copy small
>> outputs back to the local machine (with the similar optimization for local
>> docker).
>>
>> At the very least, however, obvious messaging should be added when the
>> local filesystem is used from within docker, which is often a (non-obvious
>> and hard to debug) mistake.
>>
>>
>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik  wrote:
>>
>>> When you use a local filesystem path and a docker environment, "/tmp" is
>>> written inside the container. You can solve this issue by:
>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>> * Mounting an external directory into the container so that any "local"
>>> writes appear outside the container
>>> * Using a non-docker environment such as external or process.
>>>
>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe 
>>> wrote:
>>>
 Hello.

 I would like to ask for help with my sample code using portable runner
 using apache flink.
 I was able to work out the wordcount.py using this page.

 https://beam.apache.org/roadmap/portability/

 I got below two files under /tmp.

 -rw-r--r-- 1 ywatanabe ywatanabe185 Sep 12 19:56
 py-wordcount-direct-1-of-2
 -rw-r--r-- 1 ywatanabe ywatanabe190 Sep 12 19:56
 py-wordcount-direct-0-of-2

 Then I wrote sample code with below steps.

 1.Install apache_beam using pip3 separate from source code directory.
 2. Wrote sample code as below and named it "test-protable-runner.py".
 Placed it separate directory from source code.

 ---
 (python) ywatanabe@debian-09-00:~$ ls -ltr
 total 16
 drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source
 code directory)
 -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25
 test-portable-runner.py

 ---
 3. Executed the code with "python3 test-protable-ruuner.py"


 ==
 #!/usr/bin/env

 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.io import WriteToText


 def printMsg(line):

 print("OUTPUT: {0}".format(line))

 return line

 options = PipelineOptions(["--runner=PortableRunner",
 "--job_endpoint=localhost:8099", "--shutdown_sources_on_final_watermark"])

 p = beam.Pipeline(options=options)

 output = ( p | 'create' >> beam.Create(["a", "b", "c"])
  | beam.Map(printMsg)
  )

 output | 'write' >> WriteToText('/tmp/sample.txt')

 ===

 Job seemed to went all the way to "FINISHED" state.

 ---
 [DataSource (Impulse) (1/1)] INFO
 org.apache.flink.runtime.taskmanager.Task - Registering task at network:
 DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
 [DataSource (Impulse) (1/1)] INFO
 org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
 (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
 [flink-akka.actor.default-dispatcher-3] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource
 

Re: Python errors when using batch+windows+textio

2019-09-12 Thread Kyle Weaver
Hi Pawel, could you tell us which version of the Beam Python SDK you are
using?

For the record, this looks like a known issue:
https://issues.apache.org/jira/browse/BEAM-6860

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


On Wed, Sep 11, 2019 at 6:33 AM Paweł Kordek 
wrote:

> Hi
>
> I was developing a simple pipeline where I aggregate records by key and
> sum values for a predefined window. I was getting some errors, and after
> checking, I am getting exactly the same issues when running Wikipedia
> example from the Beam repo. The output is as follows:
> ---
> INFO:root:Missing pipeline option (runner). Executing pipeline using the
> default runner: DirectRunner.
> INFO:root:  at 0x7f333fc1fe60> 
> INFO:root:  0x7f333fc1ff80> 
> INFO:root: 
> 
> INFO:root: 
> 
> INFO:root: 
> 
> INFO:root: 
> 
> INFO:root: 
> 
> INFO:root:  0x7f333fc1d3b0> 
> INFO:root:  0x7f333fc1d440> 
> INFO:root:  0x7f333fc1d5f0> 
> INFO:root: 
> 
> INFO:root:  0x7f333fc1d710> 
> INFO:root:Running
> ((ref_AppliedPTransform_ReadFromText/Read_3)+(ref_AppliedPTransform_ComputeTopSessions/ExtractUserAndTimestamp_5))+(ref_AppliedPTransform_ComputeTopSessions/Filter( at
> top_wikipedia_sessions.py:127>)_6))+(ref_AppliedPTransform_ComputeTopSessions/ComputeSessions/ComputeSessionsWindow_8))+(ref_AppliedPTransform_ComputeTopSessions/ComputeSessions/PerElement/PerElement:PairWithVoid_10))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Precombine))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Group/Write)
> INFO:root:Running
> (((ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Group/Read)+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Merge))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/ExtractOutputs))+(ref_AppliedPTransform_ComputeTopSessions/SessionsToStrings_18))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/TopPerMonthWindow_20))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/Top/KeyWithVoid_22))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Precombine))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Group/Write)
> INFO:root:Running
> (((ref_AppliedPTransform_WriteToText/Write/WriteImpl/DoOnce/Read_36)+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/InitializeWrite_37))+(ref_PCollection_PCollection_19/Write))+(ref_PCollection_PCollection_20/Write)
> INFO:root:Running
> ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Group/Read)+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Merge))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/ExtractOutputs))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/Top/UnKey_30))+(ref_AppliedPTransform_ComputeTopSessions/FormatOutput_31))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/WriteBundles_38))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/Pair_39))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/WindowInto(WindowIntoFn)_40))+(WriteToText/Write/WriteImpl/GroupByKey/Write)
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 829, in
> apache_beam.runners.common.DoFnRunner._invoke_bundle_method
>   File "apache_beam/runners/common.py", line 403, in
> apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
>   File "apache_beam/runners/common.py", line 406, in
> apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
>   File "apache_beam/runners/common.py", line 982, in
> apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
>   File "apache_beam/runners/worker/operations.py", line 142, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 122, in
> apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 196, in
> apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 214, in
> apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1014, in
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1030, in
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 

Re: Beam/flink/kubernetes/minikube/wordcount example

2019-09-12 Thread Austin Bennett
I got hung up on that issue earlier this week.  Was using Flink 1.7.  V2.15
of Beam.  Wasn't using Kubernetes.

Then gave up, so don't have a solution :-/

I don't understand the job server enough, but I think I was getting the error
when I did not have it running.

(I still don't understand portability enough, so might not be using this
terminology correctly).


On Wed, Sep 11, 2019 at 1:26 PM Matthew Patterson 
wrote:

> Nope: dang, thanks.
>
> On 9/11/19, 3:49 PM, "Robert Bradshaw"  wrote:
>
> CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you recognize the sender and
> know the content is safe.
>
>
> Is your input on a file system accessible to the workers? (Including,
> from within Docker, if the workers are running in docker.)
>
> On Wed, Sep 11, 2019 at 12:03 PM Matthew Patterson
>  wrote:
> >
> > Hi Beamers,
> >
> >
> >
> > I am running the `wordcount` example, but following the example from
> > https://beam.apache.org/documentation/runners/flink/, that is, I change
> > the pipeline initialization as follows.
> >
> >
> >
> > ```
> >
> > import apache_beam as beam
> >
> > from apache_beam.options.pipeline_options import PipelineOptions
> >
> >
> >
> > options = PipelineOptions(["--runner=FlinkRunner",
> "--flink_version=1.8", "--flink_master_url=localhost:8081"])
> >
> > with beam.Pipeline(options) as p:
> >
> > …
> >
> > ```
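
For readers reproducing this, the same snippet written out as a self-contained
sketch, with the options passed by keyword and a trivial transform in place of
the elided word-count body (the master address and Flink version are taken from
the message above; reaching the Flink REST port from the submitting machine,
e.g. via a port-forward, is an assumption about the minikube setup):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=localhost:8081",
])

with beam.Pipeline(options=options) as p:
    p | beam.Create(["hello", "world"]) | beam.Map(print)
```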
> >
> >
> >
> > Running against my minikube cluster (), I get:
> >
> >
> >
> > “RuntimeError: Pipeline
> BeamApp-mpatterson-0911164258-7ef8768c_71984a02-5036-421e-9754-b57dbc628d3f
> failed in state FAILED: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> >
> > ”
> >
> >
> >
> > Any ideas?
> >
> >
> >
> > Thanks,
> >
> > Matt
> >
> >
> >
> > (minikube version: v1.3.1
> >
> > commit: ca60a424ce69a4d79f502650199ca2b52f29e631
> >
> >
> >
> > bash-3.2$ kubectl version
> >
> > Client Version: version.Info{Major:"1", Minor:"14",
> GitVersion:"v1.14.6", GitCommit:"96fac5cd13a5dc064f7d9f4f23030a6aeface6cc",
> GitTreeState:"clean", BuildDate:"2019-08-19T11:13:49Z",
> GoVersion:"go1.12.9", Compiler:"gc", Platform:"darwin/amd64"}
> >
> > Server Version: version.Info{Major:"1", Minor:"15",
> GitVersion:"v1.15.2", GitCommit:"f6278300bebbb750328ac16ee6dd3aa7d3549568",
> GitTreeState:"clean", BuildDate:"2019-08-05T09:15:22Z",
> GoVersion:"go1.12.5", Compiler:"gc", Platform:"linux/amd64"}
> >
> >
> >
> > Flink job- and task-manager containers both built from flink:1.8
> >
> > )
> >
> >
> >
> > Full output
> >
> > >>>
> >
> >
> >
> > /anaconda3/envs/aws/bin/python
> /Users/mpatterson/Library/Preferences/PyCharmCE2019.2/scratches/beam_me_up.py
> >
> > /Users/mpatterson/dev/beam/sdks/python/apache_beam/__init__.py:84:
> UserWarning: Some syntactic constructs of Python 3 are not yet fully
> supported by Apache Beam.
> >
> >   'Some syntactic constructs of Python 3 are not yet fully supported
> by '
> >
> > executable: /anaconda3/envs/aws/bin/python
> >
> > beam.__version__: 2.15.0
> >
> > WARNING:root:Make sure that locally built Python SDK docker image
> has Python 3.7 interpreter. See also: BEAM-7474.
> >
> > INFO:root:Using latest locally built Python SDK docker image:
> mpatterson-docker-apache.bintray.io/beam/python3:latest.
> >
> > INFO:root:  0x11850b200> 
> >
> > INFO:root: 
> 
> >
> > WARNING:root:Downloading job server jar from
> > https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.8-job-server/2.15.0/beam-runners-flink-1.8-job-server-2.15.0.jar
> >
> > [main] INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
> ArtifactStagingService started on localhost:57443
> >
> > [main] INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java
> ExpansionService started on localhost:57444
> >
> > [main] INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
> JobService started on localhost:57439
> >
> > [grpc-default-executor-0] ERROR
> 

Re: AvroIO Windowed Writes - Number of files to specify

2019-09-12 Thread Chamikara Jayalath
I'm a bit confused, since we mention
https://issues.apache.org/jira/browse/BEAM-1438 before that error, but that
JIRA was fixed a few years ago.
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L312

+Reuven Lax  can you comment on whether we can remove
this restriction now that the JIRA has been fixed ?

Thanks,
Cham

On Thu, Sep 12, 2019 at 5:34 AM Ziyad Muhammed  wrote:

> Hi Cham
>
> Any update on this?
>
> Best
> Ziyad
>
>
> On Thu, Sep 5, 2019 at 5:43 PM Ziyad Muhammed  wrote:
>
>> Hi Cham
>>
>> I tried that before. Apparently it's not accepted by either direct runner
>> or dataflow runner. I get the below error:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: When
>>> applying WriteFiles to an unbounded PCollection, must specify number of
>>> output shards explicitly
>>> at
>>> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>>> at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:299)
>>> at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:109)
>>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
>>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>>> at org.apache.beam.sdk.io.AvroIO$TypedWrite.expand(AvroIO.java:1519)
>>> at org.apache.beam.sdk.io.AvroIO$TypedWrite.expand(AvroIO.java:1155)
>>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
>>> at org.apache.beam.sdk.io.AvroIO$Write.expand(AvroIO.java:1659)
>>> at org.apache.beam.sdk.io.AvroIO$Write.expand(AvroIO.java:1541)
>>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
>>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>>>
>>
>>
>>
>> Best
>> Ziyad
>>
>>
>> On Wed, Sep 4, 2019 at 6:45 PM Chamikara Jayalath 
>> wrote:
>>
>>> Do you mean the value to specify for number of shards to write [1] ?
>>>
>>> For this I think it's better to not specify any value which will give
>>> the runner the most flexibility.
>>>
>>> Thanks,
>>> Cham
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L1455
>>>
>>> On Wed, Sep 4, 2019 at 2:42 AM Ziyad Muhammed  wrote:
>>>
 Hi all

 I have a beam pipeline running with cloud dataflow that produces avro
 files on GCS. Window duration is 1 minute and currently the job is running
 with 64 cores (16 * n1-standard-4). Per minute the data produced is around
 2GB.

 Is there any recommendation on the number of avro files to specify?
 Currently I'm using 64 (to match with the number of cores). Will a very
 high number help in increasing the write throughput?
 I saw that BigqueryIO with FILE_LOADS is using a default value of 1000
 files.

 I tried some random values, but couldn't infer a pattern for when it is
 more performant.

 Any suggestion is hugely appreciated.

 Best
 Ziyad

>>>
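
For context, the sharding knob under discussion looks like this in the Python
SDK (the thread itself uses the Java API, where the corresponding setting is
`withNumShards(...)` on the write transform); the GCS paths are placeholders:

```python
import apache_beam as beam

with beam.Pipeline() as p:
    lines = p | beam.Create(['a', 'b', 'c'])

    # Leaving num_shards at its default (0) lets the runner choose the
    # sharding, the most flexible option for bounded writes.
    lines | 'runner_sharding' >> beam.io.WriteToText('gs://my-bucket/out/auto')

    # Windowed/unbounded writes require an explicit shard count, which is what
    # the IllegalArgumentException above is about.
    lines | 'fixed_sharding' >> beam.io.WriteToText(
        'gs://my-bucket/out/fixed', num_shards=64)
```

WriteToAvro accepts the same num_shards argument, so the same trade-off
applies to Avro output.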


Re: How can I work with multiple pcollections?

2019-09-12 Thread Lukasz Cwik
Yes you can create multiple output PCollections using a ParDo with multiple
outputs instead of inserting them into Mongo.

It could be useful to read through the programming guide related to
PCollections[1] and PTransforms with multiple outputs[2] and feel free to
return with more questions.

1: https://beam.apache.org/documentation/programming-guide/#pcollections
2:
https://beam.apache.org/documentation/programming-guide/#additional-outputs
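
A small sketch of the multi-output ParDo Lukasz mentions; the tag names and
the record-splitting rule are made up for illustration:

```python
import apache_beam as beam
from apache_beam import pvalue


class SplitRecord(beam.DoFn):
    """Emits each parsed file record to one of three outputs."""
    def process(self, record):
        # 'record' is assumed to be a dict with a 'kind' field (illustrative).
        if record['kind'] == 'a':
            yield record                               # main output
        elif record['kind'] == 'b':
            yield pvalue.TaggedOutput('type_b', record)
        else:
            yield pvalue.TaggedOutput('type_c', record)


with beam.Pipeline() as p:
    records = p | beam.Create([{'kind': 'a'}, {'kind': 'b'}, {'kind': 'c'}])
    results = records | beam.ParDo(SplitRecord()).with_outputs(
        'type_b', 'type_c', main='type_a')
    # Three separate PCollections, usable in later stages of the pipeline:
    type_a, type_b, type_c = results.type_a, results['type_b'], results['type_c']
```

Each of the three PCollections can then feed a different branch of the
pipeline, e.g. the dedupe/aggregation step or separate MongoDB writes.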

On Thu, Sep 12, 2019 at 2:24 PM Steve973  wrote:

> I am new to Beam, and I am pretty excited to get started.  I have been
> doing quite a bit of research and playing around with the API.  But for my
> use case, unless I am not approaching it correctly, suggests that I will
> need to process multiple PCollections in some parts of my pipeline.
>
> I am working out some of my business logic without a parallelization
> framework to get the solution working.  Then I will convert the workflow to
> Beam.  What I am doing is reading millions of files from the file system,
> and I am processing parts of the file into three different output types,
> and storing them in MongoDB in three collections.  After this initial
> extraction (mapping), I modify some of the data which will result in
> duplicates.  So the next step is a reduction step to eliminate the
> duplicates (based on a number of fields) and aggregate the references to
> the other 2 data types, so the reduced object contains the dedupe fields,
> and a list of references to documents in the other 2 collections.  I'm not
> touching either of these two collections at this time, but this is where my
> question comes in.  If I map this data, can I create three separate
> PCollections instead of inserting them into Mongo?  After the
> deduplication, I will need to combine data in two of the streams, and I
> need to store the results of that combination into mongo.  Then I need to
> process the third collection, which will go into its own mongo collection.
>
> I hope my description was at least enough to get the conversation
> started.  Is my approach reasonable, and can I create multiple PCollections
> and use them at different phases of my pipeline?  Or is there another way
> that I should be looking at this?
>
> Thanks in advance!
> Steve
>


How can I work with multiple pcollections?

2019-09-12 Thread Steve973
I am new to Beam, and I am pretty excited to get started.  I have been
doing quite a bit of research and playing around with the API.  But my
use case, unless I am approaching it incorrectly, suggests that I will
need to process multiple PCollections in some parts of my pipeline.

I am working out some of my business logic without a parallelization
framework to get the solution working.  Then I will convert the workflow to
Beam.  What I am doing is reading millions of files from the file system,
and I am processing parts of the file into three different output types,
and storing them in MongoDB in three collections.  After this initial
extraction (mapping), I modify some of the data which will result in
duplicates.  So the next step is a reduction step to eliminate the
duplicates (based on a number of fields) and aggregate the references to
the other 2 data types, so the reduced object contains the dedupe fields,
and a list of references to documents in the other 2 collections.  I'm not
touching either of these two collections at this time, but this is where my
question comes in.  If I map this data, can I create three separate
PCollections instead of inserting them into Mongo?  After the
deduplication, I will need to combine data in two of the streams, and I
need to store the results of that combination into mongo.  Then I need to
process the third collection, which will go into its own mongo collection.

I hope my description was at least enough to get the conversation started.
Is my approach reasonable, and can I create multiple PCollections and use
them at different phases of my pipeline?  Or is there another way that I
should be looking at this?

Thanks in advance!
Steve


Re: How do you write portable runner pipeline on separate python code ?

2019-09-12 Thread Kyle Weaver
+dev  I think we should probably point new users of
the portable Flink/Spark runners to use loopback or some other non-docker
environment, as Docker adds some operational complexity that isn't really
needed to run a word count example. For example, Yu's pipeline errored here
because the expected Docker container wasn't built before running.

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw 
wrote:

> On this note, making local files easy to read is something we'd definitely
> like to improve, as the current behavior is quite surprising. This could be
> useful not just for running with docker and the portable runner locally,
> but more generally when running on a distributed system (e.g. a Flink/Spark
> cluster or Dataflow). It would be very convenient if we could automatically
> stage local files to be read as artifacts that could be consumed by any
> worker (possibly via external directory mounting in the local docker case
> rather than an actual copy), and conversely copy small outputs back to the
> local machine (with the similar optimization for local docker).
>
> At the very least, however, obvious messaging should be added when the local
> filesystem is used from within docker, which is often a (non-obvious and hard
> to debug) mistake.
>
>
> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik  wrote:
>
>> When you use a local filesystem path and a docker environment, "/tmp" is
>> written inside the container. You can solve this issue by:
>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>> * Mounting an external directory into the container so that any "local"
>> writes appear outside the container
>> * Using a non-docker environment such as external or process.
>>
>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe 
>> wrote:
>>
>>> Hello.
>>>
>>> I would like to ask for help with my sample code using portable runner
>>> using apache flink.
>>> I was able to work out the wordcount.py using this page.
>>>
>>> https://beam.apache.org/roadmap/portability/
>>>
>>> I got below two files under /tmp.
>>>
>>> -rw-r--r-- 1 ywatanabe ywatanabe185 Sep 12 19:56
>>> py-wordcount-direct-1-of-2
>>> -rw-r--r-- 1 ywatanabe ywatanabe190 Sep 12 19:56
>>> py-wordcount-direct-0-of-2
>>>
>>> Then I wrote sample code with below steps.
>>>
>>> 1.Install apache_beam using pip3 separate from source code directory.
>>> 2. Wrote sample code as below and named it "test-protable-runner.py".
>>> Placed it separate directory from source code.
>>>
>>> ---
>>> (python) ywatanabe@debian-09-00:~$ ls -ltr
>>> total 16
>>> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source code
>>> directory)
>>> -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25
>>> test-portable-runner.py
>>>
>>> ---
>>> 3. Executed the code with "python3 test-protable-ruuner.py"
>>>
>>>
>>> ==
>>> #!/usr/bin/env
>>>
>>> import apache_beam as beam
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>> from apache_beam.io import WriteToText
>>>
>>>
>>> def printMsg(line):
>>>
>>> print("OUTPUT: {0}".format(line))
>>>
>>> return line
>>>
>>> options = PipelineOptions(["--runner=PortableRunner",
>>> "--job_endpoint=localhost:8099", "--shutdown_sources_on_final_watermark"])
>>>
>>> p = beam.Pipeline(options=options)
>>>
>>> output = ( p | 'create' >> beam.Create(["a", "b", "c"])
>>>  | beam.Map(printMsg)
>>>  )
>>>
>>> output | 'write' >> WriteToText('/tmp/sample.txt')
>>>
>>> ===
>>>
>>> Job seemed to went all the way to "FINISHED" state.
>>>
>>> ---
>>> [DataSource (Impulse) (1/1)] INFO
>>> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
>>> DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
>>> [DataSource (Impulse) (1/1)] INFO
>>> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
>>> (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>>> [flink-akka.actor.default-dispatcher-3] INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource
>>> (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING
>>> to RUNNING.
>>> [flink-akka.actor.default-dispatcher-3] INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
>>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(>> core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) 

Re: How do you write portable runner pipeline on separate python code ?

2019-09-12 Thread Lukasz Cwik
When you use a local filesystem path and a docker environment, "/tmp" is
written inside the container. You can solve this issue by:
* Using a "remote" filesystem such as HDFS/S3/GCS/...
* Mounting an external directory into the container so that any "local"
writes appear outside the container
* Using a non-docker environment such as external or process.
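
To make the first option concrete, a sketch of the earlier write step pointed
at a filesystem both the workers and the client can reach (the bucket name is
a placeholder, and gs:// paths need the GCP extras installed; an HDFS or S3
path would work the same way):

```python
import apache_beam as beam
from apache_beam.io import WriteToText

with beam.Pipeline() as p:
    output = p | 'create' >> beam.Create(["a", "b", "c"])
    # With a Docker environment, '/tmp/sample.txt' would land inside the SDK
    # worker container; a shared filesystem is visible to both sides.
    output | 'write' >> WriteToText('gs://my-bucket/beam-output/sample')
```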

On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe  wrote:

> Hello.
>
> I would like to ask for help with my sample code using portable runner
> using apache flink.
> I was able to work out the wordcount.py using this page.
>
> https://beam.apache.org/roadmap/portability/
>
> I got below two files under /tmp.
>
> -rw-r--r-- 1 ywatanabe ywatanabe185 Sep 12 19:56
> py-wordcount-direct-1-of-2
> -rw-r--r-- 1 ywatanabe ywatanabe190 Sep 12 19:56
> py-wordcount-direct-0-of-2
>
> Then I wrote sample code with below steps.
>
> 1.Install apache_beam using pip3 separate from source code directory.
> 2. Wrote sample code as below and named it "test-protable-runner.py".
> Placed it separate directory from source code.
>
> ---
> (python) ywatanabe@debian-09-00:~$ ls -ltr
> total 16
> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source code
> directory)
> -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25 test-portable-runner.py
>
> ---
> 3. Executed the code with "python3 test-protable-ruuner.py"
>
>
> ==
> #!/usr/bin/env
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.io import WriteToText
>
>
> def printMsg(line):
>
> print("OUTPUT: {0}".format(line))
>
> return line
>
> options = PipelineOptions(["--runner=PortableRunner",
> "--job_endpoint=localhost:8099", "--shutdown_sources_on_final_watermark"])
>
> p = beam.Pipeline(options=options)
>
> output = ( p | 'create' >> beam.Create(["a", "b", "c"])
>  | beam.Map(printMsg)
>  )
>
> output | 'write' >> WriteToText('/tmp/sample.txt')
>
> ===
>
> Job seemed to went all the way to "FINISHED" state.
>
> ---
> [DataSource (Impulse) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
> DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
> [DataSource (Impulse) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
> (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
> [flink-akka.actor.default-dispatcher-3] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource
> (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING
> to RUNNING.
> [flink-akka.actor.default-dispatcher-3] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap( core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from CREATED to SCHEDULED.
> [flink-akka.actor.default-dispatcher-3] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap( core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from SCHEDULED to
> DEPLOYING.
> [flink-akka.actor.default-dispatcher-3] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN
> MapPartition (MapPartition at
> [2]write/Write/WriteImpl/DoOnce/{FlatMap(),
> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (attempt #0)
> to aad5f142-dbb7-4900-a5ae-af8a6fdcc787 @ localhost (dataPort=-1)
> [flink-akka.actor.default-dispatcher-2] INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task CHAIN
> MapPartition (MapPartition at
> [2]write/Write/WriteImpl/DoOnce/{FlatMap(),
> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1).
> [DataSource (Impulse) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task
> DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
> [DataSource (Impulse) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
> DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
> [DataSource (Impulse) (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
> (9df528f4d493d8c3b62c8be23dc889a8) switched from RUNNING to FINISHED.
>
> 

Re: AvroIO Windowed Writes - Number of files to specify

2019-09-12 Thread Ziyad Muhammed
Hi Cham

Any update on this?

Best
Ziyad


On Thu, Sep 5, 2019 at 5:43 PM Ziyad Muhammed  wrote:

> Hi Cham
>
> I tried that before. Apparently it's not accepted by either direct runner
> or dataflow runner. I get the below error:
>
> Exception in thread "main" java.lang.IllegalArgumentException: When
>> applying WriteFiles to an unbounded PCollection, must specify number of
>> output shards explicitly
>> at
>> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>> at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:299)
>> at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:109)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>> at org.apache.beam.sdk.io.AvroIO$TypedWrite.expand(AvroIO.java:1519)
>> at org.apache.beam.sdk.io.AvroIO$TypedWrite.expand(AvroIO.java:1155)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
>> at org.apache.beam.sdk.io.AvroIO$Write.expand(AvroIO.java:1659)
>> at org.apache.beam.sdk.io.AvroIO$Write.expand(AvroIO.java:1541)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>>
>
>
>
> Best
> Ziyad
>
>
> On Wed, Sep 4, 2019 at 6:45 PM Chamikara Jayalath 
> wrote:
>
>> Do you mean the value to specify for number of shards to write [1] ?
>>
>> For this I think it's better to not specify any value which will give the
>> runner the most flexibility.
>>
>> Thanks,
>> Cham
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L1455
>>
>> On Wed, Sep 4, 2019 at 2:42 AM Ziyad Muhammed  wrote:
>>
>>> Hi all
>>>
>>> I have a beam pipeline running with cloud dataflow that produces avro
>>> files on GCS. Window duration is 1 minute and currently the job is running
>>> with 64 cores (16 * n1-standard-4). Per minute the data produced is around
>>> 2GB.
>>>
>>> Is there any recommendation on the number of avro files to specify?
>>> Currently I'm using 64 (to match with the number of cores). Will a very
>>> high number help in increasing the write throughput?
>>> I saw that BigqueryIO with FILE_LOADS is using a default value of 1000
>>> files.
>>>
>>> I tried some random values, but couldn't infer a pattern for when it is
>>> more performant.
>>>
>>> Any suggestion is hugely appreciated.
>>>
>>> Best
>>> Ziyad
>>>
>>


How do you write portable runner pipeline on separate python code ?

2019-09-12 Thread Yu Watanabe
Hello.

I would like to ask for help with my sample code using the portable runner
with Apache Flink.
I was able to get wordcount.py working using this page.

https://beam.apache.org/roadmap/portability/

I got below two files under /tmp.

-rw-r--r-- 1 ywatanabe ywatanabe 185 Sep 12 19:56 py-wordcount-direct-1-of-2
-rw-r--r-- 1 ywatanabe ywatanabe 190 Sep 12 19:56 py-wordcount-direct-0-of-2

Then I wrote sample code with below steps.

1. Installed apache_beam using pip3, separate from the source code directory.
2. Wrote sample code as below and named it "test-portable-runner.py".
Placed it in a separate directory from the source code.
---
(python) ywatanabe@debian-09-00:~$ ls -ltr
total 16
drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source code
directory)
-rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25 test-portable-runner.py

---
3. Executed the code with "python3 test-portable-runner.py"

==
#!/usr/bin/env

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText


def printMsg(line):

print("OUTPUT: {0}".format(line))

return line

options = PipelineOptions(["--runner=PortableRunner",
"--job_endpoint=localhost:8099", "--shutdown_sources_on_final_watermark"])

p = beam.Pipeline(options=options)

output = ( p | 'create' >> beam.Create(["a", "b", "c"])
 | beam.Map(printMsg)
 )

output | 'write' >> WriteToText('/tmp/sample.txt')
===

The job seemed to go all the way to the "FINISHED" state.
---
[DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task
- Registering task at network: DataSource (Impulse) (1/1)
(9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
[DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task
- DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched
from DEPLOYING to RUNNING.
[flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource
(Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING
to RUNNING.
[flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
(MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
(1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from CREATED to SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
(MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
(1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from SCHEDULED to
DEPLOYING.
[flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN
MapPartition (MapPartition at
[2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (attempt #0)
to aad5f142-dbb7-4900-a5ae-af8a6fdcc787 @ localhost (dataPort=-1)
[flink-akka.actor.default-dispatcher-2] INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task CHAIN
MapPartition (MapPartition at
[2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>),
Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1).
[DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task
- Loading JAR files for task DataSource (Impulse) (1/1)
(d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
[DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task
- Registering task at network: DataSource (Impulse) (1/1)
(d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
[DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task
- DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched
from RUNNING to FINISHED.
---

But I ended up with a docker error on the client side.

---
(python) ywatanabe@debian-09-00:~$ python3 test-portable-runner.py
/home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/__init__.py:84:
UserWarning: Some syntactic constructs of Python 3 are not yet fully
supported by Apache Beam.
  'Some syntactic constructs of Python 3 are not yet fully supported by '