Re: How to optimize the performance of Beam on Spark (Internet mail)

2018-09-18 Thread Jean-Baptiste Onofré
Hi,

did you compare the stages in the Spark UI in order to identify which
stage is taking time ?

You use spark-submit in both cases for the bootstrapping ?

I will do a test here as well.

Regards
JB

On 19/09/2018 05:34, devinduan(段丁瑞) wrote:
> Hi,
>     Thanks for your reply.
>     Our team plans to use Beam instead of Spark, so I'm testing the
> performance of the Beam API.
>     I'm coding some examples through the Spark API and the Beam API, like
> "WordCount", "Join", "OrderBy", "Union" ...
>     I use the same resources and configuration to run these jobs.
>     Tim said I should remove "withNumShards(1)" and
> set spark.default.parallelism=32. I did that and tried again, but the Beam
> job still runs very slowly.
>     Here is my Beam code and Spark code (see the sketch after this
> message):
>    Beam "WordCount":
>    Spark "WordCount":
>    I will try the other examples later.
>     
> Regards
> devin
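
(The code snippets referenced above did not survive the archive. As a rough
stand-in, a minimal Beam WordCount in Java might look like the sketch below;
the input and output paths and the class name are illustrative, not the
original code.)

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BeamWordCount {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(TextIO.read().from("hdfs:///tmp/input/*"))               // illustrative path
        .apply(FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("\\s+")))) // tokenize
        .apply(Count.perElement())                                    // word -> count
        .apply(MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Long> kv) -> kv.getKey() + ": " + kv.getValue()))
        // No withNumShards(1) here, per Tim's earlier advice, so the write
        // keeps the pipeline's parallelism.
        .apply(TextIO.write().to("hdfs:///tmp/wordcount"));           // illustrative path
    p.run().waitUntilFinish();
  }
}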
> 
>  
> *From:* Jean-Baptiste Onofré 
> *Date:* 2018-09-18 22:43
> *To:* dev@beam.apache.org 
> *Subject:* Re: How to optimize the performance of Beam on
> Spark(Internet mail)
> 
> Hi,
> 
> The first huge difference is the fact that the spark runner still uses
> RDD whereas directly using spark, you are using dataset. A bunch of
> optimization in spark are related to dataset.
> 
> I started a large refactoring of the spark runner to leverage Spark 2.x
> (and dataset).
> It's not yet ready as it includes other improvements (the portability
> layer with Job API, a first check of state API, ...).
> 
> Anyway, by Spark wordcount, you mean the one included in the spark
> distribution ?
> 
> Regards
> JB
> 
> On 18/09/2018 08:39, devinduan(段丁瑞) wrote:
> > Hi,
> >     I'm testing Beam on Spark. 
> >     I use the Spark example WordCount to process a 1G data file; it
> > takes 1 minute.
> >     However, when I use the Beam example WordCount to process the same
> > file, it takes 30 minutes.
> >     My Spark parameter is :  --deploy-mode client
>  --executor-memory 1g
> > --num-executors 1 --driver-memory 1g
> >     My Spark version is 2.3.1,  Beam version is 2.5
> >     Is there any optimization method?
> > Thank you.
> >
> >    
> 
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: BigQueryIO.write and Wait.on

2018-09-18 Thread Reuven Lax
I've been looking at this a bit, and I think it will be tricky to figure
out how to get this working with BigQueryIO. The base problem is that Wait
tries to sequence things by window, as it's hard to come up with a
different definition of sequencing with unbounded inputs. However
BigQueryIO rewindows all input data into the GlobalWindow as a first step. This
is usually the right thing for a sink to do, as each element should be sent
to the external sink as quickly as possible without waiting for windows to
close. However, this doesn't play nicely with the Wait transform, as the
original windows are lost inside BigQueryIO.

The best solution I can think of is to allow Wait on BigQueryIO _only_ if
the original window is the global window (which is common for batch jobs),
and to disallow it otherwise. This is unfortunate because it means (among
other things) that this won't work at all in streaming. If anyone has a
better idea, I'd love to hear it.
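
For readers following along, the pattern under discussion is roughly the
sketch below. Wait.on() exists today; the getSuccessfulWrites() accessor on
WriteResult is hypothetical - it is exactly the kind of signal this thread
proposes adding.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

class WaitOnBigQuerySketch {
  // Sketch: hold `downstream` until `rows` have been written to BigQuery.
  static <T> PCollection<T> writeThenContinue(
      PCollection<TableRow> rows, PCollection<T> downstream) {
    WriteResult result =
        rows.apply(BigQueryIO.writeTableRows().to("project:dataset.table"));
    // Hypothetical accessor: a signal with one element per successfully
    // written window of input.
    PCollection<?> signal = result.getSuccessfulWrites();
    // Wait.on() sequences by window, which is why BigQueryIO's rewindowing
    // into the GlobalWindow breaks this outside of global-window batch jobs.
    return downstream.apply(Wait.on(signal));
  }
}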

Reuven

On Wed, Jul 25, 2018 at 9:57 AM Carlos Alonso  wrote:

> Just opened this PR: https://github.com/apache/beam/pull/6055 to get
> feedback ASAP. Basically what it does is return the job status in a
> PCollection of BigQueryWriteResult objects
>
> On Fri, Jul 20, 2018 at 11:57 PM Reuven Lax  wrote:
>
>> There already is a org.apache.beam.sdk.io.gcp.bigquery.WriteResult class.
>>
>> On Tue, Jul 17, 2018 at 9:44 AM Eugene Kirpichov 
>> wrote:
>>
>>> Hmm, I think this approach has some complications:
>>> - Using JobStatus makes it tied to using BigQuery batch load jobs, but
>>> the return type ought to be the same regardless of which method of writing
>>> is used (including potential future BigQuery APIs - they are evolving), or
>>> how many BigQuery load jobs are involved in writing a given window (it can
>>> be multiple).
>>> - Returning a success/failure indicator makes it prone to users ignoring
>>> the failure: the default behavior should be that, if the pipeline succeeds,
>>> that means all data was successfully written - if users want different
>>> error handling, e.g. a deadletter queue, they should have to specify it
>>> explicitly.
>>>
>>> I would recommend to return a PCollection of a type that's invariant to
>>> which load method is used (streaming writes, load jobs, multiple load jobs
>>> etc.). If it's unclear what type that should be, you could introduce an
>>> empty type e.g. "class BigQueryWriteResult {}" just for the sake of
>>> signaling success, and later add something to it.
>>>
>>> On Tue, Jul 17, 2018 at 12:30 AM Carlos Alonso 
>>> wrote:
>>>
 All good so far. I've been a bit sidetracked, but more or less I have
 the idea of using the JobStatus as part of the collection so that not only
 the completion is signaled, but also the result (success/failure) can be
 accessed. How does that sound?

 Regards

 On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov 
 wrote:

> Hi Carlos,
>
> Any updates / roadblocks you hit?
>
>
> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov 
> wrote:
>
>> Awesome!! Thanks for the heads up, very exciting, this is going to
>> make a lot of people happy :)
>>
>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso 
>> wrote:
>>
>>> + dev@beam.apache.org
>>>
>>> Just a quick email to let you know that I'm starting developing this.
>>>
>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <
>>> kirpic...@google.com> wrote:
>>>
 Hi Carlos,

 Thank you for expressing interest in taking this on! Let me give
 you a few pointers to start, and I'll be happy to help everywhere 
 along the
 way.

 Basically we want BigQueryIO.write() to return something (e.g. a
 PCollection) that can be used as input to Wait.on().
 Currently it returns a WriteResult, which only contains a
 PCollection<TableRow> of failed inserts - that one can not be used
 directly; instead we should add another component to WriteResult that
 represents the result of successfully writing some data.

 Given that BQIO supports dynamic destination writes, I think it
 makes sense for that to be a PCollection<KV<TableDestination, ???>> so that in
 theory we could sequence different destinations independently (currently
 Wait.on() does not provide such a feature, but it could); and it will
 require changing WriteResult to be WriteResult<???>. As for what
 the "???" might be - it is something that represents the result of
 successfully writing a window of data. I think it can even be Void, or "?"
 (wildcard type) for now, until we figure out something better.

 Implementing this would require roughly the following work:
 - Add this PCollection<KV<TableDestination, ???>> to WriteResult
 - Modify the BatchLoads transform to provide it on both codepaths:
 expandTriggered() and expandUntriggered()
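
As a rough sketch of the WriteResult shape described above (all names here
are hypothetical, not the actual Beam API):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/** Sketch of the proposed WriteResult extension; names are hypothetical. */
class WriteResultSketch {
  private final PCollection<TableRow> failedInserts;
  // One element per successfully written window, keyed by destination so
  // that destinations could in principle be sequenced independently.
  private final PCollection<KV<TableDestination, Void>> successfulWrites;

  WriteResultSketch(
      PCollection<TableRow> failedInserts,
      PCollection<KV<TableDestination, Void>> successfulWrites) {
    this.failedInserts = failedInserts;
    this.successfulWrites = successfulWrites;
  }

  PCollection<TableRow> getFailedInserts() {
    return failedInserts;
  }

  /** Usable as a signal for Wait.on(). */
  PCollection<KV<TableDestination, Void>> getSuccessfulWrites() {
    return successfulWrites;
  }
}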

Re: Beam python sdk with gevent

2018-09-18 Thread Ahmet Altay
I am also not familiar with gevent. Could you explain what you are trying
to do and how you plan to use gevent?

On Tue, Sep 18, 2018 at 9:38 AM, Lukasz Cwik  wrote:

> I don't think anyone has tried what you're doing. The code that you're
> working with is very new.
>
> On Mon, Sep 17, 2018 at 5:02 PM Micah Wylde  wrote:
>
>> Hi all,
>>
>> We're using the Python SDK with the portable Flink runner and running
>> into some problems integrating gevent. We're patching the gRPC runtime for
>> gevent as described in [0] which allows pipelines to start and partially
>> run. However the tasks produce a stream of gevent exceptions:
>>
>> Exception greenlet.error: error('cannot switch to a different thread',)
>> in 'grpc._cython.cygrpc.run_loop' ignored
>> Traceback (most recent call last):
>>   File "src/gevent/event.py", line 240, in gevent._event.Event.wait
>>   File "src/gevent/event.py", line 140, in gevent._event._AbstractLinkable._wait
>>   File "src/gevent/event.py", line 117, in gevent._event._AbstractLinkable._wait_core
>>   File "src/gevent/event.py", line 119, in gevent._event._AbstractLinkable._wait_core
>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>   File "src/gevent/_greenlet_primitives.py", line 63, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>   File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
>> greenlet.error: cannot switch to a different thread
>>
>> and do not make any progress.
>>
>> Has anybody else successfully used the portable python sdk with gevent?
>> Or is there a recommended alternative for doing async IO in python
>> pipelines?
>>
>> [0] https://github.com/grpc/grpc/issues/4629#issuecomment-376962677
>>
>> Micah
>>
>


Tracking and resolving release blocking bugs

2018-09-18 Thread Connell O'Callaghan
Hi All

In order to allow successful and smooth deployment of the latest Beam
releases, is the community OK with tracking release-blocking bugs, with a
goal of resolving such bugs within a week? If there is general agreement (or
no major objections) on this, we will edit the contributor page early next
week, using similar language to the "Stale pull requests" section.

Thank you all,
- Connell


Is Splittable DoFn suitable for fetch data from a socket server?

2018-09-18 Thread flyisland
Hi Gurus,

I'm trying to create an IO connector in Beam to fetch data from a socket
server. I'm new to Beam, but according to this blog <
https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it seems
that SDF is now the recommended way to implement an IO connector.

This in-house socket server can accept multiple clients, but it only
sends messages to the first-connected client, and will only send messages to
the second client if the first one disconnects.

To understand the lifecycle of a DoFn, I've created a very simple DoFn
subclass that calls log.debug() in every method. According to the JavaDoc of
DoFn.Setup(), "This is a good place to initialize transient in-memory
resources, such as network connections. The resources can then be disposed
in DoFn.Teardown." I guess I should create the connection to the socket
server in the setup() method.
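
For concreteness, a skeleton of the kind of SDF being described might look
like the sketch below (based on the Beam 2.x SDF API; the socket handling and
readNextMessage() are placeholders, not the in-house server's real protocol):

import java.io.IOException;
import java.net.Socket;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

public class DemoIOSketch extends DoFn<String, String> {
  private transient Socket socket; // hypothetical connection handle

  @Setup
  public void setup() throws IOException {
    // Open the connection once per DoFn instance.
    socket = new Socket("localhost", 9999); // placeholder host/port
  }

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(String element) {
    return new OffsetRange(0, Long.MAX_VALUE);
  }

  @NewTracker
  public OffsetRangeTracker newTracker(OffsetRange range) {
    return new OffsetRangeTracker(range);
  }

  @ProcessElement
  public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
    long offset = tracker.currentRestriction().getFrom();
    // Claim offsets until the runner checkpoints (tryClaim() returns false),
    // then ask to be resumed - possibly on a different DoFn instance, which
    // matches the behavior in the logs below.
    while (tracker.tryClaim(offset)) {
      c.output(readNextMessage());
      offset++;
    }
    return ProcessContinuation.resume();
  }

  @Teardown
  public void teardown() throws IOException {
    if (socket != null) {
      socket.close();
    }
  }

  private String readNextMessage() {
    return ""; // placeholder: read one message from the socket
  }
}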

But based on the log messages below, even though the input PCollection has
only one element, Beam still creates multiple DemoIO instances and
invokes a different DemoIO instance after every checkpoint.

I'm wondering:
1. How could I get Beam to create only one DemoIO instance, or at least use
the same instance consistently?
2. Or should I use the Source API for this purpose?

Thanks in advance.

Logs:
07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@60a58077->setup() is called!
07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO -
First->getInitialRestriction() is called!
07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@417eede1->setup() is called!
07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0,
9223372036854775807)->newTracker() is called!
07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
called!
07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
2018-09-18T23:15:56.285Z -> 0 -> First
07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
2018-09-18T23:15:56.786Z -> 1 -> First
07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2,
9223372036854775807)->newTracker() is called!
07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
called!
07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
2018-09-18T23:15:57.354Z -> 2 -> First
07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@142109e->setup() is called!
07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
2018-09-18T23:15:57.856Z -> 3 -> First
07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
2018-09-18T23:15:58.358Z -> 4 -> First
07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5,
9223372036854775807)->newTracker() is called!
07:15:58:375 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5,
9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
called!
07:15:59:382 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5,
7), lastClaimedOffset=6, lastAttemptedOffset=7}) end!
07:15:59:385 [direct-runner-worker] [DEBUG] DemoIO -
org.apache.beam.examples.DemoIO@142109e->finishBundle() is called!

WindowedWordCountSDF.java

Pipeline pipeline = Pipeline.create(options);
List<String> LINES = Arrays.asList("First");
PCollection<String> input =
    pipeline
        .apply(Create.of(LINES))
        .apply(ParDo.of(new DemoIO()));
...


DemoIO.java

public class DemoIO extends DoFn<String, String> {
private sta

Re: [VOTE] Release 2.7.0, release candidate #1

2018-09-18 Thread Lukasz Cwik
Romain hinted that this was a dependency issue, but when comparing the two
dependency trees I don't see much of a difference:

lcwik@lcwik0: ~$ diff /tmp/260 /tmp/270
< [INFO] +- org.apache.beam:beam-runners-spark:jar:2.6.0:compile
< [INFO] |  +- org.apache.beam:beam-model-pipeline:jar:2.6.0:compile
---
> [INFO] +- org.apache.beam:beam-runners-spark:jar:2.7.0:compile
> [INFO] |  +- org.apache.beam:beam-model-pipeline:jar:2.7.0:compile
5c6
< [INFO] |  +- org.apache.beam:beam-sdks-java-core:jar:2.6.0:compile
---
> [INFO] |  +- org.apache.beam:beam-sdks-java-core:jar:2.7.0:compile
14,18c15,19
< [INFO] |  |  \- org.tukaani:xz:jar:1.5:compile
< [INFO] |  +-
org.apache.beam:beam-runners-core-construction-java:jar:2.6.0:compile
< [INFO] |  |  \-
org.apache.beam:beam-model-job-management:jar:2.6.0:compile
< [INFO] |  +- org.apache.beam:beam-runners-core-java:jar:2.6.0:compile
< [INFO] |  |  \- org.apache.beam:beam-model-fn-execution:jar:2.6.0:compile
---
> [INFO] |  |  \- org.tukaani:xz:jar:1.8:compile
> [INFO] |  +-
org.apache.beam:beam-runners-core-construction-java:jar:2.7.0:compile
> [INFO] |  |  \-
org.apache.beam:beam-model-job-management:jar:2.7.0:compile
> [INFO] |  +- org.apache.beam:beam-runners-core-java:jar:2.7.0:compile
> [INFO] |  |  \- org.apache.beam:beam-model-fn-execution:jar:2.7.0:compile

Other than the Beam package changes, the only other change is xz, which I
don't believe could be causing the issue.

On Tue, Sep 18, 2018 at 8:38 AM Jean-Baptiste Onofré 
wrote:

> Thanks, let me take a look.
>
> Regards
> JB
>
> On 18/09/2018 17:36, Romain Manni-Bucau wrote:
> >
> >
> >
> > On Tue, Sep 18, 2018 at 16:44, Jean-Baptiste Onofré wrote:
> >
> > Hi,
> >
> > I don't have the issue ;)
> >
> > As said in my vote, I tested 2.7.0 RC1 on beam-samples with Spark
> > without problem.
> >
> > I don't reproduce Romain issue as well.
> >
> > @Romain can you provide some details to reproduce the issue ?
> >
> >
> > Sure, you can use this
> > reproducer: https://github.com/rmannibucau/beam-2.7.0-fails
> > It shows that it succeeds on 2.6 and fails on 2.7.
> >
> >
> >
> > Regards
> > JB
> >
> > On 17/09/2018 19:17, Charles Chen wrote:
> > > Luke, Maximilian, Raghu, can you please propose cherry-pick PRs
> > to the
> > > release-2.7.0 for your issues and add me as a reviewer
> > (@charlesccychen)?
> > >
> > > Romain, JB: is there any way I can help with debugging the issue
> > you're
> > > facing so we can unblock the release?
> > >
> > > On Fri, Sep 14, 2018 at 1:49 PM Raghu Angadi  > 
> > > >> wrote:
> > >
> > > I would like to propose one more cherrypick for RC2
> > > : https://github.com/apache/beam/pull/6391
> > > This is a KafkaIO bug fix. Once a user hits this bug, there is no
> > > easy workaround for them, especially on Dataflow. The only workaround
> > > in Dataflow is to restart or reload the job.
> > >
> > > The fix itself is fairly safe and is tested.
> > > Raghu.
> > >
> > > On Fri, Sep 14, 2018 at 12:52 AM Alexey Romanenko
> > > mailto:aromanenko@gmail.com>
> > >>
> > wrote:
> > >
> > > Perhaps it could help, but I run simple WordCount (built
> with
> > > Beam 2.7) on YARN/Spark (HDP Sandbox) cluster and it
> > worked fine
> > > for me.
> > >
> > >> On 14 Sep 2018, at 06:56, Romain Manni-Bucau
> > >> mailto:rmannibu...@gmail.com>
> > >>
> wrote:
> > >>
> > >> Hi Charles,
> > >>
> > >> I didn't get enough time to check deeply but it is
> clearly a
> > >> dependency issue and it is not in beam spark runner
> > itself but
> > >> in another transitive module of beam. It does not happen
> in
> > >> existing spark test cause none of them are in a cluster
> (even
> > >> just with 1 worker) but this seems to be a regression
> since
> > >> 2.6 works OOTB.
> > >>
> > >> Romain Manni-Bucau
> > >> @rmannibucau  |  Blog
> > >>  | Old Blog
> > >>  | Github
> > >>  | LinkedIn
> > >>  | Book
> > >>
> >  <
> https://www.packtpub.com/application-development/java-ee-8-high-performance
> >
> > >>
> > >>
> > >> Le jeu. 13 sept. 2018 à 22:15, Charles Chen
> > mailto:c...@google.com>
> > >> >> a
> écrit :

Re: Migrating Beam SQL to Calcite's code generation

2018-09-18 Thread Ankur Goenka
That's awesome!
Thanks, team!

On Tue, Sep 18, 2018 at 10:58 AM Jean-Baptiste Onofré 
wrote:

> Awesome,
>
> thanks Andrew !!
>
> Regards
> JB
>
> On 17/09/2018 23:27, Andrew Pilloud wrote:
> > I've adapted Calcite's EnumerableCalc code generation to generate the
> > BeamCalc DoFn. The primary purpose behind this change is so we can take
> > advantage of Calcite's extensive SQL operator implementation. This
> > deletes ~11000 lines of code from Beam (with ~350 added), significantly
> > increases the set of supported SQL operators, and improves performance
> > and correctness of currently supported operators. Here is my work in
> > progress: https://github.com/apache/beam/pull/6417
> >
> > There are a few bugs in Calcite that this has exposed:
> >
> > Fixed in Calcite master:
> >
> >   * CALCITE-2321 
> > - The type of a union of CHAR columns of different lengths should be
> > VARCHAR
> >   * CALCITE-2447  -
> > Some POWER, ATAN2 functions fail with NoSuchMethodException
> >
> > Pending PRs:
> >
> >   * CALCITE-2529 
> > - linq4j should promote integer to floating point when generating
> > function calls
> >   * CALCITE-2530 
> > - TRIM function does not throw exception when the length of trim
> > character is not 1(one)
> >
> > More work:
> >
> >   * CALCITE-2404  -
> > Accessing structured-types is not implemented by the runtime
> >   * (none yet) - Support multi character TRIM extension in Calcite
> >
> > I would like to push these changes in with these minor regressions. Do
> > any of these Calcite bugs block this functionality being added to Beam?
> >
> > Andrew
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Migrating Beam SQL to Calcite's code generation

2018-09-18 Thread Jean-Baptiste Onofré
Awesome,

thanks Andrew !!

Regards
JB

On 17/09/2018 23:27, Andrew Pilloud wrote:
> I've adapted Calcite's EnumerableCalc code generation to generate the
> BeamCalc DoFn. The primary purpose behind this change is so we can take
> advantage of Calcite's extensive SQL operator implementation. This
> deletes ~11000 lines of code from Beam (with ~350 added), significantly
> increases the set of supported SQL operators, and improves performance
> and correctness of currently supported operators. Here is my work in
> progress: https://github.com/apache/beam/pull/6417
> 
> There are a few bugs in Calcite that this has exposed:
> 
> Fixed in Calcite master:
> 
>   * CALCITE-2321 
> - The type of a union of CHAR columns of different lengths should be
> VARCHAR
>   * CALCITE-2447  -
> Some POWER, ATAN2 functions fail with NoSuchMethodException
> 
> Pending PRs:
> 
>   * CALCITE-2529 
> - linq4j should promote integer to floating point when generating
> function calls
>   * CALCITE-2530 
> - TRIM function does not throw exception when the length of trim
> character is not 1(one)
> 
> More work:
> 
>   * CALCITE-2404  -
> Accessing structured-types is not implemented by the runtime
>   * (none yet) - Support multi character TRIM extension in Calcite
> 
> I would like to push these changes in with these minor regressions. Do
> any of these Calcite bugs block this functionality being added to Beam?
> 
> Andrew

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Migrating Beam SQL to Calcite's code generation

2018-09-18 Thread Lukasz Cwik
That seems pretty amazing.

On Mon, Sep 17, 2018 at 3:22 PM Mingmin Xu  wrote:

> Awesome work, we should call Calcite operator functions if available.
>
> I haven't had time to read the PR yet; for those impacted, we would keep the
> existing implementation. One example: I noticed recently that FLOOR/CEIL only
> support months/years, which was quite a surprise to me.
>
> Mingmin
>
> On Mon, Sep 17, 2018 at 3:03 PM Anton Kedin  wrote:
>
>> This is pretty amazing! Thank you for doing this!
>>
>> Regards,
>> Anton
>>
>> On Mon, Sep 17, 2018 at 2:27 PM Andrew Pilloud 
>> wrote:
>>
>>> I've adapted Calcite's EnumerableCalc code generation to generate the
>>> BeamCalc DoFn. The primary purpose behind this change is so we can take
>>> advantage of Calcite's extensive SQL operator implementation. This deletes
>>> ~11000 lines of code from Beam (with ~350 added), significantly increases
>>> the set of supported SQL operators, and improves performance and
>>> correctness of currently supported operators. Here is my work in progress:
>>> https://github.com/apache/beam/pull/6417
>>>
>>> There are a few bugs in Calcite that this has exposed:
>>>
>>> Fixed in Calcite master:
>>>
>>>- CALCITE-2321 
>>>- The type of a union of CHAR columns of different lengths should be 
>>> VARCHAR
>>>- CALCITE-2447 
>>>- Some POWER, ATAN2 functions fail with NoSuchMethodException
>>>
>>> Pending PRs:
>>>
>>>- CALCITE-2529 
>>>- linq4j should promote integer to floating point when generating 
>>> function
>>>calls
>>>- CALCITE-2530 
>>>- TRIM function does not throw exception when the length of trim 
>>> character
>>>is not 1(one)
>>>
>>> More work:
>>>
>>>- CALCITE-2404 
>>>- Accessing structured-types is not implemented by the runtime
>>>- (none yet) - Support multi character TRIM extension in Calcite
>>>
>>> I would like to push these changes in with these minor regressions. Do
>>> any of these Calcite bugs block this functionality being added to Beam?
>>>
>>> Andrew
>>>
>>
>
> --
> 
> Mingmin
>


Re: Proposal for Beam Python User State and Timer APIs

2018-09-18 Thread Charles Chen
An update: the reference DirectRunner implementation of (and common
execution code for) the Python user state and timers API has been merged:
https://github.com/apache/beam/pull/6304

On Thu, Aug 30, 2018 at 1:48 AM Charles Chen  wrote:

> Another update: the reference DirectRunner implementation of the Python
> user state and timers API is out for review:
> https://github.com/apache/beam/pull/6304
>
> On Mon, Jul 9, 2018 at 2:18 PM Charles Chen  wrote:
>
>> An update: https://github.com/apache/beam/pull/5691 has been merged.  I
>> hope to send out a reference implementation in the DirectRunner soon.  On
>> the roadmap after that is work on the relevant portability interfaces here
>> so we can get this working on runners like Beam Python on Flink.
>>
>> On Wed, Jun 20, 2018 at 10:00 AM Charles Chen  wrote:
>>
>>> An update on the implementation: I recently sent out the user-facing
>>> pipeline construction part of the API implementation out for review:
>>> https://github.com/apache/beam/pull/5691.
>>>
>>> On Tue, Jun 5, 2018 at 5:26 PM Charles Chen  wrote:
>>>
 Thanks everyone for contributing here.  We've reached rough consensus
 on the approach we should take with this API, and I've summarized this in
 the new "Community consensus" sections I added to the doc (
 https://s.apache.org/beam-python-user-state-and-timers).  I will begin
 initial implementation of this API soon.

 On Wed, May 23, 2018 at 8:08 PM Thomas Weise  wrote:

> Nice proposal; it's exciting to see this about to be added to the SDK
> as it enables a set of more complex use cases.
>
> I also think that some of the content can later be repurposed as user
> documentation.
>
> Thanks,
> Thomas
>
>
> On Wed, May 23, 2018 at 11:49 AM, Charles Chen  wrote:
>
>> Thanks everyone for the detailed comments and discussions.  It looks
>> like by now, we mostly agree with the requirements and overall direction
>> needed for the API, though there is continuing discussion on specific
>> details.  I want to highlight two new sections of the doc, which address
>> some discussions that have come up:
>>
>>- *Existing state and transactionality*: this section addresses
>>how we will address an existing transactionality inconsistency in the
>>existing Java API.  (
>>
>> https://docs.google.com/document/d/1GadEkAmtbJQjmqiqfSzGw3b66TKerm8tyn6TK4blAys/edit#heading=h.ofyl9jspiz3b
>>)
>>- *State for merging windows*: this section addresses how we will
>>deal with non-combinable state in conjunction with merging windows.  (
>>
>> https://docs.google.com/document/d/1GadEkAmtbJQjmqiqfSzGw3b66TKerm8tyn6TK4blAys/edit#heading=h.ctxkcgabtzpy
>>)
>>
>> Let me know any further comments and suggestions.
>>
>> On Tue, May 22, 2018 at 9:29 AM Kenneth Knowles 
>> wrote:
>>
>>> Nice. I know that Java users have found it helpful to have this
>>> lower-level way of writing pipelines when the high-level primitives 
>>> don't
>>> quite have the tight control they are looking for. I hope it will be a 
>>> big
>>> draw for Python, too.
>>>
>>> (commenting on the doc)
>>>
>>> Kenn
>>>
>>> On Mon, May 21, 2018 at 5:15 PM Charles Chen  wrote:
>>>
 I want to share a proposal for adding user state and timer support
 to the Beam Python SDK and get the community's thoughts on how such an 
 API
 should look: https://s.apache.org/beam-python-user-state-and-timers

 Let me know what you think and please add any comments and
 suggestions you may have.

 Best,
 Charles

>>>
>


Re: Beam python sdk with gevent

2018-09-18 Thread Lukasz Cwik
I don't think anyone has tried what you're doing. The code that you're working
with is very new.

On Mon, Sep 17, 2018 at 5:02 PM Micah Wylde  wrote:

> Hi all,
>
> We're using the Python SDK with the portable Flink runner and running into
> some problems integrating gevent. We're patching the gRPC runtime for
> gevent as described in [0] which allows pipelines to start and partially
> run. However the tasks produce a stream of gevent exceptions:
>
> Exception greenlet.error: error('cannot switch to a different thread',) in
> 'grpc._cython.cygrpc.run_loop' ignored
> Traceback (most recent call last):
>   File "src/gevent/event.py", line 240, in gevent._event.Event.wait
>   File "src/gevent/event.py", line 140, in
> gevent._event._AbstractLinkable._wait
>   File "src/gevent/event.py", line 117, in
> gevent._event._AbstractLinkable._wait_core
>   File "src/gevent/event.py", line 119, in
> gevent._event._AbstractLinkable._wait_core
>   File "src/gevent/_greenlet_primitives.py", line 59, in
> gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>   File "src/gevent/_greenlet_primitives.py", line 59, in
> gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>   File "src/gevent/_greenlet_primitives.py", line 63, in
> gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>   File "src/gevent/__greenlet_primitives.pxd", line 35, in
> gevent.__greenlet_primitives._greenlet_switch
> greenlet.error: cannot switch to a different thread
>
> and do not make any progress.
>
> Has anybody else successfully used the portable python sdk with gevent? Or
> is there a recommended alternative for doing async IO in python pipelines?
>
> [0] https://github.com/grpc/grpc/issues/4629#issuecomment-376962677
>
> Micah
>


Re: [VOTE] Release 2.7.0, release candidate #1

2018-09-18 Thread Jean-Baptiste Onofré
Thanks, let me take a look.

Regards
JB

On 18/09/2018 17:36, Romain Manni-Bucau wrote:
> 
> 
> 
> On Tue, Sep 18, 2018 at 16:44, Jean-Baptiste Onofré wrote:
> 
> Hi,
> 
> I don't have the issue ;)
> 
> As said in my vote, I tested 2.7.0 RC1 on beam-samples with Spark
> without problem.
> 
> I don't reproduce Romain issue as well.
> 
> @Romain can you provide some details to reproduce the issue ?
> 
> 
> Sure, you can use this
> reproducer: https://github.com/rmannibucau/beam-2.7.0-fails
> It shows that it succeeds on 2.6 and fails on 2.7.
>  
> 
> 
> Regards
> JB
> 
> On 17/09/2018 19:17, Charles Chen wrote:
> > Luke, Maximilian, Raghu, can you please propose cherry-pick PRs
> to the
> > release-2.7.0 for your issues and add me as a reviewer
> (@charlesccychen)?
> >
> > Romain, JB: is there any way I can help with debugging the issue
> you're
> > facing so we can unblock the release?
> >
> > On Fri, Sep 14, 2018 at 1:49 PM Raghu Angadi  
> > >> wrote:
> >
> >     I would like to propose one more cherrypick for RC2
> >     : https://github.com/apache/beam/pull/6391
> >     This is a KafkaIO bug fix. Once a user hits this bug, there is no
> >     easy workaround for them, especially on Dataflow. The only workaround
> >     in Dataflow is to restart or reload the job.
> >
> >     The fix itself is fairly safe and is tested.
> >     Raghu.
> >
> >     On Fri, Sep 14, 2018 at 12:52 AM Alexey Romanenko
> >     mailto:aromanenko@gmail.com>
> >>
> wrote:
> >
> >         Perhaps it could help, but I run simple WordCount (built with
> >         Beam 2.7) on YARN/Spark (HDP Sandbox) cluster and it
> worked fine
> >         for me.
> >
> >>         On 14 Sep 2018, at 06:56, Romain Manni-Bucau
> >>         mailto:rmannibu...@gmail.com>
> >> wrote:
> >>
> >>         Hi Charles,
> >>
> >>         I didn't get enough time to check deeply but it is clearly a
> >>         dependency issue and it is not in beam spark runner
> itself but
> >>         in another transitive module of beam. It does not happen in
> >>         existing spark test cause none of them are in a cluster (even
> >>         just with 1 worker) but this seems to be a regression since
> >>         2.6 works OOTB.
> >>
> >>         Romain Manni-Bucau
> >>         @rmannibucau  |  Blog
> >>          | Old Blog
> >>          | Github
> >>          | LinkedIn
> >>          | Book
> >>       
>  
> 
> >>
> >>
> >>         On Thu, Sep 13, 2018 at 22:15, Charles Chen wrote:
> >>
> >>             Romain and JB, can you please add the results of your
> >>             investigations into the errors you've seen above?  Given
> >>             that the existing SparkRunner tests pass for this RC, and
> >>             that the integration test you ran is in another repo that
> >>             is not continuously tested with Beam, it is not clear how
> >>             we should move forward and whether this is a blocking
> >>             issue, unless we can find a root cause in Beam.
> >>
> >>             On Wed, Sep 12, 2018 at 2:08 AM Etienne Chauchot
> >>             mailto:echauc...@apache.org>
> >> wrote:
> >>
> >>                 Hi all,
> >>
> >>                 on a performance and functional regression stand
> point
> >>                 I see no regression:
> >>
> >>                 I looked at nexmark graphs "output pcollection size"
> >>                 and "execution time" around release cut date on
> >>                 dataflow, spark, flink and direct runner in batch and
> >>                 streaming modes. There seems to be no regression.
> >>
> >>                 Etienne
> >>
> >>                 Le mardi 11 septembre 2018 à 12:25 -0700, Charles
> Chen
> >>                 a écrit :
> >>>                 The SparkRunner validation test
> >>>               
>  (here: 
> https://beam.apache.org/contribute/release-guide/#run-validation-tests)
> >>>                 passes on my machine.  It looks like we are likely
> >>>     

Re: [VOTE] Release 2.7.0, release candidate #1

2018-09-18 Thread Romain Manni-Bucau
On Tue, Sep 18, 2018 at 16:44, Jean-Baptiste Onofré wrote:

> Hi,
>
> I don't have the issue ;)
>
> As said in my vote, I tested 2.7.0 RC1 on beam-samples with Spark
> without problem.
>
> I don't reproduce Romain issue as well.
>
> @Romain can you provide some details to reproduce the issue ?
>

Sure, you can use this reproducer:
https://github.com/rmannibucau/beam-2.7.0-fails
It shows that it succeeds on 2.6 and fails on 2.7.


>
> Regards
> JB
>
> On 17/09/2018 19:17, Charles Chen wrote:
> > Luke, Maximilian, Raghu, can you please propose cherry-pick PRs to the
> > release-2.7.0 for your issues and add me as a reviewer (@charlesccychen)?
> >
> > Romain, JB: is there any way I can help with debugging the issue you're
> > facing so we can unblock the release?
> >
> > On Fri, Sep 14, 2018 at 1:49 PM Raghu Angadi  > > wrote:
> >
> > I would like to propose one more cherrypick for RC2
> > : https://github.com/apache/beam/pull/6391
> > This is a KafkaIO bug fix. Once a user hits this bug, there is no
> > easy workaround for them, especially on Dataflow. The only workaround
> > in Dataflow is to restart or reload the job.
> >
> > The fix itself is fairly safe and is tested.
> > Raghu.
> >
> > On Fri, Sep 14, 2018 at 12:52 AM Alexey Romanenko
> > mailto:aromanenko@gmail.com>> wrote:
> >
> > Perhaps it could help, but I run simple WordCount (built with
> > Beam 2.7) on YARN/Spark (HDP Sandbox) cluster and it worked fine
> > for me.
> >
> >> On 14 Sep 2018, at 06:56, Romain Manni-Bucau
> >> mailto:rmannibu...@gmail.com>> wrote:
> >>
> >> Hi Charles,
> >>
> >> I didn't get enough time to check deeply but it is clearly a
> >> dependency issue and it is not in beam spark runner itself but
> >> in another transitive module of beam. It does not happen in
> >> existing spark test cause none of them are in a cluster (even
> >> just with 1 worker) but this seems to be a regression since
> >> 2.6 works OOTB.
> >>
> >> Romain Manni-Bucau
> >> @rmannibucau  |  Blog
> >>  | Old Blog
> >>  | Github
> >>  | LinkedIn
> >>  | Book
> >> <
> https://www.packtpub.com/application-development/java-ee-8-high-performance
> >
> >>
> >>
> >> On Thu, Sep 13, 2018 at 22:15, Charles Chen wrote:
> >>
> >> Romain and JB, can you please add the results of your
> >> investigations into the errors you've seen above?  Given
> >> that the existing SparkRunner tests pass for this RC, and
> >> that the integration test you ran is in another repo that
> >> is not continuously tested with Beam, it is not clear how
> >> we should move forward and whether this is a blocking
> >> issue, unless we can find a root cause in Beam.
> >>
> >> On Wed, Sep 12, 2018 at 2:08 AM Etienne Chauchot
> >> mailto:echauc...@apache.org>> wrote:
> >>
> >> Hi all,
> >>
> >> on a performance and functional regression stand point
> >> I see no regression:
> >>
> >> I looked at nexmark graphs "output pcollection size"
> >> and "execution time" around release cut date on
> >> dataflow, spark, flink and direct runner in batch and
> >> streaming modes. There seems to be no regression.
> >>
> >> Etienne
> >>
> >> Le mardi 11 septembre 2018 à 12:25 -0700, Charles Chen
> >> a écrit :
> >>> The SparkRunner validation test
> >>> (here:
> https://beam.apache.org/contribute/release-guide/#run-validation-tests)
> >>> passes on my machine.  It looks like we are likely
> >>> missing test coverage where Romain is hitting issues.
> >>>
> >>> On Tue, Sep 11, 2018 at 12:15 PM Ahmet Altay
> >>> mailto:al...@google.com>> wrote:
>  Could anyone else help with looking at these issues
>  earlier?
> 
>  On Tue, Sep 11, 2018 at 12:03 PM, Romain Manni-Bucau
>    > wrote:
> > Im running this main [1] through this IT [2]. Was
> > working fine since ~1 year but 2.7.0 broke it.
> > Didnt investigate more but can have a look later
> > this month if it helps.
> >
> > [1]
> https://github.com/Talend/component-runtime/blob/master/component-runtime-beam/src/it/serialization-over-cluster/sr

Re: [Discuss] Upgrade story for Beam's execution engines

2018-09-18 Thread Maximilian Michels
FYI, I opened a PR with a compatibility table for the Flink Runner page: 
https://github.com/apache/beam-site/pull/553


On 17.09.18 09:31, Robert Bradshaw wrote:
On Mon, Sep 17, 2018 at 2:02 AM Austin Bennett 
mailto:whatwouldausti...@gmail.com>> wrote:


Do we currently maintain a finer-grained list of compatibility
between execution/runner versions and Beam versions?  Is this only
really a concern with recent Flink (sounded like at least Spark
jump, too)?  I see the capability matrix:
https://beam.apache.org/documentation/runners/capability-matrix/,
but some sort of compatibility mapping between runner versions and Beam
releases might be useful.

I see compatibility matrix as far as beam features, but not for
underlying runners.  Ex: something like this would save a user
trying to get Beam working on recent Flink 1.6 and then subsequently
hitting a (potentially not well documented) wall given known issues. 



+1. I was bitten by this as well.

I don't know if it's worth having a compatibility matrix for each 
version (as the overlap is likely to be all or nothing in most cases), 
but it should be prominently displayed here and elsewhere. Want to send 
out a PR?


Re: [VOTE] Release 2.7.0, release candidate #1

2018-09-18 Thread Maximilian Michels

Hi Charles,

Thanks for driving the release. Here are the cherrypicks I mentioned: 
https://github.com/apache/beam/pull/6427


Thanks,
Max

On 17.09.18 19:17, Charles Chen wrote:
Luke, Maximilian, Raghu, can you please propose cherry-pick PRs to the 
release-2.7.0 for your issues and add me as a reviewer (@charlesccychen)?


Romain, JB: is there any way I can help with debugging the issue you're 
facing so we can unblock the release?


On Fri, Sep 14, 2018 at 1:49 PM Raghu Angadi wrote:


I would like to propose one more cherrypick for RC2:
https://github.com/apache/beam/pull/6391
This is a KafkaIO bug fix. Once a user hits this bug, there is no
easy workaround for them, especially on Dataflow. The only workaround
in Dataflow is to restart or reload the job.

The fix itself is fairly safe and is tested.
Raghu.

On Fri, Sep 14, 2018 at 12:52 AM Alexey Romanenko
mailto:aromanenko@gmail.com>> wrote:

Perhaps it could help, but I run simple WordCount (built with
Beam 2.7) on YARN/Spark (HDP Sandbox) cluster and it worked fine
for me.


On 14 Sep 2018, at 06:56, Romain Manni-Bucau
mailto:rmannibu...@gmail.com>> wrote:

Hi Charles,

I didn't get enough time to check deeply but it is clearly a
dependency issue and it is not in beam spark runner itself but
in another transitive module of beam. It does not happen in
existing spark test cause none of them are in a cluster (even
just with 1 worker) but this seems to be a regression since
2.6 works OOTB.

Romain Manni-Bucau
@rmannibucau  | Blog
 | Old Blog
 | Github
 | LinkedIn
 | Book




On Thu, Sep 13, 2018 at 22:15, Charles Chen wrote:

Romain and JB, can you please add the results of your
investigations into the errors you've seen above?  Given
that the existing SparkRunner tests pass for this RC, and
that the integration test you ran is in another repo that
is not continuously tested with Beam, it is not clear how
we should move forward and whether this is a blocking
issue, unless we can find a root cause in Beam.

On Wed, Sep 12, 2018 at 2:08 AM Etienne Chauchot
mailto:echauc...@apache.org>> wrote:

Hi all,

on a performance and functional regression stand point
I see no regression:

I looked at nexmark graphs "output pcollection size"
and "execution time" around release cut date on
dataflow, spark, flink and direct runner in batch and
streaming modes. There seems to be no regression.

Etienne

Le mardi 11 septembre 2018 à 12:25 -0700, Charles Chen
a écrit :

The SparkRunner validation test (here:

https://beam.apache.org/contribute/release-guide/#run-validation-tests)
passes on my machine.  It looks like we are likely
missing test coverage where Romain is hitting issues.

On Tue, Sep 11, 2018 at 12:15 PM Ahmet Altay
mailto:al...@google.com>> wrote:

Could anyone else help with looking at these issues
earlier?

On Tue, Sep 11, 2018 at 12:03 PM, Romain Manni-Bucau
mailto:rmannibu...@gmail.com>> wrote:

Im running this main [1] through this IT [2]. Was
working fine since ~1 year but 2.7.0 broke it.
Didnt investigate more but can have a look later
this month if it helps.

[1]

https://github.com/Talend/component-runtime/blob/master/component-runtime-beam/src/it/serialization-over-cluster/src/main/java/org/talend/sdk/component/beam/it/clusterserialization/Main.java
[2]

https://github.com/Talend/component-runtime/blob/master/component-runtime-beam/src/it/serialization-over-cluster/src/test/java/org/talend/sdk/component/beam/it/SerializationOverClusterIT.java

On Tue, Sep 11, 2018 at 20:54, Charles Chen wrote:

Romain: can you give more details on the failure
you're encountering, i.e. how you are performing
this validation?

On Tue, Sep 11, 2018 at 9:36 AM Jean-Baptiste
Onofré mailto:j...@nanthrax.net>>
wrote:

Hi,

weird, I didn't have it on 

Re: [VOTE] Release 2.7.0, release candidate #1

2018-09-18 Thread Jean-Baptiste Onofré
Hi,

I don't have the issue ;)

As said in my vote, I tested 2.7.0 RC1 on beam-samples with Spark
without problem.

I don't reproduce Romain issue as well.

@Romain can you provide some details to reproduce the issue ?

Regards
JB

On 17/09/2018 19:17, Charles Chen wrote:
> Luke, Maximilian, Raghu, can you please propose cherry-pick PRs to the
> release-2.7.0 for your issues and add me as a reviewer (@charlesccychen)?
> 
> Romain, JB: is there any way I can help with debugging the issue you're
> facing so we can unblock the release?
> 
> On Fri, Sep 14, 2018 at 1:49 PM Raghu Angadi wrote:
> 
> I would like to propose one more cherrypick for RC2
> : https://github.com/apache/beam/pull/6391
> This is a KafkaIO bug fix. Once a user hits this bug, there is no
> easy workaround for them, especially on Dataflow. The only workaround
> in Dataflow is to restart or reload the job.
> 
> The fix itself is fairly safe and is tested.
> Raghu.
> 
> On Fri, Sep 14, 2018 at 12:52 AM Alexey Romanenko
> mailto:aromanenko@gmail.com>> wrote:
> 
> Perhaps it could help, but I run simple WordCount (built with
> Beam 2.7) on YARN/Spark (HDP Sandbox) cluster and it worked fine
> for me.
> 
>> On 14 Sep 2018, at 06:56, Romain Manni-Bucau
>> mailto:rmannibu...@gmail.com>> wrote:
>>
>> Hi Charles,
>>
>> I didn't get enough time to check deeply but it is clearly a
>> dependency issue and it is not in beam spark runner itself but
>> in another transitive module of beam. It does not happen in
>> existing spark test cause none of them are in a cluster (even
>> just with 1 worker) but this seems to be a regression since
>> 2.6 works OOTB.
>>
>> Romain Manni-Bucau
>> @rmannibucau  |  Blog
>>  | Old Blog
>>  | Github
>>  | LinkedIn
>>  | Book
>> 
>> 
>>
>>
>> On Thu, Sep 13, 2018 at 22:15, Charles Chen wrote:
>>
>> Romain and JB, can you please add the results of your
>> investigations into the errors you've seen above?  Given
>> that the existing SparkRunner tests pass for this RC, and
>> that the integration test you ran is in another repo that
>> is not continuously tested with Beam, it is not clear how
>> we should move forward and whether this is a blocking
>> issue, unless we can find a root cause in Beam.
>>
>> On Wed, Sep 12, 2018 at 2:08 AM Etienne Chauchot
>> mailto:echauc...@apache.org>> wrote:
>>
>> Hi all,
>>
>> on a performance and functional regression stand point
>> I see no regression:
>>
>> I looked at nexmark graphs "output pcollection size"
>> and "execution time" around release cut date on
>> dataflow, spark, flink and direct runner in batch and
>> streaming modes. There seems to be no regression.
>>
>> Etienne
>>
>> Le mardi 11 septembre 2018 à 12:25 -0700, Charles Chen
>> a écrit :
>>> The SparkRunner validation test
>>> (here: 
>>> https://beam.apache.org/contribute/release-guide/#run-validation-tests)
>>> passes on my machine.  It looks like we are likely
>>> missing test coverage where Romain is hitting issues.
>>>
>>> On Tue, Sep 11, 2018 at 12:15 PM Ahmet Altay
>>> mailto:al...@google.com>> wrote:
 Could anyone else help with looking at these issues
 earlier?

 On Tue, Sep 11, 2018 at 12:03 PM, Romain Manni-Bucau
 >>> > wrote:
> Im running this main [1] through this IT [2]. Was
> working fine since ~1 year but 2.7.0 broke it.
> Didnt investigate more but can have a look later
> this month if it helps.
>
> [1] 
> https://github.com/Talend/component-runtime/blob/master/component-runtime-beam/src/it/serialization-over-cluster/src/main/java/org/talend/sdk/component/beam/it/clusterserialization/Main.java
> [2] 
> https://github.com/Talend/component-runtime/blob/master/component-runtime-beam/src/it/serialization-over-cluster/src/test/java/org/talend/sdk/component/beam/it/SerializationOverClusterIT.java
>
> Le mar. 11 sept. 2018 20:54, Charles Chen
> mailto

Re: How to optimize the performance of Beam on Spark

2018-09-18 Thread Jean-Baptiste Onofré
Hi,

The first huge difference is the fact that the spark runner still uses
RDD whereas directly using spark, you are using dataset. A bunch of
optimization in spark are related to dataset.

I started a large refactoring of the spark runner to leverage Spark 2.x
(and dataset).
It's not yet ready as it includes other improvements (the portability
layer with Job API, a first check of state API, ...).

Anyway, by Spark wordcount, you mean the one included in the spark
distribution ?

Regards
JB

On 18/09/2018 08:39, devinduan(段丁瑞) wrote:
> Hi,
>     I'm testing Beam on Spark. 
>     I use the Spark example WordCount to process a 1G data file; it takes
> 1 minute.
>     However, when I use the Beam example WordCount to process the same file,
> it takes 30 minutes.
>     My Spark parameter is :  --deploy-mode client  --executor-memory 1g
> --num-executors 1 --driver-memory 1g
>     My Spark version is 2.3.1,  Beam version is 2.5
>     Is there any optimization method?
> Thank you.
> 
>    

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: How to optimize the performance of Beam on Spark

2018-09-18 Thread Tim Robertson
Hi devinduan

The known issues Robert links there are actually HDFS related and not
specific to Spark.  The improvement we're seeking is that the final copy of
the output file can be optimised by using a "move" instead of a "copy", and I
expect to have it fixed for Beam 2.8.0.  On a small dataset like this
though, I don't think it will impact performance too much.

Can you please elaborate on your deployment?  It looks like you are using a
cluster (i.e. deploy-mode client) but are you using HDFS?

I have access to a Cloudera CDH 5.12 Hadoop cluster and just ran an example
word count as follows - I'll explain the parameters to tune below:

1) I generate some random data (using common Hadoop tools)
hadoop jar
/opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/hadoop-examples.jar \
  teragen \
  -Dmapred.map.tasks=100 \
  -Dmapred.map.tasks.speculative.execution=false \
  1000  \
  /tmp/tera

This puts 100 files totalling just under 1GB on which I will run the word
count. They are stored in the HDFS filesystem.

2) Run the word count using Spark (2.3.x) and Beam 2.5.0

In my cluster I have YARN to allocate resources, and an HDFS filesystem.
This will be different if you run Spark as standalone, or on a cloud
environment.

spark2-submit \
  --conf spark.default.parallelism=45 \
  --class org.apache.beam.runners.spark.examples.WordCount \
  --master yarn \
  --executor-memory 2G \
  --executor-cores 5 \
  --num-executors 9 \
  --jars
beam-sdks-java-core-2.5.0.jar,beam-runners-core-construction-java-2.5.0.jar,beam-runners-core-java-2.5.0.jar,beam-sdks-java-io-hadoop-file-system-2.5.0.jar
\
  beam-runners-spark-2.5.0.jar \
  --runner=SparkRunner \
  --inputFile=hdfs:///tmp/tera/* \
  --output=hdfs:///tmp/wordcount

The jars I provide here are the minimum needed for running on HDFS with
Spark and normally you'd build those into your project as an über jar.

The important bits for tuning for performance are the following - these
will be applicable for any Spark deployment (unless embedded):

  spark.default.parallelism - controls the parallelism of the beam
pipeline. In this case, how many workers are tokenizing the input data.
  executor-memory, executor-cores, num-executors - controls the resources
spark will use

Note that a parallelism of 45 means that the 5 cores in the 9 executors
can all run concurrently (i.e. 5x9 = 45). When you get to very large
datasets, you will likely have much higher parallelism.
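
As an aside, the same bootstrap can also be done from code instead of
spark-submit flags. A minimal sketch (class names from the Beam Spark runner;
resource settings such as executors, cores, memory and
spark.default.parallelism are still supplied to Spark itself, e.g. via
spark-submit as above):

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SparkBootstrapSketch {
  public static void main(String[] args) {
    // Parse pipeline options (e.g. --inputFile, --output) from the command
    // line, then force the Spark runner.
    SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    Pipeline p = Pipeline.create(options);
    // ... build the WordCount pipeline as usual, then:
    p.run().waitUntilFinish();
  }
}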

In this test I see around 20 seconds of initial Spark startup (copying
jars, requesting resources from YARN, establishing the Spark context), but
once up, the job completes in a few seconds, writing the output into 45 files
(because of the parallelism). The files are named
/tmp/wordcount-000*-of-00045.

I hope this helps provide a few pointers, but if you elaborate on your
environment we might be able to assist more.

Best wishes,
Tim

On Tue, Sep 18, 2018 at 9:29 AM Robert Bradshaw  wrote:

> There are known performance issues with Beam on Spark that are being
> worked on, e.g. https://issues.apache.org/jira/browse/BEAM-5036 . It's
> possible you're hitting something different, but would be worth
> investigating. See also
> https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Performance%20of%20write
>
> On Tue, Sep 18, 2018 at 8:39 AM devinduan(段丁瑞) 
> wrote:
>
>> Hi,
>> I'm testing Beam on Spark.
>> I use the Spark example WordCount to process a 1G data file; it takes
>> 1 minute.
>> However, when I use the Beam example WordCount to process the same file,
>> it takes 30 minutes.
>> My Spark parameter is :  --deploy-mode client  --executor-memory 1g
>> --num-executors 1 --driver-memory 1g
>> My Spark version is 2.3.1,  Beam version is 2.5
>> Is there any optimization method?
>> Thank you.
>>
>>
>>
>


Re: How to optimize the performance of Beam on Spark

2018-09-18 Thread Robert Bradshaw
There are known performance issues with Beam on Spark that are being worked
on, e.g. https://issues.apache.org/jira/browse/BEAM-5036 . It's possible
you're hitting something different, but would be worth investigating. See
also
https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Performance%20of%20write

On Tue, Sep 18, 2018 at 8:39 AM devinduan(段丁瑞) 
wrote:

> Hi,
> I'm testing Beam on Spark.
> I use the Spark example WordCount to process a 1G data file; it takes
> 1 minute.
> However, when I use the Beam example WordCount to process the same file,
> it takes 30 minutes.
> My Spark parameter is :  --deploy-mode client  --executor-memory 1g
> --num-executors 1 --driver-memory 1g
> My Spark version is 2.3.1,  Beam version is 2.5
> Is there any optimization method?
> Thank you.
>
>
>