Re: [VOTE] Release 2.2.0, release candidate #3

2017-11-15 Thread Reuven Lax
This is with the cherry-pick (CP) of Eugene's PR applied. However, Eugene's
PR does not touch any Python code.

On Thu, Nov 16, 2017 at 10:10 AM, Reuven Lax  wrote:

>
> mvn -Prelease clean install
>
> [INFO] --- exec-maven-plugin:1.5.0:exec (setuptools-clean) @ beam-sdks-python ---
> Could not find platform independent libraries <prefix>
> Could not find platform dependent libraries <exec_prefix>
> Consider setting $PYTHONHOME to <prefix>[:<exec_prefix>]
> ImportError: No module named site
> [ERROR] Command execution failed.
> org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
>     at org.apache.commons.exec.DefaultExecutor.executeInternal(DefaultExecutor.java:404)
>     at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:166)
>     at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:764)
>     at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:711)
>     at org.codehaus.mojo.exec.ExecMojo.execute(ExecMojo.java:289)
>     at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:154)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:146)
>     at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:117)
>     at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:81)
>     at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
>     at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
>     at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:309)
>     at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:194)
>     at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:107)
>     at org.apache.maven.cli.MavenCli.execute(MavenCli.java:993)
>     at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:345)
>     at org.apache.maven.cli.MavenCli.main(MavenCli.java:191)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
>
> On Thu, Nov 16, 2017 at 5:02 AM, Charles Chen 
> wrote:
>
>> Could you send the command you used that produced this error?  I can't
>> reproduce it at the tip of the release-2.2.0 branch.
>>
>> On Wed, Nov 15, 2017 at 5:34 AM Reuven Lax 
>> wrote:
>>
>> > I'm trying to do the last CP and cut RC4, but I'm getting a compilation
>> > failure in Python - "ImportError: No module named site"
>> >
>> > Did we possibly break the release branch on one of the Python CPs?
>> >
>> > Reuven
>> >
>> > On Sun, Nov 12, 2017 at 5:12 PM, Jean-Baptiste Onofré 
>> > wrote:
>> >
>> > > Hi Reuven,
>> > >
>> > > +1 for RC4, and don't worry: it's part of the process. I prefer to
>> > > have a long release process than a crappy release ;) That's exactly
>> > > the purpose of review & vote.
>> > >
>> > > I definitely think that having releases more often will reduce this
>> > > kind of issue.
>> > >
>> > > Regards
>> > > JB
>> > >
>> > >
>> > > On 11/12/2017 09:04 AM, Reuven Lax wrote:
>> > >
>> > >> I definitely appreciate the frustration about how long this release
>> > >> is taking. It's verging on ridiculous at this point, and we need to
>> > >> fix some of the things that caused us to get to this state (for one
>> > >> thing, our infrastructure was so busted at one point that Valentyn
>> > >> spent 2 weeks trying to get one PR merged into the release branch).
>> > >>
>> > >> At this point, let's try and fix this Monday. Unfortunately this is
>> > >> not the sole issue requiring RC4. Python verification failed as well,
>> > >> and we need an RC4 regardless to merge those PRs. I'm hoping that RC4
>> > >> is our final RC, and we can finish voting next week.
>> > >>
>> > >> Reuven
>> > >>
>> > >> On Sat, Nov 11, 2017 at 6:24 AM, Romain Manni-Bucau <
>> > >> rmannibu...@gmail.com>
>> > >> wrote:
>> > >>
>> > >> On 11 Nov 2017 at 09:52, "Jean-Baptiste Onofré" wrote:
>> > >>>
>> > >>> If the purpose is to release 2.2.1 in one 

Re: [VOTE] Release 2.2.0, release candidate #3

2017-11-15 Thread Reuven Lax
mvn -Prelease clean install

[INFO] --- exec-maven-plugin:1.5.0:exec (setuptools-clean) @ beam-sdks-python ---
Could not find platform independent libraries <prefix>
Could not find platform dependent libraries <exec_prefix>
Consider setting $PYTHONHOME to <prefix>[:<exec_prefix>]
ImportError: No module named site
[ERROR] Command execution failed.
org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
    at org.apache.commons.exec.DefaultExecutor.executeInternal(DefaultExecutor.java:404)
    at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:166)
    at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:764)
    at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:711)
    at org.codehaus.mojo.exec.ExecMojo.execute(ExecMojo.java:289)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:154)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:146)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:117)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:81)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:309)
    at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:194)
    at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:107)
    at org.apache.maven.cli.MavenCli.execute(MavenCli.java:993)
    at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:345)
    at org.apache.maven.cli.MavenCli.main(MavenCli.java:191)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)

On Thu, Nov 16, 2017 at 5:02 AM, Charles Chen 
wrote:

> Could you send the command you used that produced this error?  I can't
> reproduce it at the tip of the release-2.2.0 branch.
>
> On Wed, Nov 15, 2017 at 5:34 AM Reuven Lax 
> wrote:
>
> > I'm trying to do the last CP and cut RC4, but I'm getting a compilation
> > failure in Python - "ImportError: No module named site"
> >
> > Did we possibly break the release branch on one of the Python CPs?
> >
> > Reuven
> >
> > On Sun, Nov 12, 2017 at 5:12 PM, Jean-Baptiste Onofré 
> > wrote:
> >
> > > Hi Reuven,
> > >
> > > +1 for RC4, and don't worry: it's part of the process. I prefer to
> > > have a long release process than a crappy release ;) That's exactly
> > > the purpose of review & vote.
> > >
> > > I definitely think that having releases more often will reduce this
> > > kind of issue.
> > >
> > > Regards
> > > JB
> > >
> > >
> > > On 11/12/2017 09:04 AM, Reuven Lax wrote:
> > >
> > >> I definitely appreciate the frustration about how long this release
> > >> is taking. It's verging on ridiculous at this point, and we need to
> > >> fix some of the things that caused us to get to this state (for one
> > >> thing, our infrastructure was so busted at one point that Valentyn
> > >> spent 2 weeks trying to get one PR merged into the release branch).
> > >>
> > >> At this point, let's try and fix this Monday. Unfortunately this is
> > >> not the sole issue requiring RC4. Python verification failed as well,
> > >> and we need an RC4 regardless to merge those PRs. I'm hoping that RC4
> > >> is our final RC, and we can finish voting next week.
> > >>
> > >> Reuven
> > >>
> > >> On Sat, Nov 11, 2017 at 6:24 AM, Romain Manni-Bucau <
> > >> rmannibu...@gmail.com>
> > >> wrote:
> > >>
> > >> On 11 Nov 2017 at 09:52, "Jean-Baptiste Onofré" wrote:
> > >>>
> > >>> If the purpose is to release 2.2.1 in one week, why not just to a
> RC4 ?
> > >>>
> > >>> It's not a regression because WriteFiles is new and extend the
> previous
> > >>> FileSource. So it could consider as a severe bug, especially on
> > >>> WriteFiles
> > >>> which is important.
> > >>>
> > >>>
> > >>> Fair enough.
> > >>>
> > >>>
> > >>> The core issue is the time we spent already on this release: roughly
> 1
> > >>> month !!! It's 

Re: [VOTE] Release 2.2.0, release candidate #3

2017-11-15 Thread Charles Chen
Could you send the command you used that produced this error?  I can't
reproduce it at the tip of the release-2.2.0 branch.

On Wed, Nov 15, 2017 at 5:34 AM Reuven Lax  wrote:

> I'm trying to do the last CP and cut RC4, but I'm getting a compilation
> failure in Python - "ImportError: No module named site"
>
> Did we possibly break the release branch on one of the Python CPs?
>
> Reuven
>
> On Sun, Nov 12, 2017 at 5:12 PM, Jean-Baptiste Onofré 
> wrote:
>
> > Hi Reuven,
> >
> > +1 for RC4, and don't worry: it's part of the process. I prefer to have
> > a long release process than a crappy release ;) That's exactly the
> > purpose of review & vote.
> >
> > I definitely think that having releases more often will reduce this kind
> > of issue.
> >
> > Regards
> > JB
> >
> >
> > On 11/12/2017 09:04 AM, Reuven Lax wrote:
> >
> >> I definitely appreciate the frustration about how long this release is
> >> taking. It's verging on ridiculous at this point, and we need to fix
> >> some of the things that caused us to get to this state (for one thing,
> >> our infrastructure was so busted at one point that Valentyn spent 2
> >> weeks trying to get one PR merged into the release branch).
> >>
> >> At this point, let's try and fix this Monday. Unfortunately this is not
> >> the sole issue requiring RC4. Python verification failed as well, and we
> >> need an RC4 regardless to merge those PRs. I'm hoping that RC4 is our
> >> final RC, and we can finish voting next week.
> >>
> >> Reuven
> >>
> >> On Sat, Nov 11, 2017 at 6:24 AM, Romain Manni-Bucau <
> >> rmannibu...@gmail.com>
> >> wrote:
> >>
> >> On 11 Nov 2017 at 09:52, "Jean-Baptiste Onofré" wrote:
> >>>
> >>> If the purpose is to release 2.2.1 in one week, why not just to a RC4 ?
> >>>
> >>> It's not a regression because WriteFiles is new and extend the previous
> >>> FileSource. So it could consider as a severe bug, especially on
> >>> WriteFiles
> >>> which is important.
> >>>
> >>>
> >>> Fair enough.
> >>>
> >>>
> >>> The core issue is the time we spent already on this release: roughly 1
> >>> month !!! It's clearly too long due to different causes.
> >>> When I did the previous releases, it took 3 or 4 days. It's clearly the
> >>> target as, as said, I would like to have a release pace of a release
> >>> every
> >>> 6 weeks.
> >>>
> >>>
> >>>
> >>> Agree and this is why 2.2.0 must be out now IMHO. If you are confident
> >>> next
> >>> week is sufficient just go ahead and ignore my comment but my point was
> >>> the
> >>> same: it shouldnt last so long if there is no regression :(.
> >>>
> >>>
> >>>
> >>> Regards
> >>> JB
> >>>
> >>>
> >>> On 11/11/2017 08:41 AM, Romain Manni-Bucau wrote:
> >>>
> >>> You can see it differently: is there a critical bug? Yes! Is there a
>  regression? No!
> 
>  So no need to wait another week (keep in mind 2 days + 3 days of vote
>  makes easily 1 working week). This vote could be closed already and
> next
>  week 2.2.1 could fix this bug, no? Overall idea is to not hold the
>  community more than needed if there is no regression compared to last
>  few
>  releases.
> 
>  On 11 Nov 2017 at 07:46, "Jean-Baptiste Onofré" wrote:
> >>>
> 
>  -1 (binding)
> 
> >
> > I agree with Eugene, data loss is severe.
> >
> > As Eugene seems confident to fix that quickly, I think it's worth to
> >
>  cut a
> >>>
>  RC4.
> >
> > However, I would introduce a deadline. As I would like to propose a
> > release cycle of a release every 6 weeks (whatever it contains, but
> it
> > really important to keep  a regular pace in releases), a release
> should
> >
>  be
> >>>
>  cut in couple of days. So, maybe we can give us 2 business days to fix
> > that
> > and propose a RC4. Basically, if this issue is not fix on Tuesday
> > night,
> > then, we move forward anyway.
> >
> > Regards
> > JB
> >
> > On 11/10/2017 07:42 PM, Eugene Kirpichov wrote:
> >
> > Unfortunately I think I found a data loss bug - it was there since
> > 2.0.0
> >
> >> but I think it's serious enough that delaying a fix until the next
> >> release
> >> would be irresponsible.
> >> See https://issues.apache.org/jira/browse/BEAM-3169
> >>
> >> On Thu, Nov 9, 2017 at 3:57 PM Robert Bradshaw
> >> 
> >> wrote:
> >>
> >> Our release notes look like nothing more than a query for the closed
> >>
> >> jira issues. Do we have a top-level summary to highlight the big
> >>> ticket items in the release? And in particular somewhere to mention
> >>> that this is likely the last release to support Java 7 that'll get
> >>> widely read?
> >>>
> >>> On Thu, Nov 9, 2017 at 3:39 PM, Reuven Lax
>  

Re: [PROPOSAL] "Requires deterministic input"

2017-11-15 Thread Aljoscha Krettek
+1

> On 15. Nov 2017, at 14:07, Jean-Baptiste Onofré  wrote:
> 
> Agree !
> 
> Thanks Kenn,
> Regards
> JB
> 
> On 11/15/2017 02:05 PM, Kenneth Knowles wrote:
>> Reviving this again, since it came up again today in yet another context. I
>> think it is time to add this as an experimental annotation. I think we know
>> that we need it, and roughly how it should work, while there are still
>> finer points to discuss about what it means for input to be stable.
>> So I filed https://issues.apache.org/jira/browse/BEAM-3194 and whipped up
>> https://github.com/apache/beam/pull/4135 to move it along a smidge. It will
>> need to be incorporated into the Beam model's ParDoPayload and the Python
>> SDK as well.
>> Kenn
>> On Tue, Aug 15, 2017 at 2:40 PM, Reuven Lax 
>> wrote:
>>> Well the Fn API is still being designed, so this is something we'd have to
>>> think about.
>>> 
>>> On Tue, Aug 15, 2017 at 2:19 PM, Robert Bradshaw <
>>> rober...@google.com.invalid> wrote:
>>> 
 On Tue, Aug 15, 2017 at 2:14 PM, Reuven Lax 
 wrote:
> On Tue, Aug 15, 2017 at 1:59 PM, Robert Bradshaw <
> rober...@google.com.invalid> wrote:
> 
>> On Sat, Aug 12, 2017 at 1:13 AM, Reuven Lax > wrote:
>>> On Fri, Aug 11, 2017 at 10:52 PM, Robert Bradshaw <
 The question here is whether the ordering is part of the "content"
>>> of
 an iterable.
>>> 
>>> My initial instinct was to say yes - but maybe it should not be
>>> until
>> Beam
>>> has a first-class notion of sorted values after a GBK?
>> 
>> Yeah, I'm not sure on this either. Interestingly, if we consider
>> ordering to be important, then the composite gbk + ungroup will be
>> stable despite its components not being so.
>> 
>> As I mention above, the iterable is semantically [part of] a
 single
>> element. So just to unpack this, to make sure we are talking
>>> about
>> the
 same
>> thing, I think you are talking about GBK as implemented via
>>> GBKO +
>> GABW.
>> 
>> When the output of GABW is required to be stable but the output
>>> of
>> GBKO
 is
>> not stable, we don't have stability for free in all cases by
>> inserting a
>> GBK, but require something more to make the output of GABW
 stable, in
 the
>> worst case a full materialization.
>> 
> 
> Correct. My point is that there are alternate, cheaper ways of
 doing
 this.
> If GABW stores state in an ordered list, it can simply checkpoint a marker
> into that list to ensure that the output is stable.
 
 In the presence of non-trivial triggering and/or late data, I'm not
 so
 sure this is "easy." E.g. A bundle may fail, and more data may come
 in
 from upstream (and get appended to the buffer) before it is
>>> retried.
 
>>> 
>>> That will still work. If the subsequent ParDo has processed the
 Iterable,
>>> that means we'll have successfully checkpointed a marker to the list
>> (using
>>> whatever technique the runner supports). More data coming in will
>>> get
>>> appended after the marker, so we can ensure that the retry still
>>> sees
 the
>>> same elements in the Iterable.
>> 
>> I'm thinking of the following.
>> 
>> 1. (k, v1) and (k, v2) come into the GABW and [v1, v2] gets stored in
>> the state. A trigger gets set.
>> 2. The trigger is fired and (k, [v1, v2]) gets sent downstream, but
>> for some reason fails.
>> 3. (k, v3) comes into the GABW and [v3] gets appended to the state.
>> 4. The trigger is again fired, and this time (k, [v1, v2, v3]) is sent
>> downstream.
>> 
>> 
> If you add the annotation specifying stableinput, then we will not do
 this.
> Before we send anything downstream, we will add a marker to the list,
>>> and
> only forward data downstream once the marker has been checkpointed.
>>> This
> adds a bit of cost and latency of course, but the assumption is that
 adding
> this annotation will always add some cost.
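
One way to read the marker idea described above is the toy model below. It is only a sketch under stated assumptions, not Beam code: `MarkedBuffer`, `fire` and `ack` are hypothetical names, and the in-memory list stands in for runner state that would have to be durably checkpointed before anything is emitted. It does not address how this interacts with fused graphs or the Fn API.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of per-key buffered state with a replay marker; not a Beam API. */
final class MarkedBuffer<T> {
    private final List<T> values = new ArrayList<>();
    // Number of leading values covered by the committed marker, or -1 if none.
    private int marker = -1;

    /** New input is always appended after any existing marker. */
    void append(T value) {
        values.add(value);
    }

    /**
     * Trigger firing. If no marker is pending, record one first (in a real
     * runner this write would be checkpointed before emitting), then return
     * exactly the values the marker covers.
     */
    List<T> fire() {
        if (marker < 0) {
            marker = values.size();
        }
        return new ArrayList<>(values.subList(0, marker));
    }

    /** Downstream processing succeeded: drop the emitted prefix and the marker. */
    void ack() {
        if (marker < 0) {
            throw new IllegalStateException("nothing in flight");
        }
        values.subList(0, marker).clear();
        marker = -1;
    }
}
```

In this model, a firing that fails and is retried after more data has arrived still sees the same elements, because the late arrivals sit behind the marker until `ack()` is called.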
 
 I don't think you can checkpoint anything "before sending data
 downstream" if it's being executed as part of a fused graph, unless we
 add special support for this in the Fn API. I suppose the runner could
 pre-emptively modify the state of any GABW operations before firing
 triggers...
 
>> It is unclear when a marker would be added to the list. Is this in
>> step 2 which, despite failing, still results in modified state [v1, v2,
>> marker]? (And this state modification would have to be committed
>> before attempting the bundle, in case the "failure" was something like
>> a VM shutdown.) And only on success the state is modified to be 

Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
2017-11-15 15:23 GMT+01:00 Jean-Baptiste Onofré :
> Hi Romain,
>
> 1. you always have the GlobalWindow at least. It's more related to trigger.

Yep, but having an implicit window and being able to rely on window
features are two different things. I think some runners don't support it
yet, so it may not be reliable enough; this was my main concern. If that
is not true, then it is OK.

> 2. How would you define this ? With annotation (on what in that case) or
> using checkpoint method ?

Both work: an annotation on a new type of method in the DoFn, or an
external checkpoint algorithm. I tend to think the first is easier.

> 3. Agree to have a core PTransform for that.
>
> Regards
> JB
>
>
> On 11/15/2017 02:16 PM, Romain Manni-Bucau wrote:
>>
>> @Reuven: it looks like a good workaround
>> @Ken: thks a lot for the link!
>>
>> @all:
>>
>> 1. do you think it is doable without using windowing (to have
>> something more reliable across runners, since it will depend on fewer
>> primitives)?
>> 2. what about allowing the user to define when to checkpoint?
>> 3. can we get this kind of "composite" pattern in the beam core?
>>
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-15 14:12 GMT+01:00 Kenneth Knowles :
>>>
>>> In case the connection is not clear to folks on this thread, I pinged the
>>> thread on @StableReplay / @RequiresStableInput / etc and opened a draft
>>> PR
>>> at https://github.com/apache/beam/pull/4135.
>>>
>>> On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax 
>>> wrote:
>>>
 so I think the following will do exactly that and can be easily factored
 into a reusable transform (modulo Java type boilerplate):

 // Assign each element one of N random keys so grouping spreads the load.
 pCollection.apply(WithKeys.of((Element e) ->
     ThreadLocalRandom.current().nextInt(N)))
    // Fire a pane roughly every 100 elements per key.
    .apply(Window.into(new GlobalWindows())
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterPane.elementCountAtLeast(100))))
    .apply(GroupByKey.create())
    .apply(ParDo.of(new DoFn<>() {
      @ProcessElement
      @StableReplay
      public void processElement(ProcessContext c) {
        // Insert c.element().getValue() into backend.
      }
    }));

 On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau
 
>
 wrote:

> 2017-11-15 11:42 GMT+01:00 Reuven Lax :
>>
>> Can we describe this at a higher level?
>>
>> I think what you want is the following. Please correct if I'm
>> misunderstanding.
>>
>> Batches of 100 elements (is this a hard requirement, or do they have
>> to
>
> be
>>
>> "approximately" 100 element?)
>
>
> Approximately is fine while documented (what is not is 100 instead
> of 10 for instance)
>
>>
>> Once you see a batch, you're guaranteed to see the same batch on

 retries.
>
>
> +1
>
>>
>> You want to then idempotently insert this batch into some backend.

 Things
>>
>> may fail, workers may crash, but in that case you want to get the
>> exact
>> same batch back so you can insert it again.
>
>
> +1
>
>>
>> Do you care about ordering? On failure do you have to see the same
>
> batches
>>
>> in the same order as before, or is it sufficient to see the same

 batches?
>
>
> Beam doesnt everywhere so I guess it is not important - at least for
> my cases this statement is true.
>
>>
>> Reuven
>>
>> On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <
>
> rmannibu...@gmail.com>
>>
>> wrote:
>>
>>> Overall goal is to ensure each 100 elements max, a "backend" (as
>>> datastore) flush/commit/push is done and is aligned with beam
>>> checkpoints. You can see it as bringing the "general" commit-interval
>>> notion to beam and kind of get rid of the bundle notion which is
>>> almost impossible to use today.
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>>
>>>
>>> 2017-11-15 10:27 GMT+01:00 Reuven Lax :

 It's in the dev list archives, not sure if there's a doc yet.

 I'm not quite sure I understand what you mean by a "flush" Can you
>>>
>>> describe

 the problem you're trying to solve?

 Reuven

 On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <
>>>
>>> rmannibu...@gmail.com>

 wrote:

> Hmm, I didn't find the doc - if you have the link not far it would

 be
>
> appreciated - but "before" 

Re: makes bundle concept usable?

2017-11-15 Thread Jean-Baptiste Onofré

Hi Romain,

1. You always have the GlobalWindow at least. It's more related to triggers.
2. How would you define this? With an annotation (on what, in that case) or
using a checkpoint method?

3. Agreed on having a core PTransform for that.

Regards
JB

On 11/15/2017 02:16 PM, Romain Manni-Bucau wrote:

@Reuven: it looks like a good workaround
@Ken: thks a lot for the link!

@all:

1. do you think it is doable without using windowing (to have
something more reliable across runners, since it will depend on fewer
primitives)?
2. what about allowing the user to define when to checkpoint?
3. can we get this kind of "composite" pattern in the beam core?



Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 14:12 GMT+01:00 Kenneth Knowles :

In case the connection is not clear to folks on this thread, I pinged the
thread on @StableReplay / @RequiresStableInput / etc and opened a draft PR
at https://github.com/apache/beam/pull/4135.

On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax 
wrote:


so I think the following will do exactly that and can be easily factored
into a reusable transform (modulo Java type boilerplate):

// Assign each element one of N random keys so grouping spreads the load.
pCollection.apply(WithKeys.of((Element e) ->
    ThreadLocalRandom.current().nextInt(N)))
   // Fire a pane roughly every 100 elements per key.
   .apply(Window.into(new GlobalWindows())
       .triggering(AfterWatermark.pastEndOfWindow()
           .withEarlyFirings(AfterPane.elementCountAtLeast(100))))
   .apply(GroupByKey.create())
   .apply(ParDo.of(new DoFn<>() {
     @ProcessElement
     @StableReplay
     public void processElement(ProcessContext c) {
       // Insert c.element().getValue() into backend.
     }
   }));

On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau :

Can we describe this at a higher level?

I think what you want is the following. Please correct if I'm
misunderstanding.

Batches of 100 elements (is this a hard requirement, or do they have to be
"approximately" 100 elements?)

Approximately is fine as long as it is documented (what is not fine is
getting 100 instead of 10, for instance).

Once you see a batch, you're guaranteed to see the same batch on retries.

+1

You want to then idempotently insert this batch into some backend. Things
may fail, workers may crash, but in that case you want to get the exact
same batch back so you can insert it again.

+1

Do you care about ordering? On failure do you have to see the same batches
in the same order as before, or is it sufficient to see the same batches?

Beam doesn't everywhere, so I guess it is not important - at least for
my cases this statement is true.



Reuven

On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <

rmannibu...@gmail.com>

wrote:


The overall goal is to ensure that, every 100 elements at most, a "backend"
(such as a datastore) flush/commit/push is done and is aligned with Beam
checkpoints. You can see it as bringing the "general" commit-interval
notion to Beam and more or less getting rid of the bundle notion, which is
almost impossible to use today.
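
The commit-interval goal above can be illustrated outside Beam with a small sketch. Everything here is an assumption made for illustration: `IdempotentBackend` and `CommitIntervalWriter` are hypothetical names, the backend is an in-memory map, and the batch ids are only stable across retries if the input itself is replayed identically, which is exactly the guarantee being asked for in this thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical backend whose writes are idempotent per batch id. */
final class IdempotentBackend {
    private final Map<String, List<String>> committed = new LinkedHashMap<>();

    /** Returns true if the batch was newly stored, false if it was a replay. */
    boolean commit(String batchId, List<String> batch) {
        return committed.putIfAbsent(batchId, new ArrayList<>(batch)) == null;
    }

    /** Total number of elements stored across all committed batches. */
    int storedElements() {
        int total = 0;
        for (List<String> batch : committed.values()) {
            total += batch.size();
        }
        return total;
    }
}

/** Buffers elements and commits them in batches of at most maxBatchSize. */
final class CommitIntervalWriter {
    private final IdempotentBackend backend;
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<>();
    private int nextBatch = 0;

    CommitIntervalWriter(IdempotentBackend backend, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.backend = backend;
        this.maxBatchSize = maxBatchSize;
    }

    /** Adds one element and commits once the interval is reached. */
    void write(String element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Commits whatever is buffered under the next sequential batch id. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        backend.commit("batch-" + nextBatch++, buffer);
        buffer.clear();
    }
}
```

Replaying a batch id that was already committed is a no-op in this sketch, so a retry that sees the same batch does not duplicate data.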

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 10:27 GMT+01:00 Reuven Lax :

It's in the dev list archives, not sure if there's a doc yet.

I'm not quite sure I understand what you mean by a "flush". Can you describe
the problem you're trying to solve?

Reuven

On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <

rmannibu...@gmail.com>

wrote:


Hmm, I didn't find the doc - if you have the link handy it would be
appreciated - but "before" doesn't sound like enough; shouldn't it be
"after", in case there was a "flush"?

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 10:10 GMT+01:00 Reuven Lax :

If you set @StableReplay before a ParDo, it forces a checkpoint before that
ParDo.

On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <

rmannibu...@gmail.com>

wrote:


It sounds like a good start. I'm not sure how a group by key (and not by
size) can help control the checkpointing interval. I wonder if we
shouldn't be able to have a CheckpointPolicy { boolean
shouldCheckpoint() } used in the processing event loop. The default could
be up to the runner, but if set on the transform (or DoFn) it would be
used to control when the checkpoint is done. Thinking out loud, it
sounds close to the JBatch checkpoint algorithm
(https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html)
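
A minimal sketch of what such a policy could look like, in plain Java with no Beam dependency. Only the `CheckpointPolicy { boolean shouldCheckpoint() }` shape comes from the message above; `CountingCheckpointPolicy` and `ProcessingLoop` are hypothetical names, and the loop is a toy stand-in for a runner's event loop rather than anything Beam provides.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical hook named after the idea in this thread; not a Beam API. */
interface CheckpointPolicy {
    /** Called after each processed element; true means "checkpoint now". */
    boolean shouldCheckpoint();
}

/** Requests a checkpoint every {@code interval} elements. */
final class CountingCheckpointPolicy implements CheckpointPolicy {
    private final int interval;
    private int sinceLast = 0;

    CountingCheckpointPolicy(int interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    @Override
    public boolean shouldCheckpoint() {
        sinceLast++;
        if (sinceLast >= interval) {
            sinceLast = 0;
            return true;
        }
        return false;
    }
}

/** Toy stand-in for a runner's processing loop. */
final class ProcessingLoop {
    /** Returns the sizes of the chunks committed between checkpoints. */
    static List<Integer> run(List<String> elements, CheckpointPolicy policy) {
        List<Integer> chunkSizes = new ArrayList<>();
        int pending = 0;
        for (String element : elements) {
            pending++; // "process" the element
            if (policy.shouldCheckpoint()) {
                chunkSizes.add(pending); // checkpoint: commit the pending chunk
                pending = 0;
            }
        }
        if (pending > 0) {
            chunkSizes.add(pending); // final checkpoint at end of input
        }
        return chunkSizes;
    }
}
```

With an interval of 3 and seven input elements, this loop would commit chunks of 3, 3 and 1 elements.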

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré 

Re: [VOTE] Release 2.2.0, release candidate #3

2017-11-15 Thread Reuven Lax
I'm trying to do the last CP and cut RC4, but I'm getting a compilation
failure in Python - "ImportError: No module named site"

Did we possibly break the release branch on one of the Python CPs?

Reuven

On Sun, Nov 12, 2017 at 5:12 PM, Jean-Baptiste Onofré 
wrote:

> Hi Reuven,
>
> +1 for RC4, and don't worry: it's part of the process. I prefer to have a
> long release process than a crappy release ;) That's exactly the purpose
> of review & vote.
>
> I definitely think that having releases more often will reduce this kind
> of issue.
>
> Regards
> JB
>
>
> On 11/12/2017 09:04 AM, Reuven Lax wrote:
>
>> I definitely appreciate the frustration about how long this release is
>> taking. It's verging on ridiculous at this point, and we need to fix some
>> of the things that caused us to get to this state (for one thing, our
>> infrastructure was so busted at one point that Valentyn spent 2 weeks
>> trying to get one PR merged into the release branch).
>>
>> At this point, let's try and fix this Monday. Unfortunately this is not
>> the sole issue requiring RC4. Python verification failed as well, and we
>> need an RC4 regardless to merge those PRs. I'm hoping that RC4 is our
>> final RC, and we can finish voting next week.
>>
>> Reuven
>>
>> On Sat, Nov 11, 2017 at 6:24 AM, Romain Manni-Bucau <
>> rmannibu...@gmail.com>
>> wrote:
>>
>> On 11 Nov 2017 at 09:52, "Jean-Baptiste Onofré" wrote:
>>>
>>> If the purpose is to release 2.2.1 in one week, why not just do an RC4?
>>>
>>> It's not a regression because WriteFiles is new and extends the previous
>>> FileSource. So it could be considered a severe bug, especially on
>>> WriteFiles, which is important.
>>>
>>>
>>> Fair enough.
>>>
>>>
>>> The core issue is the time we have already spent on this release: roughly
>>> 1 month! It's clearly too long, due to different causes.
>>> When I did the previous releases, it took 3 or 4 days. That's clearly the
>>> target since, as I said, I would like to have a release pace of one
>>> release every 6 weeks.
>>>
>>>
>>>
>>> Agreed, and this is why 2.2.0 must be out now, IMHO. If you are confident
>>> next week is sufficient, just go ahead and ignore my comment, but my point
>>> was the same: it shouldn't last so long if there is no regression :(.
>>>
>>>
>>>
>>> Regards
>>> JB
>>>
>>>
>>> On 11/11/2017 08:41 AM, Romain Manni-Bucau wrote:
>>>
>>> You can see it differently: is there a critical bug? Yes! Is there a
 regression? No!

 So no need to wait another week (keep in mind 2 days + 3 days of vote
 makes easily 1 working week). This vote could be closed already and next
 week 2.2.1 could fix this bug, no? Overall idea is to not hold the
 community more than needed if there is no regression compared to last
 few
 releases.

 On 11 Nov 2017 at 07:46, "Jean-Baptiste Onofré" wrote:
>>>

 -1 (binding)

>
> I agree with Eugene, data loss is severe.
>
> As Eugene seems confident he can fix that quickly, I think it's worth
> cutting an RC4.
>
> However, I would introduce a deadline. As I would like to propose a
> release cycle of one release every 6 weeks (whatever it contains, it's
> really important to keep a regular pace in releases), a release should be
> cut in a couple of days. So, maybe we can give ourselves 2 business days
> to fix that and propose an RC4. Basically, if this issue is not fixed by
> Tuesday night, then we move forward anyway.
>
> Regards
> JB
>
>>>> On 11/10/2017 07:42 PM, Eugene Kirpichov wrote:
>>>>
>>>>> Unfortunately I think I found a data loss bug - it has been there since
>>>>> 2.0.0 but I think it's serious enough that delaying a fix until the next
>>>>> release would be irresponsible.
>>>>> See https://issues.apache.org/jira/browse/BEAM-3169
>>>>>
>>>>> On Thu, Nov 9, 2017 at 3:57 PM Robert Bradshaw wrote:
>>>>>
>>>>>> Our release notes look like nothing more than a query for the closed
>>>>>> jira issues. Do we have a top-level summary to highlight the big
>>>>>> ticket items in the release? And in particular somewhere to mention
>>>>>> that this is likely the last release to support Java 7 that'll get
>>>>>> widely read?
>>>>>>
>>>>>> On Thu, Nov 9, 2017 at 3:39 PM, Reuven Lax wrote:
>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> This RC is currently failing on a number of validation steps, so we
>>>>>>> need to cut at least one more RC. Fingers crossed that it will be the
>>>>>>> last one.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Thu, Nov 9, 2017 at 3:36 PM, Konstantinos Katsiapis <
>>>>>>> katsia...@google.com.invalid> wrote:
>>>>>>>
>>>>>>>> Just a remark: Release of Tensorflow Transform
Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
@Reuven: it looks like a good workaround
@Ken: thanks a lot for the link!

@all:

1. do you think it is doable without windowing usage (to have
something more reliable in terms of runners, since it will depend on
fewer primitives)?
2. what about allowing the user to define when to checkpoint?
3. can we get this kind of "composite" pattern in the beam core?



Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 14:12 GMT+01:00 Kenneth Knowles :
> In case the connection is not clear to folks on this thread, I pinged the
> thread on @StableReplay / @RequiresStableInput / etc and opened a draft PR
> at https://github.com/apache/beam/pull/4135.
>
> On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax 
> wrote:
>
>> so I think the following will do exactly that and can be easily factored
>> into a reusable transform (modulo Java type boilerplate):
>>
>> pCollection.apply(WithKeys.of((Element e) ->
>>     ThreadLocalRandom.current().nextInt(N)))
>>   .apply(Window.into(new GlobalWindows())
>>     .triggering(AfterWatermark.pastEndOfWindow()
>>       .withEarlyFirings(AfterPane.elementCountAtLeast(100)))
>>     // An accumulation mode is required once a trigger is set.
>>     .discardingFiredPanes())
>>   .apply(GroupByKey.create())
>>   .apply(ParDo.of(new DoFn<>() {
>>     @ProcessElement
>>     @StableReplay
>>     public void processElement(ProcessContext c) {
>>       // Insert c.element().getValue() into backend.
>>     }
>>   }));
>>
>> On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau > >
>> wrote:
>>
>> > 2017-11-15 11:42 GMT+01:00 Reuven Lax :
>> > > Can we describe this at a higher level?
>> > >
>> > > I think what you want is the following. Please correct if I'm
>> > > misunderstanding.
>> > >
>> > > Batches of 100 elements (is this a hard requirement, or do they have to
>> > be
>> > > "approximately" 100 element?)
>> >
>> > Approximately is fine while documented (what is not is 100 instead
>> > of 10 for instance)
>> >
>> > >
>> > > Once you see a batch, you're guaranteed to see the same batch on
>> retries.
>> >
>> > +1
>> >
>> > >
>> > > You want to then idempotently insert this batch into some backend.
>> Things
>> > > may fail, workers may crash, but in that case you want to get the exact
>> > > same batch back so you can insert it again.
>> >
>> > +1
>> >
>> > >
>> > > Do you care about ordering? On failure do you have to see the same
>> > batches
>> > > in the same order as before, or is it sufficient to see the same
>> batches?
>> >
>> > Beam doesnt everywhere so I guess it is not important - at least for
>> > my cases this statement is true.
>> >
>> > >
>> > > Reuven
>> > >
>> > > On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <
>> > rmannibu...@gmail.com>
>> > > wrote:
>> > >
>> > >> Overall goal is to ensure each 100 elements max, a "backend" (as
>> > >> datastore) flush/commit/push is done and is aligned with beam
>> > >> checkpoints. You can see it as bringing the "general" commit-interval
>> > >> notion to beam and kind of get rid of the bundle notion which is
>> > >> almost impossible to use today.
>> > >>
>> > >> Romain Manni-Bucau
>> > >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> > >>
>> > >>
>> > >> 2017-11-15 10:27 GMT+01:00 Reuven Lax :
>> > >> > It's in the dev list archives, not sure if there's a doc yet.
>> > >> >
>> > >> > I'm not quite sure I understand what you mean by a "flush" Can you
>> > >> describe
>> > >> > the problem you're trying to solve?
>> > >> >
>> > >> > Reuven
>> > >> >
>> > >> > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <
>> > >> rmannibu...@gmail.com>
>> > >> > wrote:
>> > >> >
>> > >> >> Hmm, I didn't find the doc - if you have the link not far it would
>> be
>> > >> >> appreciated - but "before" sounds not enough, it should be "after"
>> in
>> > >> >> case there was a "flush" no?
>> > >> >>
>> > >> >> Romain Manni-Bucau
>> > >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> > >> >>
>> > >> >>
>> > >> >> 2017-11-15 10:10 GMT+01:00 Reuven Lax :
>> > >> >> > If you set @StableReplay before a ParDo, it forces a checkpoint
>> > before
>> > >> >> that
>> > >> >> > ParDo.
>> > >> >> >
>> > >> >> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <
>> > >> >> rmannibu...@gmail.com>
>> > >> >> > wrote:
>> > >> >> >
>> > >> >> >> It sounds a good start. I'm not sure how a group by key (and not
>> > by
>> > >> >> >> size) can help controlling the checkpointing interval. Wonder if
>> > we
>> > >> >> >> shouldn't be able to have a CheckpointPolicy { boolean
>> > >> >> >> shouldCheckpoint() } used in the processing event loop. Default
>> > could
>> > >> >> >> be up to the runner but if set on the transform (or dofn) it
>> > would be
>> > >> >> >> used to control when the checkpoint is done. Thinking out loud
>> 

Re: makes bundle concept usable?

2017-11-15 Thread Kenneth Knowles
In case the connection is not clear to folks on this thread, I pinged the
thread on @StableReplay / @RequiresStableInput / etc and opened a draft PR
at https://github.com/apache/beam/pull/4135.

On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax 
wrote:

> so I think the following will do exactly that and can be easily factored
> into a reusable transform (modulo Java type boilerplate):
>
> pCollection.apply(WithKeys.of((Element e) ->
>     ThreadLocalRandom.current().nextInt(N)))
>   .apply(Window.into(new GlobalWindows())
>     .triggering(AfterWatermark.pastEndOfWindow()
>       .withEarlyFirings(AfterPane.elementCountAtLeast(100)))
>     // An accumulation mode is required once a trigger is set.
>     .discardingFiredPanes())
>   .apply(GroupByKey.create())
>   .apply(ParDo.of(new DoFn<>() {
>     @ProcessElement
>     @StableReplay
>     public void processElement(ProcessContext c) {
>       // Insert c.element().getValue() into backend.
>     }
>   }));
>
> On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau  >
> wrote:
>
> > 2017-11-15 11:42 GMT+01:00 Reuven Lax :
> > > Can we describe this at a higher level?
> > >
> > > I think what you want is the following. Please correct if I'm
> > > misunderstanding.
> > >
> > > Batches of 100 elements (is this a hard requirement, or do they have to
> > be
> > > "approximately" 100 element?)
> >
> > Approximately is fine while documented (what is not is 100 instead
> > of 10 for instance)
> >
> > >
> > > Once you see a batch, you're guaranteed to see the same batch on
> retries.
> >
> > +1
> >
> > >
> > > You want to then idempotently insert this batch into some backend.
> Things
> > > may fail, workers may crash, but in that case you want to get the exact
> > > same batch back so you can insert it again.
> >
> > +1
> >
> > >
> > > Do you care about ordering? On failure do you have to see the same
> > batches
> > > in the same order as before, or is it sufficient to see the same
> batches?
> >
> > Beam doesnt everywhere so I guess it is not important - at least for
> > my cases this statement is true.
> >
> > >
> > > Reuven
> > >
> > > On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <
> > rmannibu...@gmail.com>
> > > wrote:
> > >
> > >> Overall goal is to ensure each 100 elements max, a "backend" (as
> > >> datastore) flush/commit/push is done and is aligned with beam
> > >> checkpoints. You can see it as bringing the "general" commit-interval
> > >> notion to beam and kind of get rid of the bundle notion which is
> > >> almost impossible to use today.
> > >>
> > >> Romain Manni-Bucau
> > >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> > >>
> > >>
> > >> 2017-11-15 10:27 GMT+01:00 Reuven Lax :
> > >> > It's in the dev list archives, not sure if there's a doc yet.
> > >> >
> > >> > I'm not quite sure I understand what you mean by a "flush" Can you
> > >> describe
> > >> > the problem you're trying to solve?
> > >> >
> > >> > Reuven
> > >> >
> > >> > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <
> > >> rmannibu...@gmail.com>
> > >> > wrote:
> > >> >
> > >> >> Hmm, I didn't find the doc - if you have the link not far it would
> be
> > >> >> appreciated - but "before" sounds not enough, it should be "after"
> in
> > >> >> case there was a "flush" no?
> > >> >>
> > >> >> Romain Manni-Bucau
> > >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> > >> >>
> > >> >>
> > >> >> 2017-11-15 10:10 GMT+01:00 Reuven Lax :
> > >> >> > If you set @StableReplay before a ParDo, it forces a checkpoint
> > before
> > >> >> that
> > >> >> > ParDo.
> > >> >> >
> > >> >> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <
> > >> >> rmannibu...@gmail.com>
> > >> >> > wrote:
> > >> >> >
> > >> >> >> It sounds a good start. I'm not sure how a group by key (and not
> > by
> > >> >> >> size) can help controlling the checkpointing interval. Wonder if
> > we
> > >> >> >> shouldn't be able to have a CheckpointPolicy { boolean
> > >> >> >> shouldCheckpoint() } used in the processing event loop. Default
> > could
> > >> >> >> be up to the runner but if set on the transform (or dofn) it
> > would be
> > >> >> >> used to control when the checkpoint is done. Thinking out loud
> it
> > >> >> >> sounds close to jbatch checkpoint algorithm
> > >> >> >> (https://docs.oracle.com/javaee/7/api/javax/batch/api/
> > >> >> >> chunk/CheckpointAlgorithm.html)
> > >> >> >>
> > >> >> >> Romain Manni-Bucau
> > >> >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> > >> >> >>
> > >> >> >>
> > >> >> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré  >:
> > >> >> >> > Yes, @StableReplay, that's the annotation. Thanks.
> > >> >> >> >
> > >> >> >> >
> > >> >> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
> > >> >> >> >>
> > >> >> >> >> Romain,
> > >> >> >> >>
> > >> >> >> >> I think 

Re: [PROPOSAL] "Requires deterministic input"

2017-11-15 Thread Jean-Baptiste Onofré

Agree !

Thanks Kenn,
Regards
JB

On 11/15/2017 02:05 PM, Kenneth Knowles wrote:

> Reviving this again, since it came up again today in yet another context. I
> think it is time to add this as an experimental annotation. I think we know
> that we need it, and roughly how it should work, while there are still
> finer points to discuss about what it means for input to be stable.
>
> So I filed https://issues.apache.org/jira/browse/BEAM-3194 and whipped up
> https://github.com/apache/beam/pull/4135 to move it along a smidge. It will
> need to be incorporated into the Beam model's ParDoPayload and the Python
> SDK as well.
>
> Kenn
>
> On Tue, Aug 15, 2017 at 2:40 PM, Reuven Lax
> wrote:
>
>> Well the Fn API is still being designed, so this is something we'd have to
>> think about.
>>
>> On Tue, Aug 15, 2017 at 2:19 PM, Robert Bradshaw <
>> rober...@google.com.invalid> wrote:
>>
>>> On Tue, Aug 15, 2017 at 2:14 PM, Reuven Lax
>>> wrote:
>>>> On Tue, Aug 15, 2017 at 1:59 PM, Robert Bradshaw <
>>>> rober...@google.com.invalid> wrote:
>>>>
>>>>> On Sat, Aug 12, 2017 at 1:13 AM, Reuven Lax

Re: [PROPOSAL] "Requires deterministic input"

2017-11-15 Thread Kenneth Knowles
Reviving this again, since it came up again today in yet another context. I
think it is time to add this as an experimental annotation. I think we know
that we need it, and roughly how it should work, while there are still
finer points to discuss about what it means for input to be stable.

So I filed https://issues.apache.org/jira/browse/BEAM-3194 and whipped up
https://github.com/apache/beam/pull/4135 to move it along a smidge. It will
need to be incorporated into the Beam model's ParDoPayload and the Python
SDK as well.

Kenn

On Tue, Aug 15, 2017 at 2:40 PM, Reuven Lax 
wrote:

> Well the Fn API is still being designed, so this is something we'd have to
> think about.
>
> On Tue, Aug 15, 2017 at 2:19 PM, Robert Bradshaw <
> rober...@google.com.invalid> wrote:
>
> > On Tue, Aug 15, 2017 at 2:14 PM, Reuven Lax 
> > wrote:
> > > On Tue, Aug 15, 2017 at 1:59 PM, Robert Bradshaw <
> > > rober...@google.com.invalid> wrote:
> > >
> > >> On Sat, Aug 12, 2017 at 1:13 AM, Reuven Lax  >
> > >> wrote:
> > >> > On Fri, Aug 11, 2017 at 10:52 PM, Robert Bradshaw <
> > >> >> The question here is whether the ordering is part of the "content"
> of
> > >> >> an iterable.
> > >> >
> > >> > My initial instinct was to say yes - but maybe it should not be
> until
> > >> Beam
> > >> > has a first-class notion of sorted values after a GBK?
> > >>
> > >> Yeah, I'm not sure on this either. Interestingly, if we consider
> > >> ordering to be important, then the composite gbk + ungroup will be
> > >> stable despite its components not being so.
> > >>
> > >> >> >> As I mention above, the iterable is semantically [part of] a
> > single
> > >> >> >> element. So just to unpack this, to make sure we are talking
> about
> > >> the
> > >> >> same
> > >> >> >> thing, I think you are talking about GBK as implemented via
> GBKO +
> > >> GABW.
> > >> >> >>
> > >> >> >> When the output of GABW is required to be stable but the output
> of
> > >> GBKO
> > >> >> is
> > >> >> >> not stable, we don't have stability for free in all cases by
> > >> inserting a
> > >> >> >> GBK, but require something more to make the output of GABW
> > stable, in
> > >> >> the
> > >> >> >> worst case a full materialization.
> > >> >> >>
> > >> >> >
> > >> >> > Correct. My point is that there are alternate, cheaper ways of doing
> > >> >> > this. If GABW stores state in an ordered list, it can simply
> > >> >> > checkpoint a marker into that list to ensure that the output is
> > >> >> > stable.
> > >> >>
> > >> >> In the presence of non-trivial triggering and/or late data, I'm not
> > so
> > >> >> sure this is "easy." E.g. A bundle may fail, and more data may come
> > in
> > >> >> from upstream (and get appended to the buffer) before it is
> retried.
> > >> >>
> > >> >
> > >> > That will still work. If the subsequent ParDo has processed the
> > Iterable,
> > >> > that means we'll have successfully checkpointed a marker to the list
> > >> (using
> > >> > whatever technique the runner supports). More data coming in will
> get
> > >> > appended after the marker, so we can ensure that the retry still
> sees
> > the
> > >> > same elements in the Iterable.
> > >>
> > >> I'm thinking of the following.
> > >>
> > >> 1. (k, v1) and (k, v2) come into the GABW and [v1, v2] gets stored in
> > >> the state. A trigger gets set.
> > >> 2. The trigger is fired and (k, [v1, v2]) gets sent downstream, but
> > >> for some reason fails.
> > >> 3. (k, v3) comes into the GABW and [v3] gets appended to the state.
> > >> 4. The trigger is again fired, and this time (k, [v1, v2, v3]) is sent
> > >> downstream.
> > >>
> > >>
> > > If you add the annotation specifying stable input, then we will not do
> > > this. Before we send anything downstream, we will add a marker to the
> > > list, and only forward data downstream once the marker has been
> > > checkpointed. This adds a bit of cost and latency of course, but the
> > > assumption is that adding this annotation will always add some cost.
> >
> > I don't think you can checkpoint anything "before sending data
> > downstream" if it's being executed as part of a fused graph, unless we
> > add special support for this in the Fn API. I suppose the runner could
> > pre-emptively modify the state of any GABW operations before firing
> > triggers...
> >
> > >> It is unclear when a marker would be added to the list. Is this in
> > >> step 2 which, despite failing, still results in modified state [v1, v2,
> > >> marker]? (And this state modification would have to be committed
> > >> before attempting the bundle, in case the "failure" was something like
> > >> a VM shutdown.) And only on success the state is modified to be (say
> > >> this is accumulating mode) [v1, v2]?
> > >>
> > >> I think it could be done, but it may significantly complicate things.
> > >>
> >
>
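
The marker idea discussed above (steps 1 to 4 in Robert's scenario) can be sketched as a toy model. This is only an illustration under stated assumptions: MarkedBuffer, fire() and ack() are hypothetical names, the buffer lives in memory, discarding mode is assumed, and no runner is claimed to implement GABW this way. In a real runner the marker write would have to be durably checkpointed before anything is sent downstream.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy per-key buffer with a marker; hypothetical, not a Beam or runner API. */
final class MarkedBuffer<T> {
  private final List<T> values = new ArrayList<>();
  // Number of leading elements frozen by the marker; -1 means no marker is pending.
  private int marker = -1;

  /** New data is always appended after any pending marker. */
  void add(T value) {
    values.add(value);
  }

  /**
   * Called when the trigger fires. Places a marker if none is pending and
   * returns the frozen prefix, so a retry sees the same elements.
   */
  List<T> fire() {
    if (marker < 0) {
      marker = values.size(); // assumption: this write is checkpointed before emitting
    }
    return new ArrayList<>(values.subList(0, marker));
  }

  /** Called after downstream processing succeeded; drops the emitted prefix. */
  void ack() {
    if (marker < 0) {
      throw new IllegalStateException("nothing was fired");
    }
    values.subList(0, marker).clear();
    marker = -1;
  }
}
```

With this model, adding v1 and v2, firing, failing, adding v3 and firing again yields [v1, v2] both times; v3 is only emitted by a firing that happens after ack().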


Re: makes bundle concept usable?

2017-11-15 Thread Reuven Lax
so I think the following will do exactly that and can be easily factored
into a reusable transform (modulo Java type boilerplate):

pCollection.apply(WithKeys.of((Element e) ->
    ThreadLocalRandom.current().nextInt(N)))
  .apply(Window.into(new GlobalWindows())
    .triggering(AfterWatermark.pastEndOfWindow()
      .withEarlyFirings(AfterPane.elementCountAtLeast(100)))
    // An accumulation mode is required once a trigger is set.
    .discardingFiredPanes())
  .apply(GroupByKey.create())
  .apply(ParDo.of(new DoFn<>() {
    @ProcessElement
    @StableReplay
    public void processElement(ProcessContext c) {
      // Insert c.element().getValue() into backend.
    }
  }));

On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau 
wrote:

> 2017-11-15 11:42 GMT+01:00 Reuven Lax :
> > Can we describe this at a higher level?
> >
> > I think what you want is the following. Please correct if I'm
> > misunderstanding.
> >
> > Batches of 100 elements (is this a hard requirement, or do they have to
> be
> > "approximately" 100 element?)
>
> Approximately is fine while documented (what is not is 100 instead
> of 10 for instance)
>
> >
> > Once you see a batch, you're guaranteed to see the same batch on retries.
>
> +1
>
> >
> > You want to then idempotently insert this batch into some backend. Things
> > may fail, workers may crash, but in that case you want to get the exact
> > same batch back so you can insert it again.
>
> +1
>
> >
> > Do you care about ordering? On failure do you have to see the same
> batches
> > in the same order as before, or is it sufficient to see the same batches?
>
> Beam doesnt everywhere so I guess it is not important - at least for
> my cases this statement is true.
>
> >
> > Reuven
> >
> > On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <
> rmannibu...@gmail.com>
> > wrote:
> >
> >> Overall goal is to ensure each 100 elements max, a "backend" (as
> >> datastore) flush/commit/push is done and is aligned with beam
> >> checkpoints. You can see it as bringing the "general" commit-interval
> >> notion to beam and kind of get rid of the bundle notion which is
> >> almost impossible to use today.
> >>
> >> Romain Manni-Bucau
> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>
> >>
> >> 2017-11-15 10:27 GMT+01:00 Reuven Lax :
> >> > It's in the dev list archives, not sure if there's a doc yet.
> >> >
> >> > I'm not quite sure I understand what you mean by a "flush" Can you
> >> describe
> >> > the problem you're trying to solve?
> >> >
> >> > Reuven
> >> >
> >> > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <
> >> rmannibu...@gmail.com>
> >> > wrote:
> >> >
> >> >> Hmm, I didn't find the doc - if you have the link not far it would be
> >> >> appreciated - but "before" sounds not enough, it should be "after" in
> >> >> case there was a "flush" no?
> >> >>
> >> >> Romain Manni-Bucau
> >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >>
> >> >>
> >> >> 2017-11-15 10:10 GMT+01:00 Reuven Lax :
> >> >> > If you set @StableReplay before a ParDo, it forces a checkpoint
> before
> >> >> that
> >> >> > ParDo.
> >> >> >
> >> >> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <
> >> >> rmannibu...@gmail.com>
> >> >> > wrote:
> >> >> >
> >> >> >> It sounds a good start. I'm not sure how a group by key (and not
> by
> >> >> >> size) can help controlling the checkpointing interval. Wonder if
> we
> >> >> >> shouldn't be able to have a CheckpointPolicy { boolean
> >> >> >> shouldCheckpoint() } used in the processing event loop. Default
> could
> >> >> >> be up to the runner but if set on the transform (or dofn) it
> would be
> >> >> >> used to control when the checkpoint is done. Thinking out loud it
> >> >> >> sounds close to jbatch checkpoint algorithm
> >> >> >> (https://docs.oracle.com/javaee/7/api/javax/batch/api/
> >> >> >> chunk/CheckpointAlgorithm.html)
> >> >> >>
> >> >> >> Romain Manni-Bucau
> >> >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >> >>
> >> >> >>
> >> >> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré :
> >> >> >> > Yes, @StableReplay, that's the annotation. Thanks.
> >> >> >> >
> >> >> >> >
> >> >> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
> >> >> >> >>
> >> >> >> >> Romain,
> >> >> >> >>
> >> >> >> >> I think the @StableReplay semantic that Kenn proposed a month
> or
> >> so
> >> >> ago
> >> >> >> is
> >> >> >> >> what is needed here.
> >> >> >> >>
> >> >> >> >> Essentially it will ensure that the GroupByKey iterable is
> stable
> >> and
> >> >> >> >> checkpointed. So on replay, the GroupByKey is guaranteed to
> >> receive
> >> >> the
> >> >> >> >> exact same iterable as it did before. The annotation can be put
> >> on a
> >> >> >> ParDo
> >> >> >> >> as well, in which case it ensures stability (and
> checkpointing) of
> >> >> the
> >> >> >> >> 

Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
2017-11-15 11:42 GMT+01:00 Reuven Lax :
> Can we describe this at a higher level?
>
> I think what you want is the following. Please correct if I'm
> misunderstanding.
>
> Batches of 100 elements (is this a hard requirement, or do they have to be
> "approximately" 100 element?)

Approximately is fine as long as it is documented (what is not fine is
getting 100 instead of 10, for instance)

>
> Once you see a batch, you're guaranteed to see the same batch on retries.

+1

>
> You want to then idempotently insert this batch into some backend. Things
> may fail, workers may crash, but in that case you want to get the exact
> same batch back so you can insert it again.

+1

>
> Do you care about ordering? On failure do you have to see the same batches
> in the same order as before, or is it sufficient to see the same batches?

Beam doesn't care about ordering everywhere, so I guess it is not
important - at least for my cases this statement is true.

>
> Reuven
>
> On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau 
> wrote:
>
>> Overall goal is to ensure each 100 elements max, a "backend" (as
>> datastore) flush/commit/push is done and is aligned with beam
>> checkpoints. You can see it as bringing the "general" commit-interval
>> notion to beam and kind of get rid of the bundle notion which is
>> almost impossible to use today.
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-15 10:27 GMT+01:00 Reuven Lax :
>> > It's in the dev list archives, not sure if there's a doc yet.
>> >
>> > I'm not quite sure I understand what you mean by a "flush" Can you
>> describe
>> > the problem you're trying to solve?
>> >
>> > Reuven
>> >
>> > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <
>> rmannibu...@gmail.com>
>> > wrote:
>> >
>> >> Hmm, I didn't find the doc - if you have the link not far it would be
>> >> appreciated - but "before" sounds not enough, it should be "after" in
>> >> case there was a "flush" no?
>> >>
>> >> Romain Manni-Bucau
>> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>
>> >>
>> >> 2017-11-15 10:10 GMT+01:00 Reuven Lax :
>> >> > If you set @StableReplay before a ParDo, it forces a checkpoint before
>> >> that
>> >> > ParDo.
>> >> >
>> >> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <
>> >> rmannibu...@gmail.com>
>> >> > wrote:
>> >> >
>> >> >> It sounds a good start. I'm not sure how a group by key (and not by
>> >> >> size) can help controlling the checkpointing interval. Wonder if we
>> >> >> shouldn't be able to have a CheckpointPolicy { boolean
>> >> >> shouldCheckpoint() } used in the processing event loop. Default could
>> >> >> be up to the runner but if set on the transform (or dofn) it would be
>> >> >> used to control when the checkpoint is done. Thinking out loud it
>> >> >> sounds close to jbatch checkpoint algorithm
>> >> >> (https://docs.oracle.com/javaee/7/api/javax/batch/api/
>> >> >> chunk/CheckpointAlgorithm.html)
>> >> >>
>> >> >> Romain Manni-Bucau
>> >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >> >>
>> >> >>
>> >> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré :
>> >> >> > Yes, @StableReplay, that's the annotation. Thanks.
>> >> >> >
>> >> >> >
>> >> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
>> >> >> >>
>> >> >> >> Romain,
>> >> >> >>
>> >> >> >> I think the @StableReplay semantic that Kenn proposed a month or
>> so
>> >> ago
>> >> >> is
>> >> >> >> what is needed here.
>> >> >> >>
>> >> >> >> Essentially it will ensure that the GroupByKey iterable is stable
>> and
>> >> >> >> checkpointed. So on replay, the GroupByKey is guaranteed to
>> receive
>> >> the
>> >> >> >> exact same iterable as it did before. The annotation can be put
>> on a
>> >> >> ParDo
>> >> >> >> as well, in which case it ensures stability (and checkpointing) of
>> >> the
>> >> >> >> individual ParDo elements.
>> >> >> >>
>> >> >> >> Reuven
>> >> >> >>
>> >> >> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
>> >> >> >> 
>> >> >> >> wrote:
>> >> >> >>
>> >> >> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré > >:
>> >> >> 
>> >> >>  Hi Romain,
>> >> >> 
>> >> >>  You are right: currently, the chunking is related to bundles.
>> >> Today,
>> >> >> the
>> >> >>  bundle size is under the runner responsibility.
>> >> >> 
>> >> >>  I think it's fine because only the runner know an efficient
>> bundle
>> >> >> size.
>> >> >> >>>
>> >> >> >>> I'm
>> >> >> 
>> >> >>  afraid giving the "control" of the bundle size to the end user
>> (via
>> >> >>  pipeline) can result to huge performances issue depending of the
>> >> >> runner.
>> >> >> 
>> >> >>  It doesn't mean that we can't use an uber layer: it's what we
>> do in
>> >> >>  ParDoWithBatch or DoFn in IO Sink where we have a batch size.
>> >> >> 
>> >> >>  Anyway, the core problem 

Re: makes bundle concept usable?

2017-11-15 Thread Reuven Lax
Can we describe this at a higher level?

I think what you want is the following. Please correct if I'm
misunderstanding.

Batches of 100 elements (is this a hard requirement, or do they have to be
"approximately" 100 elements?)

Once you see a batch, you're guaranteed to see the same batch on retries.

You want to then idempotently insert this batch into some backend. Things
may fail, workers may crash, but in that case you want to get the exact
same batch back so you can insert it again.
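
As a rough illustration of why a stable batch makes the insert idempotent, here is a minimal sketch. Everything in it is an assumption for the example: IdempotentBackend is a hypothetical in-memory stand-in for a datastore, and the batch id is derived from the batch content, which only works if a retry delivers exactly the same elements in the same order (and would also deduplicate two distinct batches with identical content).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for a real backend; not a Beam API. */
final class IdempotentBackend {
  // Committed batches keyed by a deterministic batch id.
  private final Map<String, List<String>> committed = new ConcurrentHashMap<>();

  /** Same content in the same order always yields the same id (hash collisions ignored here). */
  static String batchId(List<String> batch) {
    return Integer.toHexString(batch.hashCode()) + "-" + batch.size();
  }

  /** Returns true if the batch was written, false if a replay was detected. */
  boolean insert(List<String> batch) {
    return committed.putIfAbsent(batchId(batch), new ArrayList<>(batch)) == null;
  }

  int committedBatches() {
    return committed.size();
  }

  public static void main(String[] args) {
    IdempotentBackend backend = new IdempotentBackend();
    List<String> batch = Arrays.asList("a", "b", "c");
    System.out.println(backend.insert(batch)); // first attempt writes the batch
    System.out.println(backend.insert(batch)); // replay of the same batch is a no-op
  }
}
```

If the retry could see a different batch (say one extra element), the id would change and the backend would store overlapping data, which is the failure mode stable input is meant to rule out.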

Do you care about ordering? On failure do you have to see the same batches
in the same order as before, or is it sufficient to see the same batches?

Reuven

On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau 
wrote:

> Overall goal is to ensure each 100 elements max, a "backend" (as
> datastore) flush/commit/push is done and is aligned with beam
> checkpoints. You can see it as bringing the "general" commit-interval
> notion to beam and kind of get rid of the bundle notion which is
> almost impossible to use today.
>
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>
>
> 2017-11-15 10:27 GMT+01:00 Reuven Lax :
> > It's in the dev list archives, not sure if there's a doc yet.
> >
> > I'm not quite sure I understand what you mean by a "flush" Can you
> describe
> > the problem you're trying to solve?
> >
> > Reuven
> >
> > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <
> rmannibu...@gmail.com>
> > wrote:
> >
> >> Hmm, I didn't find the doc - if you have the link not far it would be
> >> appreciated - but "before" sounds not enough, it should be "after" in
> >> case there was a "flush" no?
> >>
> >> Romain Manni-Bucau
> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>
> >>
> >> 2017-11-15 10:10 GMT+01:00 Reuven Lax :
> >> > If you set @StableReplay before a ParDo, it forces a checkpoint before
> >> that
> >> > ParDo.
> >> >
> >> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <
> >> rmannibu...@gmail.com>
> >> > wrote:
> >> >
> >> >> It sounds a good start. I'm not sure how a group by key (and not by
> >> >> size) can help controlling the checkpointing interval. Wonder if we
> >> >> shouldn't be able to have a CheckpointPolicy { boolean
> >> >> shouldCheckpoint() } used in the processing event loop. Default could
> >> >> be up to the runner but if set on the transform (or dofn) it would be
> >> >> used to control when the checkpoint is done. Thinking out loud it
> >> >> sounds close to jbatch checkpoint algorithm
> >> >> (https://docs.oracle.com/javaee/7/api/javax/batch/api/
> >> >> chunk/CheckpointAlgorithm.html)
> >> >>
> >> >> Romain Manni-Bucau
> >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >>
> >> >>
> >> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré :
> >> >> > Yes, @StableReplay, that's the annotation. Thanks.
> >> >> >
> >> >> >
> >> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
> >> >> >>
> >> >> >> Romain,
> >> >> >>
> >> >> >> I think the @StableReplay semantic that Kenn proposed a month or
> so
> >> ago
> >> >> is
> >> >> >> what is needed here.
> >> >> >>
> >> >> >> Essentially it will ensure that the GroupByKey iterable is stable
> and
> >> >> >> checkpointed. So on replay, the GroupByKey is guaranteed to
> receive
> >> the
> >> >> >> exact same iterable as it did before. The annotation can be put
> on a
> >> >> ParDo
> >> >> >> as well, in which case it ensures stability (and checkpointing) of
> >> the
> >> >> >> individual ParDo elements.
> >> >> >>
> >> >> >> Reuven
> >> >> >>
> >> >> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
> >> >> >> 
> >> >> >> wrote:
> >> >> >>
> >> >> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré  >:
> >> >> 
> >> >>  Hi Romain,
> >> >> 
> >> >>  You are right: currently, the chunking is related to bundles.
> >> Today,
> >> >> the
> >> >>  bundle size is under the runner responsibility.
> >> >> 
> >> >>  I think it's fine because only the runner know an efficient
> bundle
> >> >> size.
> >> >> >>>
> >> >> >>> I'm
> >> >> 
> >> >>  afraid giving the "control" of the bundle size to the end user
> (via
> >> >>  pipeline) can result to huge performances issue depending of the
> >> >> runner.
> >> >> 
> >> >>  It doesn't mean that we can't use an uber layer: it's what we
> do in
> >> >>  ParDoWithBatch or DoFn in IO Sink where we have a batch size.
> >> >> 
> >> >>  Anyway, the core problem is about the checkpoint: why a
> checkpoint
> >> is
> >> >>  not
> >> >>  "respected" by an IO or runner ?
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>> Take the example of a runner deciding the bundle size is 4 and
> the
> >> IO
> >> >> >>> deciding the commit-interval (batch semantic) is 2, what happens
> if
> >> >> >>> the 3rd record fails? You have pushed to the store 2 records
> which
> >> can
> >> >> 

Re: Improving integration test coverage of I/O transforms

2017-11-15 Thread Chamikara Jayalath
Sounds good. I'll create JIRAs for both small scale and large scale
versions. To be clear, large scale versions will use the
PerfKitBenchmarker-based performance testing framework, which will allow us
to (1) manage Kubernetes-based multi-node deployments of data stores and
(2) publish/track benchmark results.

Thanks,
Cham

On Tue, Nov 14, 2017 at 10:09 PM Jean-Baptiste Onofré 
wrote:

> Hi Cham,
>
> Some are in progress on private branch (at least for MongoDbIO ;)).
>
> +1 to create the Jira, we will assign to the right persons.
>
> Regards
> JB
>
> On 11/15/2017 01:09 AM, Chamikara Jayalath wrote:
> > Hi All,
> >
> > I went through the current list of I/O transforms offered by the Beam
> > Java SDK and it looks like we currently don't have enough integration
> > test coverage for some of the I/O transforms. For example, it might be
> > good to improve test coverage of the following (not a complete list).
> >
> > MongoDbIO Read/Write
> > HBaseIO Read/Write
> > HadoopInputFormatIO Read
> > HCatalogIO Read/Write
> > SolrIO Read/Write
> > File-based I/O (Text, Avro, etc.) Read/ReadAll/Write/Write with dynamic
> > destinations
> >
> > I hope to work with folks cc'ed in this email to add some of these tests.
> > We'll create JIRAs for each of the tests and mention original
> > authors/reviewers in PRs. We would greatly appreciate any feedback you can
> > provide to improve these tests. When applicable we also hope to develop
> > large scale versions of some of these tests using the Beam performance
> > testing framework (https://beam.apache.org/documentation/io/testing/).
> >
> > Thanks,
> > Cham
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
It can be. Here are the options I have in mind:

I. checkpoint marker:

@AnyBeamAnnotation
@CheckpointAfter
public void someHook(SomeContext ctx);


II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new
CountingAlgo()))

III. (I like this one less)

// in the dofn
@CheckpointTester
public boolean shouldCheckpoint();
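
To make option II a bit more concrete, here is a minimal sketch of what a
counting algorithm and the loop calling it could look like. Everything in it
is hypothetical: CheckpointAlgorithm, CountingAlgo and CheckpointLoopSketch
are illustration names only, nothing like withCheckpointAlgorithm exists in
the Beam SDK today, and a real runner loop would of course do much more than
count.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical contract (not a Beam API): asked after each element whether
// the runner should checkpoint now.
interface CheckpointAlgorithm {
    boolean shouldCheckpoint();

    // Called once the checkpoint has actually been taken.
    void onCheckpoint();
}

// Requests a checkpoint every `interval` processed elements.
final class CountingAlgo implements CheckpointAlgorithm {
    private final int interval;
    private int seen;

    CountingAlgo(int interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    @Override
    public boolean shouldCheckpoint() {
        seen++;
        return seen >= interval;
    }

    @Override
    public void onCheckpoint() {
        seen = 0;
    }
}

// Stand-in for the runner's processing loop: records after which element
// (1-based) a checkpoint would be taken.
final class CheckpointLoopSketch {
    static List<Integer> run(int elementCount, CheckpointAlgorithm algo) {
        List<Integer> checkpoints = new ArrayList<>();
        for (int element = 1; element <= elementCount; element++) {
            // ... the DoFn would process the element here ...
            if (algo.shouldCheckpoint()) {
                checkpoints.add(element);
                algo.onCheckpoint();
            }
        }
        return checkpoints;
    }
}
```

With an interval of 3 and 7 elements, the loop checkpoints after elements 3
and 6, and the 7th element stays uncommitted until the next checkpoint or the
end of the input.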




Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 11:04 GMT+01:00 Jean-Baptiste Onofré :
> And the control is given to the DoFn developer via annotations, right ?
>
> So, bundle would be "hidden" and be internal to the runner (which makes
> sense I think) and we introduce "control" points for the DoFn developer that
> the runner will deal with.
>
> Correct ?
>
> Regards
> JB
>
>
> On 11/15/2017 10:58 AM, Romain Manni-Bucau wrote:
>>
>> Overall goal is to ensure each 100 elements max, a "backend" (as
>> datastore) flush/commit/push is done and is aligned with beam
>> checkpoints. You can see it as bringing the "general" commit-interval
>> notion to beam and kind of get rid of the bundle notion which is
>> almost impossible to use today.
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-15 10:27 GMT+01:00 Reuven Lax :
>>>
>>> It's in the dev list archives, not sure if there's a doc yet.
>>>
>>> I'm not quite sure I understand what you mean by a "flush" Can you
>>> describe
>>> the problem you're trying to solve?
>>>
>>> Reuven
>>>
>>> On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau
>>> 
>>> wrote:
>>>
 Hmm, I didn't find the doc - if you have the link not far it would be
 appreciated - but "before" sounds not enough, it should be "after" in
 case there was a "flush" no?

 Romain Manni-Bucau
 @rmannibucau |  Blog | Old Blog | Github | LinkedIn


 2017-11-15 10:10 GMT+01:00 Reuven Lax :
>
> If you set @StableReplay before a ParDo, it forces a checkpoint before

 that
>
> ParDo.
>
> On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <

 rmannibu...@gmail.com>
>
> wrote:
>
>> It sounds a good start. I'm not sure how a group by key (and not by
>> size) can help controlling the checkpointing interval. Wonder if we
>> shouldn't be able to have a CheckpointPolicy { boolean
>> shouldCheckpoint() } used in the processing event loop. Default could
>> be up to the runner but if set on the transform (or dofn) it would be
>> used to control when the checkpoint is done. Thinking out loud it
>> sounds close to jbatch checkpoint algorithm
>> (https://docs.oracle.com/javaee/7/api/javax/batch/api/
>> chunk/CheckpointAlgorithm.html)
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré :
>>>
>>> Yes, @StableReplay, that's the annotation. Thanks.
>>>
>>>
>>> On 11/15/2017 09:52 AM, Reuven Lax wrote:


 Romain,

 I think the @StableReplay semantic that Kenn proposed a month or so

 ago
>>
>> is

 what is needed here.

 Essentially it will ensure that the GroupByKey iterable is stable
 and
 checkpointed. So on replay, the GroupByKey is guaranteed to receive

 the

 exact same iterable as it did before. The annotation can be put on a
>>
>> ParDo

 as well, in which case it ensures stability (and checkpointing) of

 the

 individual ParDo elements.

 Reuven

 On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
 
 wrote:

> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré :
>>
>>
>> Hi Romain,
>>
>> You are right: currently, the chunking is related to bundles.

 Today,
>>
>> the
>>
>> bundle size is under the runner responsibility.
>>
>> I think it's fine because only the runner know an efficient bundle
>>
>> size.
>
>
> I'm
>>
>>
>> afraid giving the "control" of the bundle size to the end user
>> (via
>> pipeline) can result to huge performances issue depending of the
>>
>> runner.
>>
>>
>> It doesn't mean that we can't use an uber layer: it's what we do
>> in
>> ParDoWithBatch or DoFn in IO Sink where we have a batch size.
>>
>> Anyway, the core problem is about the checkpoint: why a checkpoint

 is
>>
>> not
>> "respected" by an IO or runner ?
>
>
>
>
> Take the example of a runner deciding the bundle 

Re: makes bundle concept usable?

2017-11-15 Thread Jean-Baptiste Onofré

And the control is given to the DoFn developer via annotations, right?

So, the bundle would be "hidden" and internal to the runner (which makes
sense, I think), and we introduce "control" points for the DoFn developer
that the runner will deal with.


Correct?

Regards
JB

On 11/15/2017 10:58 AM, Romain Manni-Bucau wrote:

Overall goal is to ensure each 100 elements max, a "backend" (as
datastore) flush/commit/push is done and is aligned with beam
checkpoints. You can see it as bringing the "general" commit-interval
notion to beam and kind of get rid of the bundle notion which is
almost impossible to use today.

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 10:27 GMT+01:00 Reuven Lax :

It's in the dev list archives, not sure if there's a doc yet.

I'm not quite sure I understand what you mean by a "flush" Can you describe
the problem you're trying to solve?

Reuven

On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau 
wrote:


Hmm, I didn't find the doc - if you have the link not far it would be
appreciated - but "before" sounds not enough, it should be "after" in
case there was a "flush" no?

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 10:10 GMT+01:00 Reuven Lax :

If you set @StableReplay before a ParDo, it forces a checkpoint before

that

ParDo.

On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <

rmannibu...@gmail.com>

wrote:


It sounds a good start. I'm not sure how a group by key (and not by
size) can help controlling the checkpointing interval. Wonder if we
shouldn't be able to have a CheckpointPolicy { boolean
shouldCheckpoint() } used in the processing event loop. Default could
be up to the runner but if set on the transform (or dofn) it would be
used to control when the checkpoint is done. Thinking out loud it
sounds close to jbatch checkpoint algorithm
(https://docs.oracle.com/javaee/7/api/javax/batch/api/
chunk/CheckpointAlgorithm.html)

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré :

Yes, @StableReplay, that's the annotation. Thanks.


On 11/15/2017 09:52 AM, Reuven Lax wrote:


Romain,

I think the @StableReplay semantic that Kenn proposed a month or so

ago

is

what is needed here.

Essentially it will ensure that the GroupByKey iterable is stable and
checkpointed. So on replay, the GroupByKey is guaranteed to receive

the

exact same iterable as it did before. The annotation can be put on a

ParDo

as well, in which case it ensures stability (and checkpointing) of

the

individual ParDo elements.

Reuven

On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau

wrote:


2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré :


Hi Romain,

You are right: currently, the chunking is related to bundles.

Today,

the

bundle size is under the runner responsibility.

I think it's fine because only the runner know an efficient bundle

size.


I'm


afraid giving the "control" of the bundle size to the end user (via
pipeline) can result to huge performances issue depending of the

runner.


It doesn't mean that we can't use an uber layer: it's what we do in
ParDoWithBatch or DoFn in IO Sink where we have a batch size.

Anyway, the core problem is about the checkpoint: why a checkpoint

is

not
"respected" by an IO or runner ?




Take the example of a runner deciding the bundle size is 4 and the

IO

deciding the commit-interval (batch semantic) is 2, what happens if
the 3rd record fails? You have pushed to the store 2 records which

can

be reprocessed by a restart of the bundle and you can get

duplicates.


Rephrased: I think we need as a framework a batch/chunk solution

which

is reliable. I understand bundles is mapped on the runner and not
really controlled but can we get something more reliable for the

user?

Maybe we need a @BeforeBatch or something like that.



Regards
JB


On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:



Hi guys,

The subject is a bit provocative but the topic is real and coming
again and again with the beam usage: how a dofn can handle some
"chunking".

The need is to be able to commit each N records but with N not too

big.


The natural API for that in beam is the bundle one but bundles are

not

reliable since they can be very small (flink) - we can say it is

"ok"

even if it has some perf impacts - or too big (spark does full

size /

#workers).

The workaround is what we see in the ES I/O: a maxSize which does

an

eager flush. The issue is that then the checkpoint is not

respected

and you can process multiple times the same records.

Any plan to make this API reliable and controllable from a beam

point

of view (at least in a max manner)?

Thanks,
Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn



--
Jean-Baptiste Onofré
jbono...@apache.org

Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
The overall goal is to ensure that, every 100 elements at most, a "backend"
(e.g. datastore) flush/commit/push is done and is aligned with Beam
checkpoints. You can see it as bringing the "general" commit-interval
notion to Beam and more or less getting rid of the bundle notion, which is
almost impossible to use today.

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 10:27 GMT+01:00 Reuven Lax :
> It's in the dev list archives, not sure if there's a doc yet.
>
> I'm not quite sure I understand what you mean by a "flush" Can you describe
> the problem you're trying to solve?
>
> Reuven
>
> On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau 
> wrote:
>
>> Hmm, I didn't find the doc - if you have the link not far it would be
>> appreciated - but "before" sounds not enough, it should be "after" in
>> case there was a "flush" no?
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-15 10:10 GMT+01:00 Reuven Lax :
>> > If you set @StableReplay before a ParDo, it forces a checkpoint before
>> that
>> > ParDo.
>> >
>> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <
>> rmannibu...@gmail.com>
>> > wrote:
>> >
>> >> It sounds a good start. I'm not sure how a group by key (and not by
>> >> size) can help controlling the checkpointing interval. Wonder if we
>> >> shouldn't be able to have a CheckpointPolicy { boolean
>> >> shouldCheckpoint() } used in the processing event loop. Default could
>> >> be up to the runner but if set on the transform (or dofn) it would be
>> >> used to control when the checkpoint is done. Thinking out loud it
>> >> sounds close to jbatch checkpoint algorithm
>> >> (https://docs.oracle.com/javaee/7/api/javax/batch/api/
>> >> chunk/CheckpointAlgorithm.html)
>> >>
>> >> Romain Manni-Bucau
>> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>
>> >>
>> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré :
>> >> > Yes, @StableReplay, that's the annotation. Thanks.
>> >> >
>> >> >
>> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
>> >> >>
>> >> >> Romain,
>> >> >>
>> >> >> I think the @StableReplay semantic that Kenn proposed a month or so
>> ago
>> >> is
>> >> >> what is needed here.
>> >> >>
>> >> >> Essentially it will ensure that the GroupByKey iterable is stable and
>> >> >> checkpointed. So on replay, the GroupByKey is guaranteed to receive
>> the
>> >> >> exact same iterable as it did before. The annotation can be put on a
>> >> ParDo
>> >> >> as well, in which case it ensures stability (and checkpointing) of
>> the
>> >> >> individual ParDo elements.
>> >> >>
>> >> >> Reuven
>> >> >>
>> >> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
>> >> >> 
>> >> >> wrote:
>> >> >>
>> >> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré :
>> >> 
>> >>  Hi Romain,
>> >> 
>> >>  You are right: currently, the chunking is related to bundles.
>> Today,
>> >> the
>> >>  bundle size is under the runner responsibility.
>> >> 
>> >>  I think it's fine because only the runner know an efficient bundle
>> >> size.
>> >> >>>
>> >> >>> I'm
>> >> 
>> >>  afraid giving the "control" of the bundle size to the end user (via
>> >>  pipeline) can result to huge performances issue depending of the
>> >> runner.
>> >> 
>> >>  It doesn't mean that we can't use an uber layer: it's what we do in
>> >>  ParDoWithBatch or DoFn in IO Sink where we have a batch size.
>> >> 
>> >>  Anyway, the core problem is about the checkpoint: why a checkpoint
>> is
>> >>  not
>> >>  "respected" by an IO or runner ?
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>> Take the example of a runner deciding the bundle size is 4 and the
>> IO
>> >> >>> deciding the commit-interval (batch semantic) is 2, what happens if
>> >> >>> the 3rd record fails? You have pushed to the store 2 records which
>> can
>> >> >>> be reprocessed by a restart of the bundle and you can get
>> duplicates.
>> >> >>>
>> >> >>> Rephrased: I think we need as a framework a batch/chunk solution
>> which
>> >> >>> is reliable. I understand bundles is mapped on the runner and not
>> >> >>> really controlled but can we get something more reliable for the
>> user?
>> >> >>> Maybe we need a @BeforeBatch or something like that.
>> >> >>>
>> >> 
>> >>  Regards
>> >>  JB
>> >> 
>> >> 
>> >>  On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>> >> >
>> >> >
>> >> > Hi guys,
>> >> >
>> >> > The subject is a bit provocative but the topic is real and coming
>> >> > again and again with the beam usage: how a dofn can handle some
>> >> > "chunking".
>> >> >
>> >> > The need is to be able to commit each N records but with N not too
>> >> big.
>> >> >
>> >> > The natural API for that in beam is the bundle one but bundles are
>> >> not
>> >> > reliable 

Re: makes bundle concept usable?

2017-11-15 Thread Reuven Lax
It's in the dev list archives, not sure if there's a doc yet.

I'm not quite sure I understand what you mean by a "flush". Can you describe
the problem you're trying to solve?

Reuven

On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau 
wrote:

> Hmm, I didn't find the doc - if you have the link not far it would be
> appreciated - but "before" sounds not enough, it should be "after" in
> case there was a "flush" no?
>
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>
>
> 2017-11-15 10:10 GMT+01:00 Reuven Lax :
> > If you set @StableReplay before a ParDo, it forces a checkpoint before
> that
> > ParDo.
> >
> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <
> rmannibu...@gmail.com>
> > wrote:
> >
> >> It sounds a good start. I'm not sure how a group by key (and not by
> >> size) can help controlling the checkpointing interval. Wonder if we
> >> shouldn't be able to have a CheckpointPolicy { boolean
> >> shouldCheckpoint() } used in the processing event loop. Default could
> >> be up to the runner but if set on the transform (or dofn) it would be
> >> used to control when the checkpoint is done. Thinking out loud it
> >> sounds close to jbatch checkpoint algorithm
> >> (https://docs.oracle.com/javaee/7/api/javax/batch/api/
> >> chunk/CheckpointAlgorithm.html)
> >>
> >> Romain Manni-Bucau
> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>
> >>
> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré :
> >> > Yes, @StableReplay, that's the annotation. Thanks.
> >> >
> >> >
> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
> >> >>
> >> >> Romain,
> >> >>
> >> >> I think the @StableReplay semantic that Kenn proposed a month or so
> ago
> >> is
> >> >> what is needed here.
> >> >>
> >> >> Essentially it will ensure that the GroupByKey iterable is stable and
> >> >> checkpointed. So on replay, the GroupByKey is guaranteed to receive
> the
> >> >> exact same iterable as it did before. The annotation can be put on a
> >> ParDo
> >> >> as well, in which case it ensures stability (and checkpointing) of
> the
> >> >> individual ParDo elements.
> >> >>
> >> >> Reuven
> >> >>
> >> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
> >> >> 
> >> >> wrote:
> >> >>
> >> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré :
> >> 
> >>  Hi Romain,
> >> 
> >>  You are right: currently, the chunking is related to bundles.
> Today,
> >> the
> >>  bundle size is under the runner responsibility.
> >> 
> >>  I think it's fine because only the runner know an efficient bundle
> >> size.
> >> >>>
> >> >>> I'm
> >> 
> >>  afraid giving the "control" of the bundle size to the end user (via
> >>  pipeline) can result to huge performances issue depending of the
> >> runner.
> >> 
> >>  It doesn't mean that we can't use an uber layer: it's what we do in
> >>  ParDoWithBatch or DoFn in IO Sink where we have a batch size.
> >> 
> >>  Anyway, the core problem is about the checkpoint: why a checkpoint
> is
> >>  not
> >>  "respected" by an IO or runner ?
> >> >>>
> >> >>>
> >> >>>
> >> >>> Take the example of a runner deciding the bundle size is 4 and the
> IO
> >> >>> deciding the commit-interval (batch semantic) is 2, what happens if
> >> >>> the 3rd record fails? You have pushed to the store 2 records which
> can
> >> >>> be reprocessed by a restart of the bundle and you can get
> duplicates.
> >> >>>
> >> >>> Rephrased: I think we need as a framework a batch/chunk solution
> which
> >> >>> is reliable. I understand bundles is mapped on the runner and not
> >> >>> really controlled but can we get something more reliable for the
> user?
> >> >>> Maybe we need a @BeforeBatch or something like that.
> >> >>>
> >> 
> >>  Regards
> >>  JB
> >> 
> >> 
> >>  On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
> >> >
> >> >
> >> > Hi guys,
> >> >
> >> > The subject is a bit provocative but the topic is real and coming
> >> > again and again with the beam usage: how a dofn can handle some
> >> > "chunking".
> >> >
> >> > The need is to be able to commit each N records but with N not too
> >> big.
> >> >
> >> > The natural API for that in beam is the bundle one but bundles are
> >> not
> >> > reliable since they can be very small (flink) - we can say it is
> "ok"
> >> > even if it has some perf impacts - or too big (spark does full
> size /
> >> > #workers).
> >> >
> >> > The workaround is what we see in the ES I/O: a maxSize which does
> an
> >> > eager flush. The issue is that then the checkpoint is not
> respected
> >> > and you can process multiple times the same records.
> >> >
> >> > Any plan to make this API reliable and controllable from a beam
> point
> >> > of view (at least in a max manner)?
> >> >
> >> > 

Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
Hmm, I didn't find the doc (if you have the link handy, it would be
appreciated), but "before" doesn't sound like enough: shouldn't it be
"after", in case there was a "flush"?

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 10:10 GMT+01:00 Reuven Lax :
> If you set @StableReplay before a ParDo, it forces a checkpoint before that
> ParDo.
>
> On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau 
> wrote:
>
>> It sounds a good start. I'm not sure how a group by key (and not by
>> size) can help controlling the checkpointing interval. Wonder if we
>> shouldn't be able to have a CheckpointPolicy { boolean
>> shouldCheckpoint() } used in the processing event loop. Default could
>> be up to the runner but if set on the transform (or dofn) it would be
>> used to control when the checkpoint is done. Thinking out loud it
>> sounds close to jbatch checkpoint algorithm
>> (https://docs.oracle.com/javaee/7/api/javax/batch/api/
>> chunk/CheckpointAlgorithm.html)
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré :
>> > Yes, @StableReplay, that's the annotation. Thanks.
>> >
>> >
>> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
>> >>
>> >> Romain,
>> >>
>> >> I think the @StableReplay semantic that Kenn proposed a month or so ago
>> is
>> >> what is needed here.
>> >>
>> >> Essentially it will ensure that the GroupByKey iterable is stable and
>> >> checkpointed. So on replay, the GroupByKey is guaranteed to receive the
>> >> exact same iterable as it did before. The annotation can be put on a
>> ParDo
>> >> as well, in which case it ensures stability (and checkpointing) of the
>> >> individual ParDo elements.
>> >>
>> >> Reuven
>> >>
>> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
>> >> 
>> >> wrote:
>> >>
>> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré :
>> 
>>  Hi Romain,
>> 
>>  You are right: currently, the chunking is related to bundles. Today,
>> the
>>  bundle size is under the runner responsibility.
>> 
>>  I think it's fine because only the runner know an efficient bundle
>> size.
>> >>>
>> >>> I'm
>> 
>>  afraid giving the "control" of the bundle size to the end user (via
>>  pipeline) can result to huge performances issue depending of the
>> runner.
>> 
>>  It doesn't mean that we can't use an uber layer: it's what we do in
>>  ParDoWithBatch or DoFn in IO Sink where we have a batch size.
>> 
>>  Anyway, the core problem is about the checkpoint: why a checkpoint is
>>  not
>>  "respected" by an IO or runner ?
>> >>>
>> >>>
>> >>>
>> >>> Take the example of a runner deciding the bundle size is 4 and the IO
>> >>> deciding the commit-interval (batch semantic) is 2, what happens if
>> >>> the 3rd record fails? You have pushed to the store 2 records which can
>> >>> be reprocessed by a restart of the bundle and you can get duplicates.
>> >>>
>> >>> Rephrased: I think we need as a framework a batch/chunk solution which
>> >>> is reliable. I understand bundles is mapped on the runner and not
>> >>> really controlled but can we get something more reliable for the user?
>> >>> Maybe we need a @BeforeBatch or something like that.
>> >>>
>> 
>>  Regards
>>  JB
>> 
>> 
>>  On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>> >
>> >
>> > Hi guys,
>> >
>> > The subject is a bit provocative but the topic is real and coming
>> > again and again with the beam usage: how a dofn can handle some
>> > "chunking".
>> >
>> > The need is to be able to commit each N records but with N not too
>> big.
>> >
>> > The natural API for that in beam is the bundle one but bundles are
>> not
>> > reliable since they can be very small (flink) - we can say it is "ok"
>> > even if it has some perf impacts - or too big (spark does full size /
>> > #workers).
>> >
>> > The workaround is what we see in the ES I/O: a maxSize which does an
>> > eager flush. The issue is that then the checkpoint is not respected
>> > and you can process multiple times the same records.
>> >
>> > Any plan to make this API reliable and controllable from a beam point
>> > of view (at least in a max manner)?
>> >
>> > Thanks,
>> > Romain Manni-Bucau
>> > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >
>> 
>>  --
>>  Jean-Baptiste Onofré
>>  jbono...@apache.org
>>  http://blog.nanthrax.net
>>  Talend - http://www.talend.com
>> >>>
>> >>>
>> >>
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > jbono...@apache.org
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>>


Re: makes bundle concept usable?

2017-11-15 Thread Reuven Lax
If you set @StableReplay before a ParDo, it forces a checkpoint before that
ParDo.

On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau 
wrote:

> It sounds a good start. I'm not sure how a group by key (and not by
> size) can help controlling the checkpointing interval. Wonder if we
> shouldn't be able to have a CheckpointPolicy { boolean
> shouldCheckpoint() } used in the processing event loop. Default could
> be up to the runner but if set on the transform (or dofn) it would be
> used to control when the checkpoint is done. Thinking out loud it
> sounds close to jbatch checkpoint algorithm
> (https://docs.oracle.com/javaee/7/api/javax/batch/api/
> chunk/CheckpointAlgorithm.html)
>
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>
>
> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré :
> > Yes, @StableReplay, that's the annotation. Thanks.
> >
> >
> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
> >>
> >> Romain,
> >>
> >> I think the @StableReplay semantic that Kenn proposed a month or so ago
> is
> >> what is needed here.
> >>
> >> Essentially it will ensure that the GroupByKey iterable is stable and
> >> checkpointed. So on replay, the GroupByKey is guaranteed to receive the
> >> exact same iterable as it did before. The annotation can be put on a
> ParDo
> >> as well, in which case it ensures stability (and checkpointing) of the
> >> individual ParDo elements.
> >>
> >> Reuven
> >>
> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
> >> 
> >> wrote:
> >>
> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré :
> 
>  Hi Romain,
> 
>  You are right: currently, the chunking is related to bundles. Today,
> the
>  bundle size is under the runner responsibility.
> 
>  I think it's fine because only the runner know an efficient bundle
> size.
> >>>
> >>> I'm
> 
>  afraid giving the "control" of the bundle size to the end user (via
>  pipeline) can result to huge performances issue depending of the
> runner.
> 
>  It doesn't mean that we can't use an uber layer: it's what we do in
>  ParDoWithBatch or DoFn in IO Sink where we have a batch size.
> 
>  Anyway, the core problem is about the checkpoint: why a checkpoint is
>  not
>  "respected" by an IO or runner ?
> >>>
> >>>
> >>>
> >>> Take the example of a runner deciding the bundle size is 4 and the IO
> >>> deciding the commit-interval (batch semantic) is 2, what happens if
> >>> the 3rd record fails? You have pushed to the store 2 records which can
> >>> be reprocessed by a restart of the bundle and you can get duplicates.
> >>>
> >>> Rephrased: I think we need as a framework a batch/chunk solution which
> >>> is reliable. I understand bundles is mapped on the runner and not
> >>> really controlled but can we get something more reliable for the user?
> >>> Maybe we need a @BeforeBatch or something like that.
> >>>
> 
>  Regards
>  JB
> 
> 
>  On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
> >
> >
> > Hi guys,
> >
> > The subject is a bit provocative but the topic is real and coming
> > again and again with the beam usage: how a dofn can handle some
> > "chunking".
> >
> > The need is to be able to commit each N records but with N not too
> big.
> >
> > The natural API for that in beam is the bundle one but bundles are
> not
> > reliable since they can be very small (flink) - we can say it is "ok"
> > even if it has some perf impacts - or too big (spark does full size /
> > #workers).
> >
> > The workaround is what we see in the ES I/O: a maxSize which does an
> > eager flush. The issue is that then the checkpoint is not respected
> > and you can process multiple times the same records.
> >
> > Any plan to make this API reliable and controllable from a beam point
> > of view (at least in a max manner)?
> >
> > Thanks,
> > Romain Manni-Bucau
> > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >
> 
>  --
>  Jean-Baptiste Onofré
>  jbono...@apache.org
>  http://blog.nanthrax.net
>  Talend - http://www.talend.com
> >>>
> >>>
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
>


Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
It sounds like a good start. I'm not sure how a group by key (and not by
size) can help control the checkpointing interval. I wonder whether we
should have a CheckpointPolicy { boolean shouldCheckpoint() } used in the
processing event loop. The default could be up to the runner, but if set on
the transform (or DoFn) it would be used to control when the checkpoint is
done. Thinking out loud, it sounds close to the JBatch checkpoint algorithm
(https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html)

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré :
> Yes, @StableReplay, that's the annotation. Thanks.
>
>
> On 11/15/2017 09:52 AM, Reuven Lax wrote:
>>
>> Romain,
>>
>> I think the @StableReplay semantic that Kenn proposed a month or so ago is
>> what is needed here.
>>
>> Essentially it will ensure that the GroupByKey iterable is stable and
>> checkpointed. So on replay, the GroupByKey is guaranteed to receive the
>> exact same iterable as it did before. The annotation can be put on a ParDo
>> as well, in which case it ensures stability (and checkpointing) of the
>> individual ParDo elements.
>>
>> Reuven
>>
>> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
>> 
>> wrote:
>>
>>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré :

 Hi Romain,

 You are right: currently, the chunking is related to bundles. Today, the
 bundle size is under the runner responsibility.

 I think it's fine because only the runner know an efficient bundle size.
>>>
>>> I'm

 afraid giving the "control" of the bundle size to the end user (via
 pipeline) can result to huge performances issue depending of the runner.

 It doesn't mean that we can't use an uber layer: it's what we do in
 ParDoWithBatch or DoFn in IO Sink where we have a batch size.

 Anyway, the core problem is about the checkpoint: why a checkpoint is
 not
 "respected" by an IO or runner ?
>>>
>>>
>>>
>>> Take the example of a runner deciding the bundle size is 4 and the IO
>>> deciding the commit-interval (batch semantic) is 2, what happens if
>>> the 3rd record fails? You have pushed to the store 2 records which can
>>> be reprocessed by a restart of the bundle and you can get duplicates.
>>>
>>> Rephrased: I think we need as a framework a batch/chunk solution which
>>> is reliable. I understand bundles is mapped on the runner and not
>>> really controlled but can we get something more reliable for the user?
>>> Maybe we need a @BeforeBatch or something like that.
>>>

 Regards
 JB


 On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>
>
> Hi guys,
>
> The subject is a bit provocative but the topic is real and coming
> again and again with the beam usage: how a dofn can handle some
> "chunking".
>
> The need is to be able to commit each N records but with N not too big.
>
> The natural API for that in beam is the bundle one but bundles are not
> reliable since they can be very small (flink) - we can say it is "ok"
> even if it has some perf impacts - or too big (spark does full size /
> #workers).
>
> The workaround is what we see in the ES I/O: a maxSize which does an
> eager flush. The issue is that then the checkpoint is not respected
> and you can process multiple times the same records.
>
> Any plan to make this API reliable and controllable from a beam point
> of view (at least in a max manner)?
>
> Thanks,
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>

 --
 Jean-Baptiste Onofré
 jbono...@apache.org
 http://blog.nanthrax.net
 Talend - http://www.talend.com
>>>
>>>
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com


Fwd: Re: Does ElasticsearchIO in the latest RC support adding document IDs?

2017-11-15 Thread Etienne Chauchot

Hi all,

CCing of the dev list failed, so I forward the email :)



-------- Forwarded Message --------
Subject: Re: Does ElasticsearchIO in the latest RC support adding document IDs?
Date: Wed, 15 Nov 2017 09:53:46 +0100
From: Etienne Chauchot
To: Chet Aldrich, u...@beam.apache.org
Cc: Philip Chan, echauc...@gmail.com



Hi Chet,

What you say is totally true: docs written using ElasticsearchIO will
always have an ES-generated id. But that might change in the future;
indeed, it might be a good thing to allow the user to pass an id. After
just 5 seconds of thinking, I see 3 possible designs for that.


a. (simplest) Use a special JSON field for the id: if the user provides
it in the input JSON, it is used; otherwise the id is auto-generated.


b. (a bit less user friendly) A PCollection of KV pairs with K as an id.
But this forces the user to do a ParDo before writing to ES to output
the KV pairs.



c. (a lot more complex) Allow the IO to serialize/deserialize Java beans
that have a String id field. Matching Java types to ES types is quite
tricky, so for now we just rely on the user to serialize their beans
into JSON and let ES match the types automatically.
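
To make design (a) concrete, here is a minimal sketch in plain Python of
how a sink could turn an input JSON document into the two lines of an
Elasticsearch bulk "index" action. The field name `_beam_doc_id` and the
helper `to_bulk_lines` are hypothetical, chosen only for illustration;
this is not how ElasticsearchIO is implemented.

```python
import json

# Hypothetical name of the special field carrying the document id.
ID_FIELD = "_beam_doc_id"


def to_bulk_lines(json_doc: str) -> str:
    """Return the action line and source line of one bulk "index" action."""
    doc = json.loads(json_doc)
    # Remove the id from the document so it is not indexed as data.
    doc_id = doc.pop(ID_FIELD, None)
    if doc_id is None:
        action = {"index": {}}  # no id given: Elasticsearch generates one
    else:
        action = {"index": {"_id": str(doc_id)}}
    return json.dumps(action) + "\n" + json.dumps(doc) + "\n"


payload = "".join(
    to_bulk_lines(d)
    for d in [
        '{"_beam_doc_id": "user-42", "name": "Ada"}',
        '{"name": "Grace"}',
    ]
)
print(payload, end="")
```

The first document is indexed under the id it carries, while the second
falls back to an auto-generated id, which matches the "used if provided,
auto-generated otherwise" rule of design (a).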


Related to the problems you raise below:

1. Well, the bundle is the commit entity of Beam. Consider the case of
ESIO.batchSize being smaller than the bundle size. While processing
records, when the number of elements reaches batchSize, an ES bulk insert
will be issued but no finishBundle. If there is a problem later on in the
bundle processing, before the finishBundle, the checkpoint will still be
at the beginning of the bundle, so the whole bundle will be retried,
leading to duplicate documents. Thanks for raising that! I'm CCing the
dev list so that someone can correct me on the checkpointing mechanism if
I'm missing something. Besides, I'm thinking about forcing the user to
provide an id in all cases to work around this issue.


2. Correct.
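
As a rough illustration of why a user-provided id would work around the
retry problem in point 1, the sketch below models the store as a Python
dict keyed by document id, so that writing the same record twice is an
overwrite rather than a second document. Deriving the id from a SHA-1 of
the record content is an assumption made for the example; `doc_id` and
`write_bundle` are hypothetical helpers, not Beam or Elasticsearch APIs.

```python
import hashlib


def doc_id(record: str) -> str:
    """Deterministic id: the same record always maps to the same id."""
    return hashlib.sha1(record.encode("utf-8")).hexdigest()


def write_bundle(store: dict, bundle: list, batch_size: int, fail_at=None) -> None:
    """Flush every batch_size records; optionally fail at one record index."""
    batch = []
    for index, record in enumerate(bundle):
        if index == fail_at:
            raise RuntimeError(f"failure while processing {record!r}")
        batch.append(record)
        if len(batch) >= batch_size:
            store.update({doc_id(r): r for r in batch})  # upsert by id
            batch = []
    store.update({doc_id(r): r for r in batch})  # flush at the end of the bundle


store = {}
bundle = ["a", "b", "c", "d"]
try:
    # "a" and "b" are flushed before the third record fails.
    write_bundle(store, bundle, batch_size=2, fail_at=2)
except RuntimeError:
    # The whole bundle is retried, including "a" and "b".
    write_bundle(store, bundle, batch_size=2)
print(sorted(store.values()))
```

Each record ends up in the store exactly once despite the retry. One
caveat of a content-derived id is that two genuinely identical input
records also collapse into a single document.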

Best,
Etienne

On 15/11/2017 at 02:16, Chet Aldrich wrote:

Hello all!

So I’ve been using the ElasticSearchIO sink for a project 
(unfortunately it’s Elasticsearch 5.x, and so I’ve been messing around 
with the latest RC) and I’m finding that it doesn’t allow for changing 
the document ID, but only lets you pass in a record, which means that 
the document ID is auto-generated. See this line for what specifically 
is happening:


https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L838

Essentially the data part of the document is being placed but it 
doesn’t allow for other properties, such as the document ID, to be set.


This leads to two problems:

1. Beam doesn’t necessarily guarantee exactly-once execution for a 
given item in a PCollection, as I understand it. This means that you 
may get more than one record in Elastic for a given item in a 
PCollection that you pass in.


2. You can’t do partial updates to an index. If you run a batch job 
once, and then run the batch job again on the same index without 
clearing it, you just double everything in there.


Is there any good way around this?

I’d be happy to try writing up a PR for this in theory, but not sure 
how to best approach it. Also would like to figure out a way to get 
around this in the meantime, if anyone has any ideas.


Best,

Chet

P.S. CCed echauc...@gmail.com because it seems like he’s been doing
work related to the elastic sink.







Re: makes bundle concept usable?

2017-11-15 Thread Jean-Baptiste Onofré

Yes, @StableReplay, that's the annotation. Thanks.

On 11/15/2017 09:52 AM, Reuven Lax wrote:

Romain,

I think the @StableReplay semantic that Kenn proposed a month or so ago is
what is needed here.

Essentially it will ensure that the GroupByKey iterable is stable and
checkpointed. So on replay, the GroupByKey is guaranteed to receive the
exact same iterable as it did before. The annotation can be put on a ParDo
as well, in which case it ensures stability (and checkpointing) of the
individual ParDo elements.

Reuven

On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau 
wrote:


2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré :

Hi Romain,

You are right: currently, the chunking is related to bundles. Today, the
bundle size is under the runner responsibility.

I think it's fine because only the runner know an efficient bundle size.

I'm

afraid giving the "control" of the bundle size to the end user (via
pipeline) can result to huge performances issue depending of the runner.

It doesn't mean that we can't use an uber layer: it's what we do in
ParDoWithBatch or DoFn in IO Sink where we have a batch size.

Anyway, the core problem is about the checkpoint: why a checkpoint is not
"respected" by an IO or runner ?



Take the example of a runner deciding the bundle size is 4 and the IO
deciding the commit-interval (batch semantic) is 2, what happens if
the 3rd record fails? You have pushed to the store 2 records which can
be reprocessed by a restart of the bundle and you can get duplicates.

Rephrased: I think we need as a framework a batch/chunk solution which
is reliable. I understand bundles is mapped on the runner and not
really controlled but can we get something more reliable for the user?
Maybe we need a @BeforeBatch or something like that.



Regards
JB


On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:


Hi guys,

The subject is a bit provocative but the topic is real and coming
again and again with the beam usage: how a dofn can handle some
"chunking".

The need is to be able to commit each N records but with N not too big.

The natural API for that in beam is the bundle one but bundles are not
reliable since they can be very small (flink) - we can say it is "ok"
even if it has some perf impacts - or too big (spark does full size /
#workers).

The workaround is what we see in the ES I/O: a maxSize which does an
eager flush. The issue is that then the checkpoint is not respected
and you can process multiple times the same records.

Any plan to make this API reliable and controllable from a beam point
of view (at least in a max manner)?

Thanks,
Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com






--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: makes bundle concept usable?

2017-11-15 Thread Jean-Baptiste Onofré

Got it now.

AFAIR, Kenn discussed an annotation related to that recently. I don't
remember the annotation's name, but basically it was a link between a
group of elements (batch) and the checkpoint.


Regards
JB

On 11/15/2017 09:49 AM, Romain Manni-Bucau wrote:

2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré :

Hi Romain,

You are right: currently, the chunking is related to bundles. Today, the
bundle size is under the runner responsibility.

I think it's fine because only the runner know an efficient bundle size. I'm
afraid giving the "control" of the bundle size to the end user (via
pipeline) can result to huge performances issue depending of the runner.

It doesn't mean that we can't use an uber layer: it's what we do in
ParDoWithBatch or DoFn in IO Sink where we have a batch size.

Anyway, the core problem is about the checkpoint: why a checkpoint is not
"respected" by an IO or runner ?



Take the example of a runner deciding the bundle size is 4 and the IO
deciding the commit-interval (batch semantic) is 2, what happens if
the 3rd record fails? You have pushed to the store 2 records which can
be reprocessed by a restart of the bundle and you can get duplicates.

Rephrased: I think we need as a framework a batch/chunk solution which
is reliable. I understand bundles is mapped on the runner and not
really controlled but can we get something more reliable for the user?
Maybe we need a @BeforeBatch or something like that.



Regards
JB


On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:


Hi guys,

The subject is a bit provocative but the topic is real and coming
again and again with the beam usage: how a dofn can handle some
"chunking".

The need is to be able to commit each N records but with N not too big.

The natural API for that in beam is the bundle one but bundles are not
reliable since they can be very small (flink) - we can say it is "ok"
even if it has some perf impacts - or too big (spark does full size /
#workers).

The workaround is what we see in the ES I/O: a maxSize which does an
eager flush. The issue is that then the checkpoint is not respected
and you can process multiple times the same records.

Any plan to make this API reliable and controllable from a beam point
of view (at least in a max manner)?

Thanks,
Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: makes bundle concept usable?

2017-11-15 Thread Reuven Lax
Romain,

I think the @StableReplay semantic that Kenn proposed a month or so ago is
what is needed here.

Essentially it will ensure that the GroupByKey iterable is stable and
checkpointed. So on replay, the GroupByKey is guaranteed to receive the
exact same iterable as it did before. The annotation can be put on a ParDo
as well, in which case it ensures stability (and checkpointing) of the
individual ParDo elements.

Reuven

On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau 
wrote:

> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré :
> > Hi Romain,
> >
> > You are right: currently, the chunking is related to bundles. Today, the
> > bundle size is under the runner responsibility.
> >
> > I think it's fine because only the runner know an efficient bundle size.
> I'm
> > afraid giving the "control" of the bundle size to the end user (via
> > pipeline) can result to huge performances issue depending of the runner.
> >
> > It doesn't mean that we can't use an uber layer: it's what we do in
> > ParDoWithBatch or DoFn in IO Sink where we have a batch size.
> >
> > Anyway, the core problem is about the checkpoint: why a checkpoint is not
> > "respected" by an IO or runner ?
>
>
> Take the example of a runner deciding the bundle size is 4 and the IO
> deciding the commit-interval (batch semantic) is 2, what happens if
> the 3rd record fails? You have pushed to the store 2 records which can
> be reprocessed by a restart of the bundle and you can get duplicates.
>
> Rephrased: I think we need as a framework a batch/chunk solution which
> is reliable. I understand bundles is mapped on the runner and not
> really controlled but can we get something more reliable for the user?
> Maybe we need a @BeforeBatch or something like that.
>
> >
> > Regards
> > JB
> >
> >
> > On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
> >>
> >> Hi guys,
> >>
> >> The subject is a bit provocative but the topic is real and coming
> >> again and again with the beam usage: how a dofn can handle some
> >> "chunking".
> >>
> >> The need is to be able to commit each N records but with N not too big.
> >>
> >> The natural API for that in beam is the bundle one but bundles are not
> >> reliable since they can be very small (flink) - we can say it is "ok"
> >> even if it has some perf impacts - or too big (spark does full size /
> >> #workers).
> >>
> >> The workaround is what we see in the ES I/O: a maxSize which does an
> >> eager flush. The issue is that then the checkpoint is not respected
> >> and you can process multiple times the same records.
> >>
> >> Any plan to make this API reliable and controllable from a beam point
> >> of view (at least in a max manner)?
> >>
> >> Thanks,
> >> Romain Manni-Bucau
> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
>


Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré :
> Hi Romain,
>
> You are right: currently, the chunking is related to bundles. Today, the
> bundle size is under the runner responsibility.
>
> I think it's fine because only the runner know an efficient bundle size. I'm
> afraid giving the "control" of the bundle size to the end user (via
> pipeline) can result to huge performances issue depending of the runner.
>
> It doesn't mean that we can't use an uber layer: it's what we do in
> ParDoWithBatch or DoFn in IO Sink where we have a batch size.
>
> Anyway, the core problem is about the checkpoint: why a checkpoint is not
> "respected" by an IO or runner ?


Take the example of a runner deciding the bundle size is 4 and the IO
deciding the commit-interval (batch semantic) is 2: what happens if
the 3rd record fails? You have pushed 2 records to the store which can
be reprocessed by a restart of the bundle, and you can get duplicates.
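
This scenario can be simulated in a few lines of plain Python. The
sketch assumes an append-only store (a list) and a runner that retries
the whole bundle after a failure; `process_bundle` is a hypothetical
stand-in for the IO's write logic, not Beam code.

```python
def process_bundle(store: list, bundle: list, commit_interval: int, fail_at=None) -> None:
    """Push records to the store every commit_interval records."""
    pending = []
    for index, record in enumerate(bundle):
        if index == fail_at:
            raise RuntimeError(f"record {record!r} failed")
        pending.append(record)
        if len(pending) == commit_interval:
            store.extend(pending)  # eager commit, before the bundle is finished
            pending.clear()
    store.extend(pending)  # commit the remainder at the end of the bundle


store = []
bundle = ["r1", "r2", "r3", "r4"]  # bundle size 4, chosen by the runner
try:
    process_bundle(store, bundle, commit_interval=2, fail_at=2)  # 3rd record fails
except RuntimeError:
    process_bundle(store, bundle, commit_interval=2)  # bundle restarted from its start
print(store)  # ['r1', 'r2', 'r1', 'r2', 'r3', 'r4']
```

The first two records appear twice because they were committed before
the failure and then committed again by the retry.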

Rephrased: I think we need, as a framework, a batch/chunk solution which
is reliable. I understand that bundles are mapped by the runner and not
really controlled, but can we get something more reliable for the user?
Maybe we need a @BeforeBatch or something like that.

>
> Regards
> JB
>
>
> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>>
>> Hi guys,
>>
>> The subject is a bit provocative but the topic is real and coming
>> again and again with the beam usage: how a dofn can handle some
>> "chunking".
>>
>> The need is to be able to commit each N records but with N not too big.
>>
>> The natural API for that in beam is the bundle one but bundles are not
>> reliable since they can be very small (flink) - we can say it is "ok"
>> even if it has some perf impacts - or too big (spark does full size /
>> #workers).
>>
>> The workaround is what we see in the ES I/O: a maxSize which does an
>> eager flush. The issue is that then the checkpoint is not respected
>> and you can process multiple times the same records.
>>
>> Any plan to make this API reliable and controllable from a beam point
>> of view (at least in a max manner)?
>>
>> Thanks,
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com


Re: makes bundle concept usable?

2017-11-15 Thread Reuven Lax
Romain,

Are you saying that you need the set of elements in the bundle to be
deterministic, so on retry you get the same bundle back?

Reuven

On Wed, Nov 15, 2017 at 4:44 PM, Romain Manni-Bucau 
wrote:

> Hi Reuven,
>
> how does it help since you will still "send to the transform" the data
> before the commit point of beam and therefore not be able to reprocess
> the same data in case of a failure between your eager flush and next
> commit point?
>
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>
>
> 2017-11-15 9:43 GMT+01:00 Reuven Lax :
> > Bundles are currently completely up to the runner, and different runners
> do
> > them differently. In addition to Flink, the Dataflow runner creates
> > smallish bundles when run in streaming mode, as the streaming-mode runner
> > is optimizing for latency (so a bundle might be small simply because not
> > enough time has passed to make it large). Bundles being large is usually
> > less of an issue - you simply need to occasionally commit in
> processElement
> > if the bundle being built up is too large.
> >
> > However there is a simple workaround. You can GBK on a fixed number of
> > shard keys and set a elementCountAtLeast trigger to trigger when enough
> > data has arrived.
> >
> > Reuven
> >
> > On Wed, Nov 15, 2017 at 4:38 PM, Romain Manni-Bucau <
> rmannibu...@gmail.com>
> > wrote:
> >
> >> Hi guys,
> >>
> >> The subject is a bit provocative but the topic is real and coming
> >> again and again with the beam usage: how a dofn can handle some
> >> "chunking".
> >>
> >> The need is to be able to commit each N records but with N not too big.
> >>
> >> The natural API for that in beam is the bundle one but bundles are not
> >> reliable since they can be very small (flink) - we can say it is "ok"
> >> even if it has some perf impacts - or too big (spark does full size /
> >> #workers).
> >>
> >> The workaround is what we see in the ES I/O: a maxSize which does an
> >> eager flush. The issue is that then the checkpoint is not respected
> >> and you can process multiple times the same records.
> >>
> >> Any plan to make this API reliable and controllable from a beam point
> >> of view (at least in a max manner)?
> >>
> >> Thanks,
> >> Romain Manni-Bucau
> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>
>


Re: makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
Hi Reuven,

how does it help since you will still "send to the transform" the data
before the commit point of beam and therefore not be able to reprocess
the same data in case of a failure between your eager flush and next
commit point?

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 9:43 GMT+01:00 Reuven Lax :
> Bundles are currently completely up to the runner, and different runners do
> them differently. In addition to Flink, the Dataflow runner creates
> smallish bundles when run in streaming mode, as the streaming-mode runner
> is optimizing for latency (so a bundle might be small simply because not
> enough time has passed to make it large). Bundles being large is usually
> less of an issue - you simply need to occasionally commit in processElement
> if the bundle being built up is too large.
>
> However there is a simple workaround. You can GBK on a fixed number of
> shard keys and set a elementCountAtLeast trigger to trigger when enough
> data has arrived.
>
> Reuven
>
> On Wed, Nov 15, 2017 at 4:38 PM, Romain Manni-Bucau 
> wrote:
>
>> Hi guys,
>>
>> The subject is a bit provocative but the topic is real and coming
>> again and again with the beam usage: how a dofn can handle some
>> "chunking".
>>
>> The need is to be able to commit each N records but with N not too big.
>>
>> The natural API for that in beam is the bundle one but bundles are not
>> reliable since they can be very small (flink) - we can say it is "ok"
>> even if it has some perf impacts - or too big (spark does full size /
>> #workers).
>>
>> The workaround is what we see in the ES I/O: a maxSize which does an
>> eager flush. The issue is that then the checkpoint is not respected
>> and you can process multiple times the same records.
>>
>> Any plan to make this API reliable and controllable from a beam point
>> of view (at least in a max manner)?
>>
>> Thanks,
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>


Re: makes bundle concept usable?

2017-11-15 Thread Reuven Lax
Bundles are currently completely up to the runner, and different runners do
them differently. In addition to Flink, the Dataflow runner creates
smallish bundles when run in streaming mode, as the streaming-mode runner
is optimizing for latency (so a bundle might be small simply because not
enough time has passed to make it large). Bundles being large is usually
less of an issue - you simply need to occasionally commit in processElement
if the bundle being built up is too large.

However, there is a simple workaround. You can GBK on a fixed number of
shard keys and set an elementCountAtLeast trigger to fire when enough
data has arrived.
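
The idea behind this workaround can be sketched without any Beam code:
assign each element to one of a fixed number of shards, buffer per shard,
and emit a batch once a shard holds enough elements. The function below
is a plain-Python simulation under those assumptions; `batch_by_shard`
is a hypothetical name, and round-robin assignment stands in for
whatever shard key a real pipeline would use.

```python
from collections import defaultdict


def batch_by_shard(elements, num_shards: int, min_count: int):
    """Yield (shard, batch) whenever a shard has buffered min_count elements."""
    buffers = defaultdict(list)
    for position, element in enumerate(elements):
        shard = position % num_shards  # stand-in for a random or hashed shard key
        buffers[shard].append(element)
        if len(buffers[shard]) >= min_count:
            yield shard, buffers.pop(shard)
    # Whatever is still buffered is emitted when the input ends.
    for shard, rest in sorted(buffers.items()):
        if rest:
            yield shard, rest


batches = list(batch_by_shard(range(10), num_shards=2, min_count=3))
print(batches)
```

In this simulation a batch fires at exactly `min_count` elements. A
trigger with "at least" semantics only promises a lower bound, so a real
runner may hand over larger groups.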

Reuven

On Wed, Nov 15, 2017 at 4:38 PM, Romain Manni-Bucau 
wrote:

> Hi guys,
>
> The subject is a bit provocative but the topic is real and coming
> again and again with the beam usage: how a dofn can handle some
> "chunking".
>
> The need is to be able to commit each N records but with N not too big.
>
> The natural API for that in beam is the bundle one but bundles are not
> reliable since they can be very small (flink) - we can say it is "ok"
> even if it has some perf impacts - or too big (spark does full size /
> #workers).
>
> The workaround is what we see in the ES I/O: a maxSize which does an
> eager flush. The issue is that then the checkpoint is not respected
> and you can process multiple times the same records.
>
> Any plan to make this API reliable and controllable from a beam point
> of view (at least in a max manner)?
>
> Thanks,
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>


makes bundle concept usable?

2017-11-15 Thread Romain Manni-Bucau
Hi guys,

The subject is a bit provocative, but the topic is real and comes up
again and again in Beam usage: how can a DoFn handle some "chunking"?

The need is to be able to commit every N records, with N not too big.

The natural API for that in Beam is the bundle one, but bundles are not
reliable since they can be very small (Flink) - we can say it is "ok"
even if it has some perf impact - or too big (Spark does full size /
#workers).

The workaround is what we see in the ES I/O: a maxSize which does an
eager flush. The issue is that the checkpoint is then not respected
and you can process the same records multiple times.
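
For readers unfamiliar with the pattern, here is a minimal plain-Python
sketch of such an eager flush. The class `EagerFlushWriter` and its
method names are hypothetical and only mirror the shape of a DoFn
lifecycle (start of bundle, per-element processing, end of bundle); it
is not the ES I/O code.

```python
class EagerFlushWriter:
    """Buffers records and flushes as soon as max_size records are pending."""

    def __init__(self, max_size: int, sink: list):
        self.max_size = max_size
        self.sink = sink  # each flushed batch is appended here as one list
        self.buffer = []

    def start_bundle(self) -> None:
        self.buffer = []

    def process(self, record) -> None:
        self.buffer.append(record)
        if len(self.buffer) >= self.max_size:
            self.flush()  # eager flush: does not wait for the end of the bundle

    def finish_bundle(self) -> None:
        self.flush()  # write whatever is left when the bundle ends

    def flush(self) -> None:
        if self.buffer:
            self.sink.append(list(self.buffer))
            self.buffer = []


sink = []
writer = EagerFlushWriter(max_size=3, sink=sink)
writer.start_bundle()
for record in range(7):
    writer.process(record)
writer.finish_bundle()
print(sink)  # [[0, 1, 2], [3, 4, 5], [6]]
```

The first two batches reach the sink before the bundle is finished,
which is exactly the window in which a later failure leads to the
records being processed again.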

Any plan to make this API reliable and controllable from a Beam point
of view (at least as an upper bound on the size)?
Thanks,
Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


Jenkins build became unstable: beam_Release_NightlySnapshot #594

2017-11-15 Thread Apache Jenkins Server
See