Re: [PROPOSAL] Merge gearpump-runner to master

2017-08-10 Thread Manu Zhang
Hi Paul,

The latest master compiles fine for me. Could you check again?
You may also want to check out the contribution guide. In short, the Apache
way is to file a JIRA issue and submit a GitHub pull request to fix it.

Thanks,
Manu

On Tue, Aug 8, 2017 at 1:17 PM Paul Findlay  wrote:

> Cheers team.
>
> I have found a compilation issue on master (in
> CreateGearpumpPCollectionView), attached is a small patch
>
> Kind regards,
>
> Paul
>
> On Tue, Aug 8, 2017 at 4:10 PM, Manu Zhang 
> wrote:
>
>> Thanks Kenn!!! Thanks everyone!!! It's a great achievement for us.
>>
>> On Tue, Aug 8, 2017 at 7:54 AM Kenneth Knowles 
>> wrote:
>>
>> > Done!
>> >
>> > On Fri, Jul 21, 2017 at 11:08 PM, Jean-Baptiste Onofré > >
>> > wrote:
>> >
>> > > +1
>> > >
>> > > Regards
>> > > JB
>> > >
>> > > On Jul 22, 2017, 05:06, at 05:06, Kenneth Knowles
>> > > >
>> > > wrote:
>> > > >+1 to this!
>> > > >
>> > > >I really want to call out the longevity of contribution behind this,
>> > > >following many changes in both Beam and Gearpump for over a year.
>> > > >Here's
>> > > >the first commit on the branch:
>> > > >
>> > > >commit 9478f4117de3a2d0ea40614ed4cb801918610724 (github/pr/323)
>> > > >Author: manuzhang 
>> > > >Date:   Tue Mar 15 16:15:16 2016 +0800
>> > > >
>> > > >And here are some numbers, FWIW: 163 non-merge commits, 203 total. So
>> > > >that's a PR and review every couple of weeks.
>> > > >
>> > > >The ValidatesRunner capability coverage is very good. The only
>> skipped
>> > > >tests are state/timers, metrics, and TestStream, which many runners
>> > > >have
>> > > >partial or no support for.
>> > > >
>> > > >I'll save practical TODOs like moving ValidatesRunner execution to
>> > > >postcommit, etc. Pending the results of this discussion, of course.
>> > > >
>> > > >Kenn
>> > > >
>> > > >
>> > > >On Fri, Jul 21, 2017 at 12:02 AM, Manu Zhang <
>> owenzhang1...@gmail.com>
>> > > >wrote:
>> > > >
>> > > >> Guys,
>> > > >>
>> > > >> On behalf of the gearpump team, I'd like to propose to merge the
>> > > >> gearpump-runner branch into master, which will give it more
>> > > >visibility to
>> > > >> other contributors and users. The runner satisfies the following
>> > > >criteria
>> > > >> outlined in contribution guide [1].
>> > > >>
>> > > >>
>> > > >>1. Have at least 2 contributors interested in maintaining it,
>> and
>> > > >1
>> > > >>committer interested in supporting it: *Both Huafeng and me have
>> > > >been
>> > > >>making contributions[2] and we will continue to maintain it.
>> Kenn
>> > > >and JB
>> > > >>have been supporting the runner (Thank you, guys!)*
>> > > >>2. Provide both end-user and developer-facing documentation*:
>> They
>> > > >are
>> > > >>already on the website ([3] and [4]).*
>> > > >>3. Have at least a basic level of unit test coverage: *We do.*
>> > > >*[5]*
>> > > >>4. Run all existing applicable integration tests with other Beam
>> > > >>components and create additional tests as appropriate:
>> > > >*gearpump-runner
>> > > >>passes ValidatesRunner tests.*
>> > > >>
>> > > >>
>> > > >> Additionally, as a runner,
>> > > >>
>> > > >>
>> > > >>1. Be able to handle a subset of the model that address a
>> > > >significant
>> > > >>set of use cases (aka. ‘traditional batch’ or ‘processing time
>> > > >> streaming’): *gearpump
>> > > >>runner is able to handle event time streaming *
>> > > >>2. Update the capability matrix with the current status: *[4]*
>> > > >>3. Add a webpage under documentation/runners: *[3]*
>> > > >>
>> > > >>
>> > > >> The PR for the merge: https://github.com/apache/beam/pull/3611
>> > > >>
>> > > >> Thanks,
>> > > >> Manu
>> > > >>
>> > > >>
>> > > >> [1]
>> > > >
>> http://beam.apache.org/contribute/contribution-guide/#feature-branches
>> > > >> [2] https://issues.apache.org/jira/browse/BEAM-79
>> > > >> [3] https://beam.apache.org/documentation/runners/gearpump/
>> > > >> [4]
>> https://beam.apache.org/documentation/runners/capability-matrix/
>> > > >> [5]
>> > > >> https://github.com/apache/beam/tree/gearpump-runner/
>> > > >> runners/gearpump/src/test/java/org/apache/beam/runners/gearpump
>> > > >>
>> > >
>> >
>>
>
>


Re: Requiring PTransform to set a coder on its resulting collections

2017-08-10 Thread Reuven Lax
Interestingly, I've seen examples of PTransforms where the transform itself
is unable to easily set its own coder. This happens when the transform is
parametrized in such a way that its output coder is not determinable except
by the caller of the PTransform. The caller can of course pass a coder into
the constructor of the PTransform, but that's not any cleaner than simply
calling setCoder() on the output.
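
For illustration, a minimal sketch of that pattern (the transform ParseAs and
its parseFn are made up for the example): the output coder comes from the
caller at construction time, because only the caller knows the output type.

  import org.apache.beam.sdk.coders.Coder;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.SerializableFunction;
  import org.apache.beam.sdk.values.PCollection;

  // Hypothetical transform: parses each line into a caller-chosen type T.
  class ParseAs<T> extends PTransform<PCollection<String>, PCollection<T>> {
    private final SerializableFunction<String, T> parseFn;
    private final Coder<T> outputCoder;  // supplied by the caller

    ParseAs(SerializableFunction<String, T> parseFn, Coder<T> outputCoder) {
      this.parseFn = parseFn;
      this.outputCoder = outputCoder;
    }

    @Override
    public PCollection<T> expand(PCollection<String> input) {
      PCollection<T> parsed = input.apply(ParDo.of(new DoFn<String, T>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(parseFn.apply(c.element()));
        }
      }));
      // The transform sets the coder on its output, but the coder itself still
      // had to come from the caller - which is the point made above.
      parsed.setCoder(outputCoder);
      return parsed;
    }
  }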

On Thu, Aug 10, 2017 at 4:57 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> I've updated the guidance in PTransform Style Guide on setting coders
> https://beam.apache.org/contribute/ptransform-style-guide/#coders
> according to this discussion.
> https://github.com/apache/beam-site/pull/279
>
> On Thu, Aug 3, 2017 at 6:27 PM Robert Bradshaw  >
> wrote:
>
> > On Thu, Aug 3, 2017 at 6:08 PM, Eugene Kirpichov
> >  wrote:
> > > https://github.com/apache/beam/pull/3649 has landed. The main
> > contribution
> > > of this PR is deprecating PTransform.getDefaultOutputCoder().
> > >
> > > Next steps are to get rid of all setCoder() calls in the SDK, and
> > deprecate
> > > setCoder().
> > > Nearly all setCoder() calls (perhaps simply all?) I found are on the
> > output
> > > of mapping transforms, such as ParDo, Map/FlatMapElements, WithKeys.
> > > I think we should simply make these transforms optionally configurable
> > with
> > > an output coder: e.g. input.apply(ParDo.of(new
> > > SomeFn<>()).withOutputCoder(SomeCoder.of()))
> > > For multi-output ParDo this is a little more complex API-wise, but
> doable
> > > too.
> > >
> > > (another minor next step is to say in PTransform Style Guide that the
> > > transform must set a coder on all its outputs)
> > >
> > > Sounds reasonable?
> >
> > +1
> >
> > I'd like to do this in a way that lowers the burden for all PTransform
> > authors. Can't think of a better way than a special subclass of
> > PTransform that has the setters that one would subclass...
> >
> > > On Thu, Aug 3, 2017 at 5:34 AM Lukasz Cwik 
> > wrote:
> > >
> > >> I'm for (1) and am not sure about the feasibility of (2) without
> having
> > an
> > >> escape hatch that allows a pipeline author to specify a coder to
> handle
> > >> their special case.
> > >>
> > >> On Tue, Aug 1, 2017 at 2:15 PM, Reuven Lax 
> > >> wrote:
> > >>
> > >> > One interesting wrinkle: I'm about to propose a set of semantics for
> > >> > snapshotting/in-place updating pipelines. Part of this proposal is
> the
> > >> > ability to write pipelines to "upgrade" snapshots to make them
> > compatible
> > >> > with new graphs. This relies on the ability to have two separate
> > coders
> > >> for
> > >> > the same type - the old coder and the new coder - in order to handle
> > the
> > >> > case where the user has changed coders in the new pipeline.
> > >> >
> > >> > On Tue, Aug 1, 2017 at 2:12 PM, Robert Bradshaw
> > >> >  > >> > > wrote:
> > >> >
> > >> > > There are two concerns in this thread:
> > >> > >
> > >> > > (1) Getting rid of PCollection.setCoder(). Everyone seems in favor
> > of
> > >> > this
> > >> > > (right?)
> > >> > >
> > >> > > (2) Deprecating specifying Coders in favor of specifying
> > >> TypeDescriptors.
> > >> > > I'm generally in favor, but it's unclear how far we can push this
> > >> > through.
> > >> > >
> > >> > > Let's at least do (1), and separately state a preference for (2),
> > >> seeing
> > >> > > how far we can push it.
> > >> > >
> > >> > > On Thu, Jul 27, 2017 at 9:13 PM, Kenneth Knowles
> > >>  > >> > >
> > >> > > wrote:
> > >> > >
> > >> > > > Another thought on this: setting a custom coder to support a
> > special
> > >> > data
> > >> > > > distribution is likely often a property of the input to the
> > pipeline.
> > >> > So
> > >> > > > setting a coder during pipeline construction - more generally,
> > when
> > >> > > writing
> > >> > > > a composite transform for reuse - you might not actually have
> the
> > >> > needed
> > >> > > > information. But setting up a special indicator type descriptor
> > lets
> > >> > your
> > >> > > > users map that type descriptor to a coder that works well for
> > their
> > >> > data.
> > >> > > >
> > >> > > > But Robert's example of RawUnionValue seems like a deal breaker
> > for
> > >> all
> > >> > > > approaches. It really requires .getCoder() during expand() and
> > >> > explicitly
> > >> > > > building coders encoding information that is cumbersome to get
> > into a
> > >> > > > TypeDescriptor. While making up new type languages is a
> > comfortable
> > >> > > > activity for me :-) I don't think we should head down that path,
> > for
> > >> > our
> > >> > > > users' sake. So I'll stop hoping we can eliminate this pain
> point
> > for
> > >> > > now.
> > >> > > >
> > >> > > > Kenn
> > >> > > >
> > >> > > > On Thu, Jul 27, 2017 at 8:48 PM, Kenneth Knowles <
> k...@google.com>
> > 

Re: Style of messages for checkArgument/checkNotNull in IOs

2017-08-10 Thread Eugene Kirpichov
https://beam.apache.org/contribute/ptransform-style-guide/#validation now
includes the new guidance.

It also includes updated guidance on what to put in expand() vs. validate()
(TL;DR: validate() is almost always unnecessary. Put almost all validation
in expand())
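
To make that recommendation concrete, here is a minimal sketch of a transform
that validates its configuration in expand() rather than in validate() (the
transform name and parameter are made up for the example):

  import static com.google.common.base.Preconditions.checkArgument;

  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.PDone;

  class WriteToPrefix extends PTransform<PCollection<String>, PDone> {
    private final String outputPrefix;

    WriteToPrefix(String outputPrefix) {
      this.outputPrefix = outputPrefix;
    }

    @Override
    public PDone expand(PCollection<String> input) {
      // Validate at expansion time instead of overriding validate().
      checkArgument(outputPrefix != null && !outputPrefix.isEmpty(),
          "outputPrefix must be set");
      return input.apply(TextIO.write().to(outputPrefix));
    }
  }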

On Fri, Jul 28, 2017 at 11:56 AM Kenneth Knowles 
wrote:

> So here's an easy solution: our own checkNotNull that throws
> IllegalArgumentException with a good error message. The error message can
> then be templated to allow terse invocations with just the method and
> parameter.
>
> Unsure why I didn't go for this straightaway.
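
(For concreteness, the helper described above might look roughly like the
sketch below; the class and method names are invented, and the point is only
that the exception lands in the "your fault" bucket with a templated message:)

  // Hypothetical helper: like checkNotNull, but throws IllegalArgumentException
  // with a templated, actionable message.
  public class BeamChecks {
    public static <T> T checkArgumentNotNull(T value, String method, String parameter) {
      if (value == null) {
        throw new IllegalArgumentException(
            String.format("%s: parameter '%s' must not be null", method, parameter));
      }
      return value;
    }
  }

  // Terse call site:
  //   this.username = BeamChecks.checkArgumentNotNull(username, "withBasicCredentials", "username");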
>
> On Fri, Jul 28, 2017 at 11:43 AM, Reuven Lax 
> wrote:
>
> > Yuck. I think that checkNotNull throwing a NPE is a very poor design
> choice
> > from the author of checkNotNull.
> >
> > On Fri, Jul 28, 2017 at 11:29 AM, Kenneth Knowles  >
> > wrote:
> >
> > > As to the choice of check method:
> > >
> > >  - checkArgument throws an IllegalArgumentException, which is clearly
> in
> > > "your fault" category, a la HTTP 4xx
> > >  - checkNotNull throws an NPE, which is usually a "my fault"
> exception, a
> > > la HTTP 5xx.
> > >
> > > The docs on NPE are not clear on blame, and this is a bug in the docs.
> > But
> > > almost all the time, an NPE indicates that the line where it is thrown
> is
> > > incorrect. IllegalArgumentException is unambiguous. This could also be
> > > called a bug in checkNotNull. It throws the same exception as if you
> > > _forgot_ to check if it  was not null. So it sort of doesn't do one of
> > the
> > > most important things it should be doing.
> > >
> > > As to verbosity: All error messages should be actionable. We have a
> > chronic
> > > problem with terrible or nonexistent error messages.
> > >
> > > NPE is uninformative and this feeds into the prior two bullets: If I
> see
> > > "NPE on line XYZ of file ABC" I am _always_ going to file a bug against
> > the
> > > author of file ABC because they dereferenced null. Their fix might be
> to
> > > simply protect themselves with a checkArgument to clearly blame their
> > > caller, but that is a totally acceptable bug/fix pair.
> > >
> > > We should really get an analysis in place based on @Nullable
> annotations
> > to
> > > mitigate this a bit, too.
> > >
> > > Kenn
> > >
> > > On Fri, Jul 28, 2017 at 11:17 AM, Eugene Kirpichov <
> > > kirpic...@google.com.invalid> wrote:
> > >
> > > > Hey all,
> > > >
> > > > I think this has been discussed before on a JIRA issue but I can't
> find
> > > it,
> > > > so raising again on the mailing list.
> > > >
> > > > Various IO (and non-IO) transforms validate their builder parameters
> > > using
> > > > Preconditions.checkArgument/checkNotNull, and use different styles
> for
> > > > error messages. There are 2 major styles:
> > > >
> > > > 1) style such as:
> > > > checkNotNull(username, "username"), or checkArgument(username !=
> null,
> > > > "username can not be null") or checkArgument(username != null,
> > > > "username must be set");
> > > > checkArgument(batchSize > 0, "batchSize must be positive, but
> was:
> > > %s",
> > > > batchSize)
> > > >
> > > > 2) style such as:
> > > >   checkArgument(
> > > >username != null,
> > > >"ConnectionConfiguration.create().withBasicCredentials(
> > > > username,
> > > > password) "
> > > >+ "called with null username");
> > > >checkArgument(
> > > >!username.isEmpty(),
> > > >"ConnectionConfiguration.create().withBasicCredentials(
> > > > username,
> > > > password) "
> > > >+ "called with empty username");
> > > >
> > > > Style 2 is recommended by the PTransform Style Guide
> > > >
> https://beam.apache.org/contribute/ptransform-style-guide/#transform-
> > > > configuration-errors
> > > >
> > > > However:
> > > > 1) The usage of these two styles is not consistent - both are used in
> > > about
> > > > the same amounts in Beam IOs.
> > > > 2) Style 2 seems unnecessarily verbose to me. The exception thrown
> > from a
> > > > checkArgument or checkNotNull already includes the method being
> called
> > > into
> > > > the stack trace, so I don't think the message needs to include the
> > > method.
> > > > 3) Beam is not the first Java project to have validation of
> > configuration
> > > > parameters of something or another, and I don't think I've seen
> > something
> > > > as verbose as style 2 used anywhere else in my experience of writing
> > > Java.
> > > >
> > > > What do people think about changing the guidance in favor of style 1?
> > > >
> > > > Specifically change the following example:
> > > >
> > > > public Twiddle withMoo(int moo) {
> > > >   checkArgument(moo >= 0 && moo < 100,
> > > >   "Thumbs.Twiddle.withMoo() called with an invalid moo of %s. "
> > > >   + "Valid values are 0 (inclusive) to 100 (exclusive)",
> > > >   moo);
> > > >   return 

Re: Requiring PTransform to set a coder on its resulting collections

2017-08-10 Thread Eugene Kirpichov
I've updated the guidance in PTransform Style Guide on setting coders
https://beam.apache.org/contribute/ptransform-style-guide/#coders
according to this discussion.
https://github.com/apache/beam-site/pull/279
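
In code, the guidance being referenced (a transform should set the coder on
its own outputs rather than leave it to callers) looks roughly like this
minimal sketch; the transform is made up, the coder is the existing
VarIntCoder:

  import org.apache.beam.sdk.coders.VarIntCoder;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.PCollection;

  class StringLengths extends PTransform<PCollection<String>, PCollection<Integer>> {
    @Override
    public PCollection<Integer> expand(PCollection<String> input) {
      PCollection<Integer> lengths =
          input.apply("ComputeLengths", ParDo.of(new DoFn<String, Integer>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              c.output(c.element().length());
            }
          }));
      // The transform, not the caller, declares the coder for its output.
      lengths.setCoder(VarIntCoder.of());
      return lengths;
    }
  }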

On Thu, Aug 3, 2017 at 6:27 PM Robert Bradshaw 
wrote:

> On Thu, Aug 3, 2017 at 6:08 PM, Eugene Kirpichov
>  wrote:
> > https://github.com/apache/beam/pull/3649 has landed. The main
> contribution
> > of this PR is deprecating PTransform.getDefaultOutputCoder().
> >
> > Next steps are to get rid of all setCoder() calls in the SDK, and
> deprecate
> > setCoder().
> > Nearly all setCoder() calls (perhaps simply all?) I found are on the
> output
> > of mapping transforms, such as ParDo, Map/FlatMapElements, WithKeys.
> > I think we should simply make these transforms optionally configurable
> with
> > an output coder: e.g. input.apply(ParDo.of(new
> > SomeFn<>()).withOutputCoder(SomeCoder.of()))
> > For multi-output ParDo this is a little more complex API-wise, but doable
> > too.
> >
> > (another minor next step is to say in PTransform Style Guide that the
> > transform must set a coder on all its outputs)
> >
> > Sounds reasonable?
>
> +1
>
> I'd like to do this in a way that lowers the burden for all PTransform
> authors. Can't think of a better way than a special subclass of
> PTransform that has the setters that one would subclass...
>
> > On Thu, Aug 3, 2017 at 5:34 AM Lukasz Cwik 
> wrote:
> >
> >> I'm for (1) and am not sure about the feasibility of (2) without having
> an
> >> escape hatch that allows a pipeline author to specify a coder to handle
> >> their special case.
> >>
> >> On Tue, Aug 1, 2017 at 2:15 PM, Reuven Lax 
> >> wrote:
> >>
> >> > One interesting wrinkle: I'm about to propose a set of semantics for
> >> > snapshotting/in-place updating pipelines. Part of this proposal is the
> >> > ability to write pipelines to "upgrade" snapshots to make them
> compatible
> >> > with new graphs. This relies on the ability to have two separate
> coders
> >> for
> >> > the same type - the old coder and the new coder - in order to handle
> the
> >> > case where the user has changed coders in the new pipeline.
> >> >
> >> > On Tue, Aug 1, 2017 at 2:12 PM, Robert Bradshaw
> >> >  >> > > wrote:
> >> >
> >> > > There are two concerns in this thread:
> >> > >
> >> > > (1) Getting rid of PCollection.setCoder(). Everyone seems in favor
> of
> >> > this
> >> > > (right?)
> >> > >
> >> > > (2) Deprecating specifying Coders in favor of specifying
> >> TypeDescriptors.
> >> > > I'm generally in favor, but it's unclear how far we can push this
> >> > through.
> >> > >
> >> > > Let's at least do (1), and separately state a preference for (2),
> >> seeing
> >> > > how far we can push it.
> >> > >
> >> > > On Thu, Jul 27, 2017 at 9:13 PM, Kenneth Knowles
> >>  >> > >
> >> > > wrote:
> >> > >
> >> > > > Another thought on this: setting a custom coder to support a
> special
> >> > data
> >> > > > distribution is likely often a property of the input to the
> pipeline.
> >> > So
> >> > > > setting a coder during pipeline construction - more generally,
> when
> >> > > writing
> >> > > > a composite transform for reuse - you might not actually have the
> >> > needed
> >> > > > information. But setting up a special indicator type descriptor
> lets
> >> > your
> >> > > > users map that type descriptor to a coder that works well for
> their
> >> > data.
> >> > > >
> >> > > > But Robert's example of RawUnionValue seems like a deal breaker
> for
> >> all
> >> > > > approaches. It really requires .getCoder() during expand() and
> >> > explicitly
> >> > > > building coders encoding information that is cumbersome to get
> into a
> >> > > > TypeDescriptor. While making up new type languages is a
> comfortable
> >> > > > activity for me :-) I don't think we should head down that path,
> for
> >> > our
> >> > > > users' sake. So I'll stop hoping we can eliminate this pain point
> for
> >> > > now.
> >> > > >
> >> > > > Kenn
> >> > > >
> >> > > > On Thu, Jul 27, 2017 at 8:48 PM, Kenneth Knowles 
> >> > wrote:
> >> > > >
> >> > > > > On Thu, Jul 27, 2017 at 11:18 AM, Thomas Groh
> >> >  >> > > >
> >> > > > > wrote:
> >> > > > >
> >> > > > >> introduce a
> >> > > > >> new, specialized type to represent the restricted
> >> > > > >> (alternatively-distributed?) data. The TypeDescriptor for this
> >> type
> >> > > can
> >> > > > >> map
> >> > > > >> to the specialized coder, without having to perform a
> significant
> >> > > degree
> >> > > > >> of
> >> > > > >> potentially wasted encoding work, plus it includes the
> assumptions
> >> > > that
> >> > > > >> are
> >> > > > >> being made about the distribution of data.
> >> > > > >>
> >> > > > >
> >> > > > > This is a very cool idea, in theory :-)
> >> > > > >
> >> > > > > 

Re: beam-site issues with Jenkins and MergeBot

2017-08-10 Thread Jason Kuster
Investigating mergebot outage currently. Apologies for the downtime.

On Wed, Aug 9, 2017 at 9:55 PM, Eugene Kirpichov 
wrote:

> Indeed beam-site is at https://gitbox.apache.org/repos/asf/beam-site.git
>  now.
>
> However, Mergebot appears to still be not working.
> https://github.com/apache/beam-site/pull/283 fixes the dead link and it
> passes the Jenkins precommit tests, but my "@asfgit merge" appears to have
> done nothing. I'm gonna have to merge things manually for now.
>
> +Jason Kuster  Any ideas on why Mergebot is not
> working?
>
> On Wed, Aug 9, 2017 at 9:40 PM Jean-Baptiste Onofré 
> wrote:
>
>> Beam site is no more on git-wip-us, but it moved to gitbox afair.
>>
>> Regards
>> JB
>>
>> On 08/09/2017 10:08 PM, Eugene Kirpichov wrote:
>> > Hello,
>> >
>> > I've been trying to merge a PR https://github.com/apache/
>> beam-site/pull/278
>> > and ran into the following issues:
>> >
>> > 1) When I do "git fetch --all" on beam-site, I get an error "fatal:
>> > repository 'https://git-wip-us.apache.org/repos/asf/beam-site.git/' not
>> > found". Has the git address of the apache repo changed? Is it no longer
>> > valid because we have MergeBot?
>> >
>> > 2) Precommit tests are failing nearly 100% of the time.
>> > If you look at build history on
>> > https://builds.apache.org/job/beam_PreCommit_Website_Test/ - 9 out of
>> 10
>> > last builds failed.
>> > Failures I saw:
>> >
>> > 7 times:
>> > + gpg --keyserver hkp://keys.gnupg.net --recv-keys
>> > 409B6B1796C275462A1703113804BB82D39DC0E3
>> > gpg: requesting key D39DC0E3 from hkp server keys.gnupg.net
>> > ?: keys.gnupg.net: Cannot assign requested address
>> >
>> > 2 times:
>> > - ./content/subdir/contribute/testing/index.html
>> >*  External link https://builds.apache.org/view/Beam/ failed: 404
>> No error
>> >
>> > The second failure seems legit - https://builds.apache.org/view/Beam/
>> is
>> > actually 404 right now (I'll send a separate email about this)
>> >
>> > The gnupg failure is not legit - I'm able to run the same command myself
>> > with no issues.
>> >
>> > 3) Suppose because of this, I'm not able to merge my PR with "@asfgit
>> > merge" command - I suppose it requires a successful test run. Would be
>> nice
>> > if it posted a comment saying why it refuses to merge.
>> >
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>


-- 
---
Jason Kuster
Apache Beam / Google Cloud Dataflow


Re: [PROPOSAL] "Requires deterministic input"

2017-08-10 Thread Reuven Lax
On Thu, Aug 10, 2017 at 11:18 AM, Thomas Groh 
wrote:

> I think it must imply fixed content Iterables - making a decision based
> on the contents of an iterable assuming the Iterable is deterministic seems
> an acceptable use of the API, and that requires the contents to be
> identical through failures. This does imply that (assuming this is reading
> directly from the output of a GroupByKey) the elements will be
> materialized, grouped, materialized again, and then read back to ensure
> that elements are not added on to the end.
>

I think this is doable, but a bit more involved to implement. We could
expose a list-state primitive for the runner to use (as opposed to the bag
state that is used today). Then before we deliver the Iterable, we would
add and checkpoint a boundary marker to the list. This will allow us to
ensure that redeliveries always use the same Iterable, and would not
require us to materialize the entire Iterable. This sounds harder to
implement than the other uses though.
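
To make the boundary-marker idea concrete, here is a toy, purely in-memory
illustration (nothing below is Beam or runner API; all names are invented).
The point is only that a redelivery reads exactly the prefix that existed at
the last checkpointed marker, even if late data has been appended since:

  import java.util.ArrayList;
  import java.util.List;

  class StableIterableSketch {
    private final List<String> listState = new ArrayList<>();            // stand-in for runner list state
    private final List<Integer> checkpointedMarkers = new ArrayList<>(); // stand-in for durable markers

    void addElement(String value) {        // data (including late data) keeps arriving
      listState.add(value);
    }

    // First delivery: append and "checkpoint" a boundary marker, then read up to it.
    List<String> deliver() {
      checkpointedMarkers.add(listState.size());
      return redeliver();
    }

    // Redelivery after a failure: read exactly the same prefix, ignoring anything
    // appended after the marker, so the Iterable seen by the DoFn never changes.
    List<String> redeliver() {
      int boundary = checkpointedMarkers.get(checkpointedMarkers.size() - 1);
      return new ArrayList<>(listState.subList(0, boundary));
    }
  }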


> I agree that there is no ordering guarantee for the sequence in which
> elements are processed.
>
> On Thu, Aug 10, 2017 at 11:03 AM, Reuven Lax 
> wrote:
>
> > It means that single element replay is stable.
> >
> > On Thu, Aug 10, 2017 at 10:56 AM, Raghu Angadi
>  > >
> > wrote:
> >
> > > Can we define what exactly is meant by deterministic/stable/replayable
> > > etc?
> > >
> > >- Does it imply a fixed order? If yes, it implies fixed order of
> > >processElement() invocations, right? Are there any qualifiers
> (within
> > a
> > >window+key etc)?
> > >
> >
> > No, no ordering guarantee.
> >
> >
> > >- Does it also imply fixed length and content for value iterators?
> > >
> >
> > Good point. With our current runner api, it does not. the KV<K, Iterable<V>>
> > has no good way of being deterministic if there is late data.
> > We could do so by forcing the Iterable to be materialized into a single
> > element, but that would also mean that the entire Iterable must fit in
> > memory (which at least the Dataflow runner does not require).
> >
> >
> > >- Some examples to clarify nuances would be very useful.
> > >
> > > State durability semantics for timers that Reuven proposed seem to be
> > > unrelated to stable input (at model level). It might be simpler to add
> > > these semantics first. A lot of deterministic side-effects issues can
> be
> > > handled by durable state in timers. One thing I like about timers
> > approach
> > > is that it makes the cost more transparent to the user since the state
> is
> > > explicitly stored.
> > >
> > >
> > > On Thu, Aug 10, 2017 at 10:02 AM, Ben Chambers
> > >  > > > wrote:
> > >
> > > > I think it only makes sense in places where a user might reasonably
> > > require
> > > > stable input to ensure idempotency of side-effects. It also only
> makes
> > > > sense in places where a runner could reasonably provide such a
> > guarantee.
> > > >
> > > > A given Combine is unlikely to have side effects so it is less likely
> > to
> > > > benefit from stability of the input. Further, the reason writing a
> > > Combine
> > > > is desirable is because its execution can be split up and moved to
> the
> > > > mapper-side (before the GroupByKey). But this division is inherently
> > > > non-deterministic, and so it seems unlikely to benefit from
> stability.
> > > And
> > > > many cases where I could see wanting side-effects would end up in
> > > > extractOutput, for which there is an easy (arguably better) solution
> --
> > > > have extractOutput return the accumulators and do the side-effects
> in a
> > > > DoFn afterwards.
> > > >
> > > > For composites, it is a bit trickier. I could see a case for
> supporting
> > > it
> > > > on composites, but it would need to make it very clear that it only
> > > > affected the input to the composite. If any of the operations within
> > the
> > > > composite were non-deterministic, then the outputs of that could be
> > > > unstable, leading to instability in later parts of the composite.
> > > Further,
> > > > it doesn't seem to offer much. The composite itself doesn't perform
> > > > side-effects, so there is no benefit to having the annotation there
> --
> > > > instead, we allow the annotation to be put where it is relevant and
> > > > important -- on the DoFn's that actually have side-effects that
> require
> > > > stability.
> > > >
> > > > On Thu, Aug 10, 2017 at 9:23 AM Reuven Lax  >
> > > > wrote:
> > > >
> > > > > I don't think it really makes sense to to do this on Combine. And I
> > > agree
> > > > > with you, it doesn't make sense on composites either.
> > > > >
> > > > > On Thu, Aug 10, 2017 at 9:19 AM, Scott Wegner
> > >  > > > >
> > > > > wrote:
> > > > >
> > > > > > Does requires-stable-input only apply to ParDo transforms?
> > > > > >
> > > > > > I 

Re: [PROPOSAL] "Requires deterministic input"

2017-08-10 Thread Thomas Groh
I think it must imply fixed content Iterables - making a decision based
on the contents of an iterable assuming the Iterable is deterministic seems
an acceptable use of the API, and that requires the contents to be
identical through failures. This does imply that (assuming this is reading
directly from the output of a GroupByKey) the elements will be
materialized, grouped, materialized again, and then read back to ensure
that elements are not added on to the end.

I agree that there is no ordering guarantee for the sequence in which
elements are processed.

On Thu, Aug 10, 2017 at 11:03 AM, Reuven Lax 
wrote:

> It means that single element replay is stable.
>
> On Thu, Aug 10, 2017 at 10:56 AM, Raghu Angadi  >
> wrote:
>
> > Can we define what exactly is meant by deterministic/stable/replayable
> > etc?
> >
> >- Does it imply a fixed order? If yes, it implies fixed order of
> >processElement() invocations, right? Are there any qualifiers (within
> a
> >window+key etc)?
> >
>
> No, no ordering guarantee.
>
>
> >- Does it also imply fixed length and content for value iterators?
> >
>
> Good point. With our current runner api, it does not. the KV<K, Iterable<V>> has no good way of being deterministic if there is late data.
> We could do so by forcing the Iterable to be materialized into a single
> element, but that would also mean that the entire Iterable must fit in
> memory (which at least the Dataflow runner does not require).
>
>
> >- Some examples to clarify nuances would be very useful.
> >
> > State durability semantics for timers that Reuven proposed seem to be
> > unrelated to stable input (at model level). It might be simpler to add
> > these semantics first. A lot of deterministic side-effects issues can be
> > handled by durable state in timers. One thing I like about timers
> approach
> > is that it makes the cost more transparent to the user since the state is
> > explicitly stored.
> >
> >
> > On Thu, Aug 10, 2017 at 10:02 AM, Ben Chambers
> >  > > wrote:
> >
> > > I think it only makes sense in places where a user might reasonably
> > require
> > > stable input to ensure idempotency of side-effects. It also only makes
> > > sense in places where a runner could reasonably provide such a
> guarantee.
> > >
> > > A given Combine is unlikely to have side effects so it is less likely
> to
> > > benefit from stability of the input. Further, the reason writing a
> > Combine
> > > is desirable is because its execution can be split up and moved to the
> > > mapper-side (before the GroupByKey). But this division is inherently
> > > non-deterministic, and so it seems unlikely to benefit from stability.
> > And
> > > many cases where I could see wanting side-effects would end up in
> > > extractOutput, for which there is an easy (arguably better) solution --
> > > have extractOutput return the accumulators and do the side-effects in a
> > > DoFn afterwards.
> > >
> > > For composites, it is a bit trickier. I could see a case for supporting
> > it
> > > on composites, but it would need to make it very clear that it only
> > > affected the input to the composite. If any of the operations within
> the
> > > composite were non-deterministic, then the outputs of that could be
> > > unstable, leading to instability in later parts of the composite.
> > Further,
> > > it doesn't seem to offer much. The composite itself doesn't perform
> > > side-effects, so there is no benefit to having the annotation there --
> > > instead, we allow the annotation to be put where it is relevant and
> > > important -- on the DoFn's that actually have side-effects that require
> > > stability.
> > >
> > > On Thu, Aug 10, 2017 at 9:23 AM Reuven Lax 
> > > wrote:
> > >
> > > > I don't think it really makes sense to to do this on Combine. And I
> > agree
> > > > with you, it doesn't make sense on composites either.
> > > >
> > > > On Thu, Aug 10, 2017 at 9:19 AM, Scott Wegner
> >  > > >
> > > > wrote:
> > > >
> > > > > Does requires-stable-input only apply to ParDo transforms?
> > > > >
> > > > > I don't think it would make sense to annotate to composite, because
> > > > > checkpointing should happen as close to the side-effecting
> operation
> > as
> > > > > possible, since upstream transforms within a composite could
> > introduce
> > > > > non-determinism. So it's the primitive transform that should own
> the
> > > > > requirement.
> > > > >
> > > > > Are there other primitives that should be annotated? 'Combine' is
> > > > > interesting because it optimized in Dataflow (and perhaps other
> > > runners)
> > > > to
> > > > > partially apply before a GroupByKey.
> > > > >
> > > > > On Thu, Aug 10, 2017 at 9:01 AM Tyler Akidau
> > >  > > > >
> > > > > wrote:
> > > > >
> > > > > > +1 to the annotation idea, and to having it on processTimer.
> > > 

Re: [PROPOSAL] "Requires deterministic input"

2017-08-10 Thread Reuven Lax
It means that single element replay is stable.

On Thu, Aug 10, 2017 at 10:56 AM, Raghu Angadi 
wrote:

> Can we define what exactly is meant by deterministic/stable/replayable
> etc?
>
>- Does it imply a fixed order? If yes, it implies fixed order of
>processElement() invocations, right? Are there any qualifiers (within a
>window+key etc)?
>

No, no ordering guarantee.


>- Does it also imply fixed length and content for value iterators?
>

Good point. With our current runner api, it does not. the KV<K, Iterable<V>> has no good way of being deterministic if there is late data.
We could do so by forcing the Iterable to be materialized into a single
element, but that would also mean that the entire Iterable must fit in
memory (which at least the Dataflow runner does not require).


>- Some examples to clarify nuances would be very useful.
>
> State durability semantics for timers that Reuven proposed seem to be
> unrelated to stable input (at model level). It might be simpler to add
> these semantics first. A lot of deterministic side-effects issues can be
> handled by durable state in timers. One thing I like about timers approach
> is that it makes the cost more transparent to the user since the state is
> explicitly stored.
>
>
> On Thu, Aug 10, 2017 at 10:02 AM, Ben Chambers
>  > wrote:
>
> > I think it only makes sense in places where a user might reasonably
> require
> > stable input to ensure idempotency of side-effects. It also only makes
> > sense in places where a runner could reasonably provide such a guarantee.
> >
> > A given Combine is unlikely to have side effects so it is less likely to
> > benefit from stability of the input. Further, the reason writing a
> Combine
> > is desirable is because its execution can be split up and moved to the
> > mapper-side (before the GroupByKey). But this division is inherently
> > non-deterministic, and so it seems unlikely to benefit from stability.
> And
> > many cases where I could see wanting side-effects would end up in
> > extractOutput, for which there is an easy (arguably better) solution --
> > have extractOutput return the accumulators and do the side-effects in a
> > DoFn afterwards.
> >
> > For composites, it is a bit trickier. I could see a case for supporting
> it
> > on composites, but it would need to make it very clear that it only
> > affected the input to the composite. If any of the operations within the
> > composite were non-deterministic, then the outputs of that could be
> > unstable, leading to instability in later parts of the composite.
> Further,
> > it doesn't seem to offer much. The composite itself doesn't perform
> > side-effects, so there is no benefit to having the annotation there --
> > instead, we allow the annotation to be put where it is relevant and
> > important -- on the DoFn's that actually have side-effects that require
> > stability.
> >
> > On Thu, Aug 10, 2017 at 9:23 AM Reuven Lax 
> > wrote:
> >
> > > I don't think it really makes sense to to do this on Combine. And I
> agree
> > > with you, it doesn't make sense on composites either.
> > >
> > > On Thu, Aug 10, 2017 at 9:19 AM, Scott Wegner
>  > >
> > > wrote:
> > >
> > > > Does requires-stable-input only apply to ParDo transforms?
> > > >
> > > > I don't think it would make sense to annotate to composite, because
> > > > checkpointing should happen as close to the side-effecting operation
> as
> > > > possible, since upstream transforms within a composite could
> introduce
> > > > non-determinism. So it's the primitive transform that should own the
> > > > requirement.
> > > >
> > > > Are there other primitives that should be annotated? 'Combine' is
> > > > interesting because it optimized in Dataflow (and perhaps other
> > runners)
> > > to
> > > > partially apply before a GroupByKey.
> > > >
> > > > On Thu, Aug 10, 2017 at 9:01 AM Tyler Akidau
> >  > > >
> > > > wrote:
> > > >
> > > > > +1 to the annotation idea, and to having it on processTimer.
> > > > >
> > > > > -Tyler
> > > > >
> > > > > On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek <
> > aljos...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > +1 to the annotation approach. I outlined how implementing this
> > would
> > > > > work
> > > > > > in the Flink runner in the Thread about the exactly-once Kafka
> > Sink.
> > > > > >
> > > > > > > On 9. Aug 2017, at 23:03, Reuven Lax  >
> > > > wrote:
> > > > > > >
> > > > > > > Yes - I don't think we should try and make any deterministic
> > > > guarantees
> > > > > > > about what is in a bundle. Stability guarantees are per element
> > > only.
> > > > > > >
> > > > > > > On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh
> > >  > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> +1 to the annotation-on-ProcessElement approach.
> 

Re: Exactly-once Kafka sink

2017-08-10 Thread Raghu Angadi
On Thu, Aug 10, 2017 at 5:15 AM, Aljoscha Krettek 
wrote:

> Ah, also regarding your earlier mail: I didn't know if many people were
> using Kafka with Dataflow, thanks for that clarification! :-)
>
> Also, I don't think that the TwoPhaseCommit Sink of Flink would work in a
> Beam context, I was just posting that for reference.
>

Yep. It was pretty useful to understand Flink checkpoint interactions with
operators. Will look into more in future when we want to add exactly-once
support for KafkaIO with on Flink.


> Best,
> Aljoscha
> > On 10. Aug 2017, at 11:13, Aljoscha Krettek  wrote:
> >
> > @Raghu: Yes, exactly, that's what I thought about this morning,
> actually. These are the methods of an operator that are relevant to
> checkpointing:
> >
> > class FlinkOperator() {
> >  open();
> >  snapshotState();
> >  notifySnapshotComplete();
> >  initializeState();
> > }
> >
> > Input would be buffered in state, would be checkpointed in
> snapshotState() and processed when we receive a notification of a complete
> checkpoint (which is sent out once all operators have signaled that
> checkpointing is complete). In case of failure, we would be re-initialized
> with the buffered elements in initializeState() and could re-process them
> in open().
> >
> > This is somewhat expensive and leads to higher latency so we should only
> do it if the DoFn signals that it needs deterministic input.
> >
> > +Jingsong Who is working on something similar for the output produced in
> finishBundle().
> >
> >> On 9. Aug 2017, at 19:41, Raghu Angadi 
> wrote:
> >>
> >> Yep, an option to ensure replays see identical input would be pretty
> useful.
> >> It might be challenging on horizontally checkpointing runners like Flink
> >> (only way I see to buffer all the input in state and replay it after
> >> checkpoint).
> >>
> >> On Wed, Aug 9, 2017 at 10:21 AM, Reuven Lax 
> >> wrote:
> >>
> >>> Please see Kenn's proposal. This is a generic thing that is lacking in
> the
> >>> Beam model, and only works today for specific runners. We should fix
> this
> >>> at the Beam level, but I don't think that should block your PR.
> >>>
> >>>
> >>> On Wed, Aug 9, 2017 at 10:10 AM, Raghu Angadi
> 
> >>> wrote:
> >>>
>  There are quite a few customers using KafkaIO with Dataflow. All of
> them
>  are potential users of exactly-once sink. Dataflow Pubsub sink does
> not
>  support EOS yet. Even among those customers, I do expect fraction of
>  applications requiring EOS would be pretty small, that's why I don't
> >>> think
>  extra shuffles are too expensive in overall cost yet.
> 
>  It is also not clear how Flink's 2-phase commit sink function could be
> >>> used
>  in Beam's context. Beam could add some checkpoint semantics to
> state-API
> >>> so
>  that all the runners could support in platform specific way.
> 
>  Took a look at Flink PR, commented on a few issues I see in comments
> >>> there
>  : https://github.com/apache/flink/pull/4239. May be an extra shuffle
> or
>  storing all them messages in state can get over those.
> 
>  On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek  >
>  wrote:
> 
> > Yes, I think making this explicit would be good. Having a
> >>> transformation
> > that makes assumptions about how the runner implements certain things
> >>> is
> > not optimal. Also, I think that most people probably don't use Kafka
> >>> with
> > the Dataflow Runner (because GCE has Pubsub, but I'm guest guessing
>  here).
> > This would mean that the intersection of "people who would benefit
> from
>  an
> > exactly-once Kafka sink" and "people who use Beam on Dataflow" is
> >>> rather
> > small, and therefore not many people would benefit from such a
> >>> Transform.
> >
> > This is all just conjecture, of course.
> >
> > Best,
> > Aljoscha
> >
> >> On 8. Aug 2017, at 23:34, Reuven Lax 
> >>> wrote:
> >>
> >> I think the issue we're hitting is how to write this in Beam.
> >>
> >> Dataflow historically guaranteed checkpointing at every GBK (which
> >>> due
>  to
> >> the design of Dataflow's streaming shuffle was reasonably
> efficient).
>  In
> >> Beam we never formalized these semantics, leaving these syncs in a
> >>> gray
> >> area. I believe the Spark runner currently checkpoints the RDD on
> >>> every
> >> GBK, so these unwritten semantics currently work for Dataflow and
> for
> > Spark.
> >>
> >> We need someway to express this operation in Beam, whether it be via
> >>> an
> >> explicit Checkpoint() operation or via marking DoFns as having side
> >> effects, and having the runner automatically insert such a
> Checkpoint
>  in
> >> front of them. In Flink, this operation can 

Re: [PROPOSAL] "Requires deterministic input"

2017-08-10 Thread Scott Wegner
Does requires-stable-input only apply to ParDo transforms?

I don't think it would make sense to annotate a composite, because
checkpointing should happen as close to the side-effecting operation as
possible, since upstream transforms within a composite could introduce
non-determinism. So it's the primitive transform that should own the
requirement.

Are there other primitives that should be annotated? 'Combine' is
interesting because it is optimized in Dataflow (and perhaps other runners) to
partially apply before a GroupByKey.
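
For concreteness, the annotation under discussion would sit on the primitive
DoFn that performs the side effect, roughly as in the sketch below. The
annotation name is still being debated in this thread, and the sink interface
here is invented purely for illustration:

  import java.io.Serializable;
  import org.apache.beam.sdk.transforms.DoFn;

  interface RecordSink extends Serializable {
    void write(String record);
  }

  class SideEffectingFn extends DoFn<String, Void> {
    private final RecordSink sink;

    SideEffectingFn(RecordSink sink) {
      this.sink = sink;
    }

    @RequiresStableInput   // proposed annotation (name not final; "RequiresDeterministicInput" also suggested)
    @ProcessElement
    public void processElement(ProcessContext c) {
      sink.write(c.element());   // external side effect; retries must not observe a different element
    }
  }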

On Thu, Aug 10, 2017 at 9:01 AM Tyler Akidau 
wrote:

> +1 to the annotation idea, and to having it on processTimer.
>
> -Tyler
>
> On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek 
> wrote:
>
> > +1 to the annotation approach. I outlined how implementing this would
> work
> > in the Flink runner in the Thread about the exactly-once Kafka Sink.
> >
> > > On 9. Aug 2017, at 23:03, Reuven Lax  wrote:
> > >
> > > Yes - I don't think we should try and make any deterministic guarantees
> > > about what is in a bundle. Stability guarantees are per element only.
> > >
> > > On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh 
> > > wrote:
> > >
> > >> +1 to the annotation-on-ProcessElement approach. ProcessElement is the
> > >> minimum implementation requirement of a DoFn, and should be where the
> > >> processing logic which depends on characteristics of the inputs lie.
> > It's a
> > >> good way of signalling the requirements of the Fn, and letting the
> > runner
> > >> decide.
> > >>
> > >> I have a minor concern that this may not work as expected for users
> that
> > >> try to batch remote calls in `FinishBundle` - we should make sure we
> > >> document that it is explicitly the input elements that will be
> replayed,
> > >> and bundles and other operational details are still arbitrary.
> > >>
> > >>
> > >>
> > >> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax  >
> > >> wrote:
> > >>
> > >>> I think deterministic here means deterministically replayable. i.e.
> no
> > >>> matter how many times the element is retried, it will always be the
> > same.
> > >>>
> > >>> I think we should also allow specifying this on processTimer. This
> > would
> > >>> mean that any keyed state written in a previous processElement must
> be
> > >>> guaranteed durable before processTimer is called.
> > >>>
> > >>>
> > >>> On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers 
> > >>> wrote:
> > >>>
> >  I strongly agree with this proposal. I think moving away from "just
> > >>> insert
> >  a GroupByKey for one of the 3 different reasons you may want it"
> > >> towards
> >  APIs that allow code to express the requirements they have and the
> > >> runner
> >  to choose the best way to meet this is a major step forwards in
> terms
> > >> of
> >  portability.
> > 
> >  I think "deterministic" may be misleading. The actual contents of
> the
> >  collection aren't deterministic if upstream computations aren't. The
> >  property we really need is that once an input may have been observed
> > by
> > >>> the
> >  side-effecting code it will never be observed with a different
> value.
> > 
> >  I would propose something RequiresStableInput, to indicate that the
> > >> input
> >  must be stable as observed by the function. I could also see
> something
> >  hinting at the fact we don't recompute the input, such as
> >  RequiresMemoizedInput or RequiresNoRecomputation.
> > 
> >  -- Ben
> > 
> >  P.S For anyone interested other uses of GroupByKey that we may want
> to
> >  discuss APIs for would be preventing retry across steps (eg.,
> > >> preventing
> >  fusion) and redistributing inputs across workers.
> > 
> >  On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles
>  > >>>
> >  wrote:
> > 
> > > This came up again, so I wanted to push it along by proposing a
> > >>> specific
> > > API for Java that could have a derived API in Python. I am writing
> > >> this
> > > quickly to get something out there, so I welcome suggestions for
> >  revision.
> > >
> > > Today a DoFn has a @ProcessElement annotated method with various
> >  automated
> > > parameters, but most fundamentally this:
> > >
> > > @ProcessElement
> > > public void process(ProcessContext ctx) {
> > >  ctx.element() // to access the current input element
> > >  ctx.output(something) // to write to default output collection
> > >  ctx.output(tag, something) // to write to other output collections
> > > }
> > >
> > > For some time, we have hoped to unpack the context - it is a
> > > backwards-compatibility pattern made obsolete by the new DoFn
> design.
> > >>> So
> > > instead it would look like this:
> > >
> > > @ProcessElement
> > > 

Re: [PROPOSAL] "Requires deterministic input"

2017-08-10 Thread Tyler Akidau
+1 to the annotation idea, and to having it on processTimer.

-Tyler

On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek 
wrote:

> +1 to the annotation approach. I outlined how implementing this would work
> in the Flink runner in the Thread about the exactly-once Kafka Sink.
>
> > On 9. Aug 2017, at 23:03, Reuven Lax  wrote:
> >
> > Yes - I don't think we should try and make any deterministic guarantees
> > about what is in a bundle. Stability guarantees are per element only.
> >
> > On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh 
> > wrote:
> >
> >> +1 to the annotation-on-ProcessElement approach. ProcessElement is the
> >> minimum implementation requirement of a DoFn, and should be where the
> >> processing logic which depends on characteristics of the inputs lie.
> It's a
> >> good way of signalling the requirements of the Fn, and letting the
> runner
> >> decide.
> >>
> >> I have a minor concern that this may not work as expected for users that
> >> try to batch remote calls in `FinishBundle` - we should make sure we
> >> document that it is explicitly the input elements that will be replayed,
> >> and bundles and other operational details are still arbitrary.
> >>
> >>
> >>
> >> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax 
> >> wrote:
> >>
> >>> I think deterministic here means deterministically replayable. i.e. no
> >>> matter how many times the element is retried, it will always be the
> same.
> >>>
> >>> I think we should also allow specifying this on processTimer. This
> would
> >>> mean that any keyed state written in a previous processElement must be
> >>> guaranteed durable before processTimer is called.
> >>>
> >>>
> >>> On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers 
> >>> wrote:
> >>>
>  I strongly agree with this proposal. I think moving away from "just
> >>> insert
>  a GroupByKey for one of the 3 different reasons you may want it"
> >> towards
>  APIs that allow code to express the requirements they have and the
> >> runner
>  to choose the best way to meet this is a major step forwards in terms
> >> of
>  portability.
> 
>  I think "deterministic" may be misleading. The actual contents of the
>  collection aren't deterministic if upstream computations aren't. The
>  property we really need is that once an input may have been observed
> by
> >>> the
>  side-effecting code it will never be observed with a different value.
> 
>  I would propose something RequiresStableInput, to indicate that the
> >> input
>  must be stable as observed by the function. I could also see something
>  hinting at the fact we don't recompute the input, such as
>  RequiresMemoizedInput or RequiresNoRecomputation.
> 
>  -- Ben
> 
>  P.S For anyone interested other uses of GroupByKey that we may want to
>  discuss APIs for would be preventing retry across steps (eg.,
> >> preventing
>  fusion) and redistributing inputs across workers.
> 
>  On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles  >>>
>  wrote:
> 
> > This came up again, so I wanted to push it along by proposing a
> >>> specific
> > API for Java that could have a derived API in Python. I am writing
> >> this
> > quickly to get something out there, so I welcome suggestions for
>  revision.
> >
> > Today a DoFn has a @ProcessElement annotated method with various
>  automated
> > parameters, but most fundamentally this:
> >
> > @ProcessElement
> > public void process(ProcessContext ctx) {
> >  ctx.element() // to access the current input element
> >  ctx.output(something) // to write to default output collection
> >  ctx.output(tag, something) // to write to other output collections
> > }
> >
> > For some time, we have hoped to unpack the context - it is a
> > backwards-compatibility pattern made obsolete by the new DoFn design.
> >>> So
> > instead it would look like this:
> >
> > @ProcessElement
> > public void process(Element element, MainOutput mainOutput, ...) {
> >  element.get() // to access the current input element
> >  mainOutput.output(something) // to write to the default output
>  collection
> >  other.output(something) // to write to other output collection
> > }
> >
> > I've deliberately left out the undecided syntax for side outputs. But
> >>> it
> > would be nice for the tag to be built in to the parameter so it
> >> doesn't
> > have to be used when calling output().
> >
> > One way to enhance this to deterministic input would just be this:
> >
> > @ProcessElement
> > @RequiresDeterministicInput
> > public void process(Element element, MainOutput mainOutput, ...) {
> >  element.get() // to access the current input element
> >  mainOutput.output(something) // 

Re: streaming output in just one files

2017-08-10 Thread Reuven Lax
On Thu, Aug 10, 2017 at 8:29 AM, Reuven Lax  wrote:

> This is how the file sink has always worked in Beam. If no sharding is
> specified, then this means runner-determined sharding, and by default that
> is one file per bundle. If Flink has small bundles, then I suggest using
> the withNumShards method to explicitly pick the number of output shards.
>
> The Flink runner can detect that runner-determined sharding has been
> chosen, and override it with a specific number of shards. For example, the
> Dataflow streaming runner (which as you mentioned also has small bundles)
> detects this case and sets the number of output file shards based on the
> number of workers in the worker pool. Here is the code that does this; it
> should be quite simple to do something similar
> for Flink, and then there will be no need for users to explicitly call
> withNumShards themselves.
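
Until a runner does that automatically, the user-side workaround looks like
this minimal sketch (assuming lines is a PCollection<String>; the output path
and shard count are arbitrary, and the chained methods are existing TextIO
API):

  lines.apply(TextIO.write()
      .to("/path/to/output")
      .withSuffix(".csv")
      .withWindowedWrites()   // typically required for streaming writes
      .withNumShards(10));    // pin the shard count instead of one file per bundle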
>
> On Thu, Aug 10, 2017 at 3:09 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> I think Beam File Writing was recently changed to write one file per
>> bundle. Currently, with the Flink Streaming Runner every element is
>> considered to be one bundle, i.e. all bundles have size 1. This means that
>> we write one file per output element.
>>
>> @Reuven, could you please confirm? Also, won't this be a problem for
>> other Runners that can have very small bundle sizes? IIRC the Dataflow
>> Runner also has rather small bundles in streaming mode, I'm not sure,
>> though.
>>
>> Best,
>> Aljoscha
>>
>> On 9. Aug 2017, at 19:14, Claire Yuan  wrote:
>>
>> Hi Aljoscha,
>>   I used the same sink as in the TfIdf example, and set the streaming
>> option to true; then I got one record per file. Here is the function that
>> writes my output. I call it from my pipeline's main method.
>>   // "SomeKey" below is a placeholder for the original key type (it has getName() and getNum()).
>>   public static class WriteOut extends PTransform<PCollection<KV<SomeKey, String>>, PDone> {
>>     private String output;
>>     public WriteOut(String output) {
>>       this.output = output;
>>     }
>>     @Override
>>     public PDone expand(PCollection<KV<SomeKey, String>> someInfos) {
>>       return someInfos
>>           .apply("Format", ParDo.of(new DoFn<KV<SomeKey, String>, String>() {
>>             @ProcessElement
>>             public void processElement(ProcessContext c) {
>>               c.output(String.format("%s %s:\t%s",
>>                   c.element().getKey().getName(),
>>                   c.element().getKey().getNum(),
>>                   c.element().getValue()));
>>             }
>>           }))
>>           .apply(TextIO.write()
>>               .to(output) /* name of the output files */
>>               .withSuffix(".csv"));
>>     }
>>   }
>>
>> Claire
>>
>>
>> On Wednesday, August 9, 2017 2:21 AM, Aljoscha Krettek <
>> aljos...@apache.org> wrote:
>>
>>
>> Hi,
>>
>> I think Flink should not create one output file per record. Could you
>> maybe post a snipped or minimal code example that shows how you're setting
>> up your sinks?
>>
>> Best,
>> Aljoscha
>>
>> On 8. Aug 2017, at 19:08, Claire Yuan  wrote:
>>
>> Hi all,
>>   I am currently running some jobs coded in Beam in streaming mode on
>> Yarn session by Flink. My data sink was CSV files like the one in examples
>> of TfIdf. And I noticed that the output format for Beam is to produce one
>> file for every record, and also temp files for them. That would result in
>> my space used exceed maximum.
>>   I am not sure whether is the problem that I used the API incorrectly
>> but I am wondering if there any way I can put all those records into one
>> file, or keep updating in that file, or delete those tempt files by
>> windowing or triggering?
>>
>> Claire
>>
>>
>>
>>
>>
>>
>


Re: Exactly-once Kafka sink

2017-08-10 Thread Aljoscha Krettek
Ah, also regarding your earlier mail: I didn't know if many people were using 
Kafka with Dataflow, thanks for that clarification! :-)

Also, I don't think that the TwoPhaseCommit Sink of Flink would work in a Beam 
context, I was just posting that for reference.

Best,
Aljoscha
> On 10. Aug 2017, at 11:13, Aljoscha Krettek  wrote:
> 
> @Raghu: Yes, exactly, that's what I thought about this morning, actually. 
> These are the methods of an operator that are relevant to checkpointing:
> 
> class FlinkOperator() {
>  open();
>  snapshotState();
>  notifySnapshotComplete();
>  initializeState();
> }
> 
> Input would be buffered in state, would be checkpointed in snapshotState() 
> and processed when we receive a notification of a complete checkpoint (which 
> is sent out once all operators have signaled that checkpointing is complete). 
> In case of failure, we would be re-initialized with the buffered elements in 
> initializeState() and could re-process them in open().
> 
> This is somewhat expensive and leads to higher latency so we should only do 
> it if the DoFn signals that it needs deterministic input.
> 
> +Jingsong Who is working on something similar for the output produced in 
> finishBundle().
> 
>> On 9. Aug 2017, at 19:41, Raghu Angadi  wrote:
>> 
>> Yep, an option to ensure replays see identical input would be pretty useful.
>> It might be challenging on horizontally checkpointing runners like Flink
>> (only way I see to buffer all the input in state and replay it after
>> checkpoint).
>> 
>> On Wed, Aug 9, 2017 at 10:21 AM, Reuven Lax 
>> wrote:
>> 
>>> Please see Kenn's proposal. This is a generic thing that is lacking in the
>>> Beam model, and only works today for specific runners. We should fix this
>>> at the Beam level, but I don't think that should block your PR.
>>> 
>>> 
>>> On Wed, Aug 9, 2017 at 10:10 AM, Raghu Angadi 
>>> wrote:
>>> 
 There are quite a few customers using KafkaIO with Dataflow. All of them
 are potential users of exactly-once sink. Dataflow Pubsub sink does not
 support EOS yet. Even among those customers, I do expect fraction of
 applications requiring EOS would be pretty small, that's why I don't
>>> think
 extra shuffles are too expensive in overall cost yet.
 
 It is also not clear how Flink's 2-phase commit sink function could be
>>> used
 in Beam's context. Beam could add some checkpoint semantics to state-API
>>> so
 that all the runners could support in platform specific way.
 
 Took a look at Flink PR, commented on a few issues I see in comments
>>> there
 : https://github.com/apache/flink/pull/4239. May be an extra shuffle or
 storing all them messages in state can get over those.
 
 On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek 
 wrote:
 
> Yes, I think making this explicit would be good. Having a transformation
> that makes assumptions about how the runner implements certain things is
> not optimal. Also, I think that most people probably don't use Kafka with
> the Dataflow Runner (because GCE has Pubsub, but I'm just guessing here).
> This would mean that the intersection of "people who would benefit from an
> exactly-once Kafka sink" and "people who use Beam on Dataflow" is rather
> small, and therefore not many people would benefit from such a Transform.
> 
> This is all just conjecture, of course.
> 
> Best,
> Aljoscha
> 
>> On 8. Aug 2017, at 23:34, Reuven Lax 
>>> wrote:
>> 
>> I think the issue we're hitting is how to write this in Beam.
>> 
>> Dataflow historically guaranteed checkpointing at every GBK (which, due to
>> the design of Dataflow's streaming shuffle, was reasonably efficient). In
>> Beam we never formalized these semantics, leaving these syncs in a gray
>> area. I believe the Spark runner currently checkpoints the RDD on every
>> GBK, so these unwritten semantics currently work for Dataflow and for Spark.
>> 
>> We need some way to express this operation in Beam, whether it be via an
>> explicit Checkpoint() operation or via marking DoFns as having side
>> effects, and having the runner automatically insert such a Checkpoint in
>> front of them. In Flink, this operation can be implemented using what
>> Aljoscha posted.
>> 
>> Reuven
>> 
>> On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <
>>> aljos...@apache.org>
>> wrote:
>> 
>>> Hi,
>>> 
>>> In Flink, there is a TwoPhaseCommit SinkFunction that can be used
>>> for
> such
>>> cases: [1]. The PR for a Kafka 0.11 exactly once producer builds on
> that:
>>> [2]
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>> [1] 

Re: [PROPOSAL] "Requires deterministic input"

2017-08-10 Thread Aljoscha Krettek
+1 to the annotation approach. I outlined how implementing this would work in 
the Flink runner in the Thread about the exactly-once Kafka Sink.
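
A minimal sketch of what a side-effecting DoFn could look like under the proposed
annotation. The annotation name is still being debated in the replies below, so
@RequiresStableInput is declared here only as a placeholder and is not existing
Beam API; the external write and its helper method are likewise hypothetical:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;

// Placeholder for the proposed annotation; not part of the Beam SDK today.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresStableInput {}

class SideEffectingFn extends DoFn<String, Void> {

  @RequiresStableInput // hypothetical: asks the runner for replay-stable input
  @ProcessElement
  public void process(ProcessContext ctx) {
    // If the element is guaranteed to be identical on every retry, an
    // idempotence key derived from it makes the external write effectively
    // exactly-once.
    writeToExternalSystem(ctx.element());
  }

  private void writeToExternalSystem(String record) {
    // Placeholder for the side-effecting call (e.g. a Kafka producer).
  }
}

The point is only that the requirement travels with the DoFn method, leaving each
runner free to satisfy it with a checkpoint, an extra shuffle, or buffering,
whichever fits its execution model.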

> On 9. Aug 2017, at 23:03, Reuven Lax  wrote:
> 
> Yes - I don't think we should try and make any deterministic guarantees
> about what is in a bundle. Stability guarantees are per element only.
> 
> On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh 
> wrote:
> 
>> +1 to the annotation-on-ProcessElement approach. ProcessElement is the
>> minimum implementation requirement of a DoFn, and should be where the
>> processing logic which depends on characteristics of the inputs lies. It's a
>> good way of signalling the requirements of the Fn, and letting the runner
>> decide.
>> 
>> I have a minor concern that this may not work as expected for users that
>> try to batch remote calls in `FinishBundle` - we should make sure we
>> document that it is explicitly the input elements that will be replayed,
>> and that bundles and other operational details are still arbitrary.
>> 
>> 
>> 
>> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax 
>> wrote:
>> 
>>> I think deterministic here means deterministically replayable, i.e. no
>>> matter how many times the element is retried, it will always be the same.
>>> 
>>> I think we should also allow specifying this on processTimer. This would
>>> mean that any keyed state written in a previous processElement must be
>>> guaranteed durable before processTimer is called.
>>> 
>>> 
>>> On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers 
>>> wrote:
>>> 
 I strongly agree with this proposal. I think moving away from "just insert
 a GroupByKey for one of the 3 different reasons you may want it" towards
 APIs that allow code to express the requirements they have and the runner
 to choose the best way to meet this is a major step forward in terms of
 portability.
 
 I think "deterministic" may be misleading. The actual contents of the
 collection aren't deterministic if upstream computations aren't. The
 property we really need is that once an input may have been observed by the
 side-effecting code it will never be observed with a different value.
 
 I would propose something like RequiresStableInput, to indicate that the input
 must be stable as observed by the function. I could also see something
 hinting at the fact we don't recompute the input, such as
 RequiresMemoizedInput or RequiresNoRecomputation.
 
 -- Ben
 
 P.S. For anyone interested, other uses of GroupByKey that we may want to
 discuss APIs for would be preventing retry across steps (e.g., preventing
 fusion) and redistributing inputs across workers.
 
 On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles >> 
 wrote:
 
> This came up again, so I wanted to push it along by proposing a specific
> API for Java that could have a derived API in Python. I am writing this
> quickly to get something out there, so I welcome suggestions for revision.
> 
> Today a DoFn has a @ProcessElement annotated method with various automated
> parameters, but most fundamentally this:
> 
> @ProcessElement
> public void process(ProcessContext ctx) {
>  ctx.element(); // to access the current input element
>  ctx.output(something); // to write to default output collection
>  ctx.output(tag, something); // to write to other output collections
> }
> 
> For some time, we have hoped to unpack the context - it is a
> backwards-compatibility pattern made obsolete by the new DoFn design. So
> instead it would look like this:
> 
> @ProcessElement
> public void process(Element element, MainOutput mainOutput, ...) {
>  element.get(); // to access the current input element
>  mainOutput.output(something); // to write to the default output collection
>  other.output(something); // to write to other output collection
> }
> 
> I've deliberately left out the undecided syntax for side outputs. But it
> would be nice for the tag to be built in to the parameter so it doesn't
> have to be used when calling output().
> 
> One way to enhance this to deterministic input would just be this:
> 
> @ProcessElement
> @RequiresDeterministicInput
> public void process(Element element, MainOutput mainOutput, ...) {
>  element.get(); // to access the current input element
>  mainOutput.output(something); // to write to the default output collection
>  other.output(something); // to write to other output collection
> }
> 
> There are really a lot of places where we could put an annotation or change
> a type to indicate that the input PCollection should be
> well-defined/deterministically-replayable. I don't have a really strong

Re: Exactly-once Kafka sink

2017-08-10 Thread Aljoscha Krettek
@Raghu: Yes, exactly, that's what I thought about this morning, actually. These 
are the methods of an operator that are relevant to checkpointing:

class FlinkOperator {
  open();
  snapshotState();
  notifySnapshotComplete();
  initializeState();
}

Input would be buffered in state, would be checkpointed in snapshotState() and 
processed when we receive a notification of a complete checkpoint (which is 
sent out once all operators have signaled that checkpointing is complete). In 
case of failure, we would be re-initialized with the buffered elements in 
initializeState() and could re-process them in open().

This is somewhat expensive and leads to higher latency so we should only do it 
if the DoFn signals that it needs deterministic input.

+Jingsong, who is working on something similar for the output produced in 
finishBundle().
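
A schematic, self-contained sketch of the buffer-until-checkpoint-complete pattern
described above. It intentionally does not use the real Flink operator API (state
handles, StreamRecord, timers and the actual restore path are omitted), so all
class and method names here are illustrative only:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Schematic only: mirrors the operator hooks listed above, not the Flink API.
class BufferingSketch<T> {
  private final List<T> buffer = new ArrayList<>();            // since last snapshot
  private final Map<Long, List<T>> pending = new HashMap<>();  // snapshotted, not yet processed
  private final Consumer<T> userFn;                            // the side-effecting user logic

  BufferingSketch(Consumer<T> userFn) {
    this.userFn = userFn;
  }

  // processElement(): only buffer; no side effects yet.
  void processElement(T element) {
    buffer.add(element);
  }

  // snapshotState(): the buffered elements become part of checkpoint N.
  void snapshotState(long checkpointId) {
    pending.put(checkpointId, new ArrayList<>(buffer));
    buffer.clear();
  }

  // notifySnapshotComplete(): only now run the user code, because a failure
  // from here on would replay exactly the elements restored from the checkpoint.
  void notifySnapshotComplete(long checkpointId) {
    List<T> stable = pending.remove(checkpointId);
    if (stable != null) {
      stable.forEach(userFn);
    }
  }

  // initializeState(): on recovery, re-populate the pending buffers from the
  // restored checkpoint so they can be re-processed.
  void initializeState(Map<Long, List<T>> restored) {
    pending.putAll(restored);
  }
}

On recovery, initializeState() is fed the buffers restored from the checkpoint, so
the user code only ever observes elements that are already durable, which is the
stable-input guarantee an exactly-once sink needs.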

> On 9. Aug 2017, at 19:41, Raghu Angadi  wrote:
> 
> Yep, an option to ensure replays see identical input would be pretty useful.
> It might be challenging on horizontally checkpointing runners like Flink
> (the only way I see is to buffer all the input in state and replay it after
> a checkpoint).
> 
> On Wed, Aug 9, 2017 at 10:21 AM, Reuven Lax 
> wrote:
> 
>> Please see Kenn's proposal. This is a generic thing that is lacking in the
>> Beam model, and only works today for specific runners. We should fix this
>> at the Beam level, but I don't think that should block your PR.
>> 
>> 
>> On Wed, Aug 9, 2017 at 10:10 AM, Raghu Angadi 
>> wrote:
>> 
>>> There are quite a few customers using KafkaIO with Dataflow. All of them
>>> are potential users of the exactly-once sink. Dataflow Pubsub sink does not
>>> support EOS yet. Even among those customers, I do expect the fraction of
>>> applications requiring EOS would be pretty small, which is why I don't think
>>> extra shuffles are too expensive in overall cost yet.
>>> 
>>> It is also not clear how Flink's 2-phase commit sink function could be used
>>> in Beam's context. Beam could add some checkpoint semantics to the state API so
>>> that all the runners could support it in a platform-specific way.
>>> 
>>> Took a look at the Flink PR and commented on a few issues I see in the comments
>>> there: https://github.com/apache/flink/pull/4239. Maybe an extra shuffle or
>>> storing all the messages in state can get over those.
>>> 
>>> On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek 
>>> wrote:
>>> 
 Yes, I think making this explicit would be good. Having a transformation
 that makes assumptions about how the runner implements certain things is
 not optimal. Also, I think that most people probably don't use Kafka with
 the Dataflow Runner (because GCE has Pubsub, but I'm just guessing here).
 This would mean that the intersection of "people who would benefit from an
 exactly-once Kafka sink" and "people who use Beam on Dataflow" is rather
 small, and therefore not many people would benefit from such a Transform.
 
 This is all just conjecture, of course.
 
 Best,
 Aljoscha
 
> On 8. Aug 2017, at 23:34, Reuven Lax 
>> wrote:
> 
> I think the issue we're hitting is how to write this in Beam.
> 
> Dataflow historically guaranteed checkpointing at every GBK (which, due to
> the design of Dataflow's streaming shuffle, was reasonably efficient). In
> Beam we never formalized these semantics, leaving these syncs in a gray
> area. I believe the Spark runner currently checkpoints the RDD on every
> GBK, so these unwritten semantics currently work for Dataflow and for Spark.
> 
> We need some way to express this operation in Beam, whether it be via an
> explicit Checkpoint() operation or via marking DoFns as having side
> effects, and having the runner automatically insert such a Checkpoint in
> front of them. In Flink, this operation can be implemented using what
> Aljoscha posted.
> 
> Reuven
> 
> On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <
>> aljos...@apache.org>
> wrote:
> 
>> Hi,
>> 
>> In Flink, there is a TwoPhaseCommit SinkFunction that can be used for such
>> cases: [1]. The PR for a Kafka 0.11 exactly-once producer builds on that:
>> [2]
>> 
>> Best,
>> Aljoscha
>> 
>> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45aa02d4559e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
>> [2] https://github.com/apache/flink/pull/4239
>>> On 3. 

Jenkins build is back to normal : beam_Release_NightlySnapshot #499

2017-08-10 Thread Apache Jenkins Server
See