Re: [ANNOUNCEMENT] New committers, May 2018 edition!

2018-06-01 Thread Thomas Groh
Congrats, you three!

On Thu, May 31, 2018 at 7:09 PM Davor Bonaci  wrote:

> Please join me and the rest of Beam PMC in welcoming the following
> contributors as our newest committers. They have significantly contributed
> to the project in different ways, and we look forward to many more
> contributions in the future.
>
> * Griselda Cuevas
> * Pablo Estrada
> * Jason Kuster
>
> (Apologies for a delayed announcement, and the lack of the usual
> paragraph summarizing individual contributions.)
>
> Congratulations to all three! Welcome!
>


[VOTE] Code Review Process

2018-06-01 Thread Thomas Groh
As we seem to largely have consensus in "Reducing Committer Load for Code
Reviews"[1], this is a vote to change the Beam policy on Code Reviews to
require that

(1) At least one committer is involved with the code review, as either a
reviewer or as the author
(2) A contributor has approved the change

prior to merging any change.

This changes our policy from its current requirement that at least one
committer *who is not the author* has approved the change prior to merging.
We believe that changing this process will improve code review throughput,
reduce committer load, and engage more of the community in the code review
process.

Please vote:
[ ] +1: Accept the above proposal to change the Beam code review/merge
policy
[ ] -1: Leave the Code Review policy unchanged

Thanks,

Thomas

[1]
https://lists.apache.org/thread.html/7c1fde3884fbefacc252b6d4b434f9a9c2cf024f381654aa3e47df18@%3Cdev.beam.apache.org%3E


Reducing Committer Load for Code Reviews

2018-05-30 Thread Thomas Groh
Hey all;

I've been thinking recently about our current process for committing code.
I'd like to propose that we change it to require that at least one
committer is present for each code review, but remove the need to have a
second committer review the code prior to submission if the original
contributor is a committer.

Generally, if we trust someone with the ability to merge code that someone
else has written, I think it's sensible to also trust them to choose a
capable reviewer. We expect that all of the people that we have recognized
as committers will maintain the project's quality bar - and that's true for
both code they author and code they review. Given that, I think it's
sensible to expect a committer will choose a reviewer who is versed in the
component they are contributing to who can provide insight and will also
hold up the quality bar.

Making this change will help spread the review load out among regular
contributors to the project, and reduce bottlenecks caused by committers
who have few other committers working on their same component. Obviously,
this requires that committers act with the best interests of the project
when they send out their code for reviews - but this is the behavior we
demand before someone is recognized as a committer, so I don't see why that
would be cause for concern.

Yours,

Thomas


Re: [VOTE] Go SDK

2018-05-23 Thread Thomas Groh
+1!

I, for one, could not be more excited about our glorious portable future.

On Mon, May 21, 2018 at 6:03 PM Henning Rohde  wrote:

> Hi everyone,
>
> Now that the remaining issues have been resolved as discussed, I'd like to
> propose a formal vote on accepting the Go SDK into master. The main
> practical difference is that the Go SDK would be part of the Apache Beam
> release going forward.
>
> Highlights of the Go SDK:
>  * Go user experience with natively-typed DoFns with (simulated) generic
> types
>  * Covers most of the Beam model: ParDo, GBK, CoGBK, Flatten, Combine,
> Windowing, ..
>  * Includes several IO connectors: Datastore, BigQuery, PubSub,
> extensible textio.
>  * Supports the portability framework for both batch and streaming,
> notably the upcoming portable Flink runner
>  * Supports a direct runner for small batch workloads and testing.
>  * Includes pre-commit tests and post-commit integration tests.
>
> And last but not least
>  *  includes contributions from several independent users and developers,
> notably an IO connector for Datastore!
>
> Website: https://beam.apache.org/documentation/sdks/go/
> Code: https://github.com/apache/beam/tree/master/sdks/go
> Design: https://s.apache.org/beam-go-sdk-design-rfc
>
> Please vote:
> [ ] +1, Approve that the Go SDK becomes an official part of Beam
> [ ] -1, Do not approve (please provide specific comments)
>
> Thanks,
>  The Gophers of Apache Beam
>
>
>


Re: org.apache.beam.sdk.values.TupleTag#genId and stacktraces?

2018-04-10 Thread Thomas Groh
It may be reasonable to port most of those TupleTags to have an explicit,
rather than generated, ID, which will remove the need to inspect the stack
trace.

However, as mentioned, the constructor shouldn't provide an unstable ID, as
otherwise most pipelines won't work on production runners.

On Tue, Apr 10, 2018 at 10:09 AM Romain Manni-Bucau 
wrote:

> Oops cross post sorry.
>
> Issue I hit on this thread is it is used a lot in tests and it slows down
> tests for nothing like with generatesequence ones
>
> Le 10 avr. 2018 19:00, "Romain Manni-Bucau"  a
> écrit :
>
>>
>>
>> Le 10 avr. 2018 18:40, "Robert Bradshaw"  a écrit :
>>
>> These values should be, inasmuch as possible, stable across VMs. How slow
>> is slow? Doesn't this happen only once per VM startup?
>>
>>
>> Once per jvm and idea launches a jvm per test and the daemon does save
>> enough time, you still go through the whole project and check all upstream
>> deps it seems.
>>
>> It is <1s with maven vs 5-6s with gradle.
>>
>>
>> On Tue, Apr 10, 2018 at 9:33 AM Romain Manni-Bucau 
>> wrote:
>>
>>> Hi
>>>
>>> does org.apache.beam.sdk.values.TupleTag#genId need to get the
>>> stacktrace or can we use any id generator (like
>>> UUID.randomUUID().toString())? Using traces is quite slow under load and
>>> environments where the root stack is not just the "next" level so
>>> skipping it would be nice.
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
>>>
>>
>>


Re: org.apache.beam.sdk.values.TupleTag#genId and stacktraces?

2018-04-10 Thread Thomas Groh
In fact, this is explicitly to work with `static final` TupleTags, and
using a non-stable ID isn't feasible.

A static final TupleTag won't be serialized in the closure of an object
that uses it - it will be instantiated independently in any other
ClassLoader, such as on a remote JVM. If you use a constant TupleTag during
pipeline construction, and again during runtime, they must have matching
identifiers, or the system can't correlate the two objects. Use of
something like UUID.randomUUID() would remove our ability to use any
constant values.
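
As a rough illustration of why the ID has to be stable, here is a minimal
sketch in plain Java with no Beam dependency. `Tag` is a hypothetical
stand-in and not Beam's TupleTag, and the "remote JVM" is only simulated by
constructing the tag a second time, the way an independently initialized
constant would be.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in for a tag type; this is not Beam's TupleTag. */
final class Tag {
  private final String id;

  private Tag(String id) {
    this.id = id;
  }

  /** Tag with an explicit, caller-chosen ID: identical wherever it is constructed. */
  static Tag withExplicitId(String id) {
    return new Tag(id);
  }

  /** Tag with a random ID: different on every construction. */
  static Tag withRandomId() {
    return new Tag(UUID.randomUUID().toString());
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof Tag && ((Tag) other).id.equals(id);
  }

  @Override
  public int hashCode() {
    return Objects.hash(id);
  }
}

final class StableTagDemo {
  /** The same constant, initialized once at construction time and once "on a worker". */
  static boolean explicitIdsMatch() {
    Tag atConstruction = Tag.withExplicitId("main-output");
    Tag onWorker = Tag.withExplicitId("main-output");
    return atConstruction.equals(onWorker);
  }

  /** With random IDs the two initializations cannot be correlated. */
  static boolean randomIdsMatch() {
    Tag atConstruction = Tag.withRandomId();
    Tag onWorker = Tag.withRandomId();
    return atConstruction.equals(onWorker);
  }

  public static void main(String[] args) {
    System.out.println("explicit IDs match: " + explicitIdsMatch());
    System.out.println("random IDs match: " + randomIdsMatch());
  }
}
```

The explicit-ID variant is the shape suggested for tests that want to skip
stack inspection: the caller supplies the ID, so nothing needs to be
generated.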

On Tue, Apr 10, 2018 at 9:40 AM Robert Bradshaw  wrote:

> These values should be, inasmuch as possible, stable across VMs. How slow
> is slow? Doesn't this happen only once per VM startup?
>
> On Tue, Apr 10, 2018 at 9:33 AM Romain Manni-Bucau 
> wrote:
>
>> Hi
>>
>> does org.apache.beam.sdk.values.TupleTag#genId need to get the
>> stacktrace or can we use any id generator (like
>> UUID.randomUUID().toString())? Using traces is quite slow under load and
>> environments where the root stack is not just the "next" level so
>> skipping it would be nice.
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
>>
>


Re: NoSuchElementException in reader.getCurrent*.

2018-03-13 Thread Thomas Groh
I'm not sure what you mean, JB. getCurrent* is source-implementor visible
(and must be), and users who go through Read or other IO transforms don't
need to interact with it directly.

With regard to Eugene's point - I still am strongly in favor of telling a
source that they at least *SHOULD* throw an exception if they are called
when no element is available, and it seems reasonable to me to include that
in the documentation for the Reader.


On Tue, Mar 13, 2018 at 4:53 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> I agree. I don't think it's useful to expose getCurrent to the user.
> That's more runner related.
>
> Regards
> JB
> Le 12 mars 2018, à 11:06, Romain Manni-Bucau <rmannibu...@gmail.com> a
> écrit:
>>
>> I agree Thomas but I kind of read it as "yes we can drop that
>> constraint". If not we should also check we are used in a thread safe
>> context etc which will likely never hit the user sdk API so why doing that
>> case a particular case? Am I missing something?
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau <https://twitter.com/rmannibucau> |   Blog
>> <https://rmannibucau.metawerx.net/> | Old Blog
>> <http://rmannibucau.wordpress.com> |  Github
>> <https://github.com/rmannibucau> | LinkedIn
>> <https://www.linkedin.com/in/rmannibucau> | Book
>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>
>> 2018-03-12 17:04 GMT+01:00 Thomas Groh <tg...@google.com>:
>>
>>> If a call to `getCurrentWhatever` happens after `start` or `advance` has
>>> returned false, it's a bug in the runner, but the reader needs to be able
>>> to fail, otherwise you'll get a synthetic element that doesn't really
>>> exist. If a reader throws `NoSuchElementException` after the most recent
>>> call returned true, the reader isn't conforming to spec.
>>>
>>>
>>> On Mon, Mar 12, 2018 at 9:00 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> why reader#getCurrent* can throw NoSuchElementException,
>>>> my understanding is that the runner will guarantee that start or
>>>> advance was called and returned true when calling getCurrent so this is a
>>>> case which shouldn't happen, no?
>>>>
>>>> Romain Manni-Bucau
>>>> @rmannibucau <https://twitter.com/rmannibucau> |   Blog
>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>> <http://rmannibucau.wordpress.com> |  Github
>>>> <https://github.com/rmannibucau> | LinkedIn
>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>
>>>
>>


Re: Flatten input data streams with skewed watermark progress

2018-03-12 Thread Thomas Groh
That one would be, for example, having a PCollection with a highly advanced
watermark and a PCollection with a much earlier watermark, and have an
input that is behind the watermark of the former PCollection go through the
flatten - at which point it moves to being ahead of the watermark.

That's fine, because one of two things happens in practice:
* Either the upstream contains a GroupByKey, in which case the element will
be dropped if the window is expired
* Or, the upstream does not contain a GroupByKey, which means the element
never reached such a grouping while behind the watermark, so its final
window had not expired before that element arrived at the first downstream
GroupByKey.

Specifically we're concerned about GroupByKeys because that's the point at
which we become certain of the window the element is within, and whether
that window is expired; before that point, we can't say with certainty
which final window the element will be assigned to.
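
To make the scenario concrete, here is a small sketch in plain Java. It
assumes watermarks and timestamps are plain millisecond values and that the
flatten's output watermark is the minimum of its inputs, as described in the
quoted reply; the class and method names are hypothetical and are not Beam
APIs.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class FlattenWatermarkDemo {
  /** Output watermark of a flatten, modelled as the minimum of its input watermarks. */
  static long flattenWatermark(List<Long> inputWatermarks) {
    return Collections.min(inputWatermarks);
  }

  /** An element is behind a watermark if its timestamp is earlier than that watermark. */
  static boolean isBehind(long elementTimestamp, long watermark) {
    return elementTimestamp < watermark;
  }

  public static void main(String[] args) {
    long advancedInput = 1_000L; // watermark of the highly advanced PCollection
    long earlierInput = 100L;    // watermark of the PCollection that lags
    long elementTimestamp = 500L;

    long output = flattenWatermark(Arrays.asList(advancedInput, earlierInput));

    // Behind the watermark of the input it arrived on...
    System.out.println("behind its own input: " + isBehind(elementTimestamp, advancedInput));
    // ...but not behind the watermark of the flattened output.
    System.out.println("behind flatten output: " + isBehind(elementTimestamp, output));
  }
}
```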


On Mon, Mar 12, 2018 at 3:47 PM Reuven Lax  wrote:

> Logically a Flatten is just a way to create a multi-input transform
> downstream of the flatten (you can imagine a model in which Flatten was not
> explicit, we just allowed multiple main inputs). This means that yes, the
> watermark is the minimum of all inputs.
>
> I don't see how a late tuple can become early. Can you explain?
>
>
> On Mon, Mar 12, 2018 at 2:07 PM Shen Li  wrote:
>
>> Hi Reuven,
>>
>> What about watermark? Should Flatten emit the min watermark of all input
>> data streams? If that is the case, one late tuple can become early after
>> Flatten, right? Will that cause any problem?
>>
>> Shen
>>
>> On Mon, Mar 12, 2018 at 4:09 PM, Reuven Lax  wrote:
>>
>>> No, I don't think it makes sense for the Flatten operator to cache
>>> element.
>>>
>>>
>>> On Mon, Mar 12, 2018 at 11:55 AM Shen Li  wrote:
>>>
>>>> If multiple inputs of Flatten proceed at different speeds, should the
>>>> Flatten operator cache tuples before emitting output watermarks? This can
>>>> prevent a late tuple from becoming early. But if the watermark gap (i.e.,
>>>> cache size) becomes too large among inputs, can the application tell
>>>> Beam/runner to emit output watermark anyway and consider slow input tuples
>>>> as late?
>>>>
>>>> Thanks,
>>>> Shen

>>>
>>


Re: NoSuchElementException in reader.getCurrent*.

2018-03-12 Thread Thomas Groh
The correct sequencing and respecting return values of `start` and
`advance` is a precondition, and `NoSuchElementException` is the failure
mode if the precondition isn't met. Documenting the behavior in the case of
precondition failures is entirely reasonable. For example, look at Java's
`Iterator` documentation, which has basically the same text:
https://docs.oracle.com/javase/8/docs/api/java/util/Iterator.html
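
A minimal sketch of that contract in plain Java, for illustration only:
`ListReader` is a hypothetical in-memory reader that mirrors the
start/advance/getCurrent shape, and it is not Beam's Reader class. It
returns an element only when the most recent start/advance call returned
true, and throws `NoSuchElementException` otherwise.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical reader over an in-memory list; it only mirrors the start/advance/getCurrent shape. */
final class ListReader<T> {
  private final List<T> elements;
  private int index = -1;             // -1 until start() has been called
  private boolean hasCurrent = false; // result of the most recent start()/advance()

  ListReader(List<T> elements) {
    this.elements = elements;
  }

  boolean start() {
    return advance();
  }

  boolean advance() {
    if (index < elements.size()) {
      index++;
    }
    hasCurrent = index < elements.size();
    return hasCurrent;
  }

  T getCurrent() {
    if (!hasCurrent) {
      // Precondition violated: fail rather than return a synthetic element.
      throw new NoSuchElementException("no current element available");
    }
    return elements.get(index);
  }
}

final class ReaderContractDemo {
  /** A well-behaved caller: only calls getCurrent() after start()/advance() returned true. */
  static List<String> readAll(ListReader<String> reader) {
    List<String> out = new ArrayList<>();
    for (boolean more = reader.start(); more; more = reader.advance()) {
      out.add(reader.getCurrent());
    }
    return out;
  }

  /** A misbehaving caller: ignores the false return value and asks for an element anyway. */
  static boolean failsAfterExhaustion(ListReader<String> reader) {
    readAll(reader);
    try {
      reader.getCurrent();
      return false;
    } catch (NoSuchElementException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(readAll(new ListReader<>(Arrays.asList("a", "b"))));
    System.out.println(failsAfterExhaustion(new ListReader<>(Arrays.asList("a", "b"))));
  }
}
```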


On Mon, Mar 12, 2018 at 9:20 AM Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> I agree Thomas but I kind of read it as "yes we can drop that constraint".
> If not we should also check we are used in a thread safe context etc which
> will likely never hit the user sdk API so why doing that case a particular
> case? Am I missing something?
>
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> 2018-03-12 17:04 GMT+01:00 Thomas Groh <tg...@google.com>:
>
>> If a call to `getCurrentWhatever` happens after `start` or `advance` has
>> returned false, it's a bug in the runner, but the reader needs to be able
>> to fail, otherwise you'll get a synthetic element that doesn't really
>> exist. If a reader throws `NoSuchElementException` after the most recent
>> call returned true, the reader isn't conforming to spec.
>>
>>
>> On Mon, Mar 12, 2018 at 9:00 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>> wrote:
>>
>>> Hi guys,
>>>
>>> why reader#getCurrent* can throw NoSuchElementException,
>>> my understanding is that the runner will guarantee that start or advance
>>> was called and returned true when calling getCurrent so this is a case
>>> which shouldn't happen, no?
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>> <http://rmannibucau.wordpress.com> | Github
>>> <https://github.com/rmannibucau> | LinkedIn
>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>
>>
>


Re: NoSuchElementException in reader.getCurrent*.

2018-03-12 Thread Thomas Groh
If a call to `getCurrentWhatever` happens after `start` or `advance` has
returned false, it's a bug in the runner, but the reader needs to be able
to fail, otherwise you'll get a synthetic element that doesn't really
exist. If a reader throws `NoSuchElementException` after the most recent
call returned true, the reader isn't conforming to spec.


On Mon, Mar 12, 2018 at 9:00 AM Romain Manni-Bucau 
wrote:

> Hi guys,
>
> why reader#getCurrent* can throw NoSuchElementException,
> my understanding is that the runner will guarantee that start or advance
> was called and returned true when calling getCurrent so this is a case
> which shouldn't happen, no?
>
> Romain Manni-Bucau
> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
>


Re: to a modular embedded java runner to replace the direct runner?

2018-03-05 Thread Thomas Groh
The portable java 'DirectRunner' is already in-progress, and has been for
several months - it's tracked by
https://issues.apache.org/jira/browse/BEAM-2899

My expectation is that the actual portability augmentations are unlikely to
require significant changes to the DirectRunner implementations. I'd prefer
to avoid any major refactors while that effort is underway - it's likely to
add a significant amount of overhead, and I don't think that this
refactoring will improve the velocity for the portability changes. The
enforcement checks (immutability, encodability) can be disabled with flags
for the time being.

After the portability runner goes in, I'm not opposed to considering a
refactoring - but I think that splitting "Model Enforcements" into separate
modules might be overkill for things of that scope.


On Mon, Mar 5, 2018 at 10:25 AM Romain Manni-Bucau 
wrote:

> Hi Lukasz,
>
> concretely it is pretty simple - if not let me know, i'll try to gist some
> code but I don't think we need:
>
> (I'll use module names, let's not discuss them, it is just to share the
> idea) I see it as follows:
>
> 1. beam-java-runner - bare API impl (extracted from direct runner, this is
> not a new impl. Advantage is to make the new portable java runner and
> direct runner converging)
> 2. beam-java-runner-immutability-extension: adds the option
> EnforceImmutability
> 3. beam-java-runner-encodability: adds the option EnforceEncodability
> 4. beam-java-runner-portableapi: adds ProtoTranslation (+ a few other
> parts probably), this one will lead more or less to the portable one
> 5. beam-java-direct-runner (current one)
>
> Idea is to have a *unique* and production proof embedded java runner which
> has composable extensions and the full blown flavor (with all extensions)
> is the direct runner, an intermediate flavor is the portable runner.
> Advantage is to be able to keep adding validations and harnessing to the
> direct runner without degrading all the other use cases.
> This leads to keeping a light embedded runner as a beam reference
> implementation which is usable in prod until the volumes require more.
>
> If we don't go that way we should think about what is the reference
> implementation and maybe just drop some usages of the direct runner and
> enhance another runner supporting embedded runs to support all the beam API
> (for instance flink runner).
>
> Does it make it clearer?
>
>
>
>
> Romain Manni-Bucau
> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
>
> 2018-03-04 20:15 GMT+01:00 Lukasz Cwik :
>
>> Feel free to document what you would like the extension mechanism to do
>> and provide some skeleton interfaces for APIs that you would like to
>> support.
>>
>> On Fri, Mar 2, 2018 at 2:33 PM, Romain Manni-Bucau wrote:
>>
>>>
>>>
>>> Le 2 mars 2018 22:22, "Lukasz Cwik"  a écrit :
>>>
>>> To my knowledge, no one has discussed an extension mechanism for the
>>> direct runner but the difficulty is in how to get extensions to interact
>>> with the internals of the direct runner cleanly.
>>> Note that the direct runner currently accepts a set of flags which
>>> enable/disable validation and control how it runs like
>>> "--enforceImmutability":
>>> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java#L49
>>> Would it be easier to just add more flags which control how the direct
>>> runner works?
>>>
>>>
>>> Nop, idea is to guarantee a behavior and prevent regression whatever is
>>> added for other purposes
>>>
>>>
>>>
>>> As for having a direct runner using portability to be able to execute
>>> Python / Go / Java SDKs, you should look at
>>> https://issues.apache.org/jira/browse/BEAM-2899
>>>
>>> On Fri, Mar 2, 2018 at 12:53 PM, Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> wonder if you discussed or thought to break down what is called today
>>>> the direct runner in an embedded runner which would be modular and
>>>> extensible.
>>>>
>>>> What I have in mind is the following:
>>>>
>>>> 1. have a strong embedded runner implementing the whole beam API but
>>>> limited to a single JVM
>>>> 2. keep a string test oriented runner (what we call direct runner today)
>>>>
>>>> The overall design would be to ensure 1 and 2 share the common code and
>>>> avoid to do yet another runner. This means several extension points should
>>>> be defined to:
>>>>
>>>> 1. add the serialization validation
>>>> 2. add the portability validation
>>>> 3. add the execution randomization
>>>>
>>>> I didn't think yet to what would be the execution points (can just be
>>>> replacements

Re: @TearDown guarantees

2018-02-16 Thread Thomas Groh
On perf: Deserialization of an arbitrary object is expensive. This cost is
amortized over all of the elements that the object processes, but for a
runner with small bundles, that cost never gets meaningfully amortized -
deserializing a DoFn instance of unknown complexity to process one element
means that we're potentially multiplying our decoding costs several times
over. Reusing user Fns permits us to amortize across worker lifetimes,
which is many-times beneficial.
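
The amortization argument can be put as back-of-the-envelope arithmetic.
This is only a sketch: the cost model (one deserialization shared evenly by
every element an instance processes) and all of the numbers are assumptions
picked for illustration, not measurements from any runner.

```java
final class AmortizationDemo {
  /**
   * Average cost per element when a single deserialization of cost
   * deserializeCost is shared by elementsPerInstance elements, each of which
   * also costs processCost to process. Units are arbitrary.
   */
  static double costPerElement(
      double deserializeCost, double processCost, long elementsPerInstance) {
    return deserializeCost / elementsPerInstance + processCost;
  }

  public static void main(String[] args) {
    double deserializeCost = 100.0; // hypothetical
    double processCost = 1.0;       // hypothetical

    // One-element bundles with a fresh instance per bundle: nothing is amortized.
    System.out.println(costPerElement(deserializeCost, processCost, 1));
    // An instance reused across many bundles: the deserialization nearly disappears.
    System.out.println(costPerElement(deserializeCost, processCost, 1_000));
  }
}
```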

On resilience: You should distinguish "reliably" from "always". Users can
depend on "always" for correctness, but can't depend on something done
"reliably" for that. "Reliably" can be generally depended on for
performance, which is why Teardown exists.

Runners can call the Teardown method *almost always* if a DoFn instance
will not be reused (in the absence of arbitrary failures, the runner *MUST*
call Teardown, according to the original spec), but *SHOULD* and *MUST* are
extremely different in terms of implementation requirements. If you say
that Teardown *MUST* be called, what this means is that a runner *MUST NOT*
have resources that fail arbitrarily, and this is not an acceptable
restriction for any existing distributed backend.

If you need something that is always called, but at a granularity coarser
than once per element, that is exactly what FinishBundle provides, and is
why the method exists.

The original proposal for Setup/Teardown:
https://docs.google.com/document/d/1LLQqggSePURt3XavKBGV7SZJYQ4NW8yCu63lBchzMRk/edit#

On Fri, Feb 16, 2018 at 9:39 AM Romain Manni-Bucau 
wrote:

> So do I get it right a leak of Dataflow implementation impacts the API?
> Also sounds like this perf issues is due to a blind serialization instead
> of modelizing what is serialized - nothing should be slow enough in the
> serialization at that level, do you have more details on that particular
> point?
>
> It also means you accept to leak particular instances data like password
> etc (all the @AutoValue builder ones typically) since you dont call - or
> not reliably - a postexecution hook which should get solved ASAP.
>

> @Thomas: I understand your update was to align the dataflow behavior to
> the API but actually the opposite should be done, align DataFlow impl on
> the API. If we disagree tearDown is [1;1] - I'm fine with that, then
> teardown is not really usable for users and we miss a
> "the fact that we leave the runner discretion on when it can call
> teardown does not make this poorly-defined; it means that users should not
> depend on teardown being called for correct behavior, and *this has
> always been true and will continue to be true*."
> This is not really the case, you say it yourself "[...] does not make
> this poorly-defined [...] it means that users should not depend on
> teardown". This literally means @TearDown is not part of the API. Once
> again I'm fine with it but this kind of API is needed.
> "*this has always been true and will continue to be true"*
> Not really too since it was not clear before and runner dependent so users
> can depend on it.
>
> With both statements I think it should just get fixed and made reliable
> which is technically possible IMHO instead of creating a new API which
> would make teardown a cache hook which is an implementation detail which
> shouldn't surface in the API.
>
> @AfterExecution. @FinishBundle is once the bundle finishes so not a
> "finally" for the dofn regarding the execution.
>
> Side note: the success callback hook which has been discussed N times
> doesnt match the need which is really per instance (= accessible from that
> particular instance and not globally) in both success and failure cases.
>
>
> 2018-02-16 18:18 GMT+01:00 Kenneth Knowles :
>
>> Which runner's bundling are you concerned with? It sounds like the Flink
>> runner?
>>
>
> Flink, Spark, DirectRunner, DataFlow at least (others would be good but
> are out of scope)
>
>
>>
>> Kenn
>>
>>
>> On Fri, Feb 16, 2018 at 9:04 AM, Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>
>>>
>>> 2018-02-16 17:59 GMT+01:00 Kenneth Knowles :
>>>
>>>> What I am hearing is this:
>>>>
>>>>  - @FinishBundle does what you want (a reliable "flush" call) but your
>>>> runner is not doing a good job of bundling

>>>
>>> Nop, finishbundle is defined but not a bundle. Typically for 1 million
>>> rows I'll get 1 million calls in flink and 1 call in spark (today) so this
>>> is not a way to call a final task to release dofn internal instances or do
>>> some one time auditing.
>>>
>>>
>>>>  - @Teardown has well-defined semantics and they are not what you want

>>>
>>> "
>>> Note that calls to the annotated method are best effort, and may not
>>> occur for arbitrary reasons"
>>>
>>> is not really "well-defined" and is also a breaking change compared to
>>> the < 2.3.x (x >= 1) .
>>>
>>>
>>>> So you are hoping for something that is called less frequently but is
>>>> still

Re: @TearDown guarantees

2018-02-16 Thread Thomas Groh
I'll note as well that you don't need a well defined DoFn lifecycle method
- you just want less granular bundling, which is a different requirement.

Teardown has well-defined interactions with the rest of the DoFn methods,
and what the runner is permitted to do when it calls Teardown - the fact
that we leave the runner discretion on when it can call teardown does not
make this poorly-defined; it means that users should not depend on teardown
being called for correct behavior, and *this has always been true and will
continue to be true*.


On Fri, Feb 16, 2018 at 9:04 AM Romain Manni-Bucau 
wrote:

>
> 2018-02-16 17:59 GMT+01:00 Kenneth Knowles :
>
>> What I am hearing is this:
>>
>>  - @FinishBundle does what you want (a reliable "flush" call) but your
>> runner is not doing a good job of bundling
>>
>
> Nop, finishbundle is defined but not a bundle. Typically for 1 million
> rows I'll get 1 million calls in flink and 1 call in spark (today) so this
> is not a way to call a final task to release dofn internal instances or do
> some one time auditing.
>
>
>>  - @Teardown has well-defined semantics and they are not what you want
>>
>
> "
> Note that calls to the annotated method are best effort, and may not occur
> for arbitrary reasons"
>
> is not really "well-defined" and is also a breaking change compared to the
> < 2.3.x (x >= 1) .
>
>
>> So you are hoping for something that is called less frequently but is
>> still mandatory.
>>
>> Just trying to establish the basics to start over and get this on track
>> to solving the real problem.
>>
>
> Concretely I need a well defined lifecycle for any DoFn executed in beam
> and today there is no such a thing making it impossible to develop
> correctly transforms/fn on an user side.
>
>
>>
>> Kenn
>>
>>
>> On Fri, Feb 16, 2018 at 8:51 AM, Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>
>>> finish bundle is well defined and must be called, right, not at the end
>>> so you still miss teardown as a user. Bundles are defined by the runner and
>>> you can have 10 bundles per batch (even more for a stream ;)) so you
>>> dont want to release your resources or handle you execution auditing in it,
>>> you want it at the end so in tear down.
>>>
>>> So yes we must have teardown reliable somehow.
>>>
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
>>>
>>> 2018-02-16 17:43 GMT+01:00 Reuven Lax :
>>>
>>>> +1 I think @FinishBundle is the right thing to look at here.
>>>>
>>>> On Fri, Feb 16, 2018, 8:41 AM Jean-Baptiste Onofré
>>>> wrote:

> Hi Romain
>
> Is it not @FinishBundle your solution ?
>
> Regards
> JB
> Le 16 févr. 2018, à 17:06, Romain Manni-Bucau 
> a écrit:
>>
>> I see Reuven, so it is actually a broken contract for end users more
>> than a bug. Concretely a user must have a way to execute code once the
>> teardown is no more used and a teardown is populated by the user in the
>> context of an execution.
>> It means that if the environment wants to pool (cache) the instances
>> it must provide a postBorrowFromCache and preReturnToCache to let the
>> user handle that - we'll get back to EJB and passivation ;).
>>
>> Personally I think it is fine to cache the instances for the duration
>> of an execution but not across execution. Concretely if you check out
>> the API it should just not be possible for a runner since the lifecycle
>> is not covered and the fact teardown can not be called today is an
>> implementation bug/leak surfacing the API.
>>
>> So I see 2 options:
>>
>> 1. make it mandatory and get rid of the caching - which shouldnt help
>> much in current state in terms of perf
>> 2. keep teardown a final release object (which is not that useful
>> cause of the end of the sentence) and add a clean cache lifecycle
>> management
>>
>> tempted to say 1 is saner short terms, in particular cause beam is
>> 2.x and users already use it this way.
>>
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
>>
>> 2018-02-16 16:53 GMT+01:00 Reuven Lax :
>>
>>> So the concern is that @TearDown might not 

Re: @TearDown guarantees

2018-02-16 Thread Thomas Groh
Given that I'm the original author of both the @Setup and @Teardown methods
and the PR under discussion, I thought I'd drop in to give a bit of
history and my thoughts on the issue.

Originally (Dataflow 1.x), the spec required a Runner to deserialize a new
instance of a DoFn for every Bundle. For runners with small bundles (such
as the Dataflow Runner in streaming mode), this can be a significant cost,
and enabling DoFn reuse enables significant performance increases, as
Reuven mentioned.

However, for IOs and other transforms which have to perform expensive
execution-time initialization, DoFn reuse isn't enough by itself to enable
effective resource reuse - instead, we introduced the @Setup and @Teardown
methods, which exist to manage long-lived resources for a DoFn, such as
external connections. However, these methods are bound to the lifetime of
the DoFn, not to the lifetime of any specific elements.

As a result, there are two families of methods on a DoFn: element-related
and instance-related methods. @StartBundle, @ProcessElement,
and @FinishBundle are all the former - they are guaranteed to be called
exactly once per input element, as observed by the Pipeline (exactly one
_logical_ invocation, one or more physical invocations). @Setup
and @Teardown are unrelated to this lifecycle, but because we need to be
able to successfully call @Setup before performing any processing method,
we can trivially guarantee that Setup will have been called before
processing any elements.

For any behavior that is required for the correct functioning of a
pipeline, such as flushing buffered side effects (writes to an external
system, etc), use of @Teardown is *never appropriate* - because it is
unrelated to the processing of elements, if that buffer is lost for any
reason, the elements that produced it will never be reprocessed by the
Pipeline (it is permanently lost, which is not the case for anything
performed in @FinishBundle).
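
A minimal sketch of that split in plain Java, assuming a hypothetical
`BufferingWriter` whose methods only mimic the names of the DoFn lifecycle
methods (no Beam annotations or classes are used, and the "external system"
is an in-memory list). Buffered writes are flushed in finishBundle, and
teardown only releases the long-lived resource, so nothing is lost if
teardown is never called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical writer that mimics the shape of the DoFn lifecycle methods without using Beam. */
final class BufferingWriter {
  private final List<String> sink;                       // stands in for an external system
  private final List<String> buffer = new ArrayList<>(); // per-bundle buffered side effects
  private boolean connectionOpen = false;                // stands in for a long-lived resource

  BufferingWriter(List<String> sink) {
    this.sink = sink;
  }

  /** Instance-related: acquire the long-lived resource. */
  void setup() {
    connectionOpen = true;
  }

  /** Element-related: start each bundle with an empty buffer. */
  void startBundle() {
    buffer.clear();
  }

  /** Element-related: buffer the write instead of sending it immediately. */
  void processElement(String element) {
    buffer.add(element);
  }

  /** Element-related: everything required for correctness is flushed here. */
  void finishBundle() {
    sink.addAll(buffer);
    buffer.clear();
  }

  /** Instance-related: only releases the resource; no data depends on this call. */
  void teardown() {
    connectionOpen = false;
  }

  boolean isConnectionOpen() {
    return connectionOpen;
  }
}

final class LifecycleDemo {
  /** Simulates a runner that processes one bundle and then drops the instance without teardown. */
  static List<String> runBundleWithoutTeardown(List<String> elements) {
    List<String> sink = new ArrayList<>();
    BufferingWriter writer = new BufferingWriter(sink);
    writer.setup();
    writer.startBundle();
    for (String element : elements) {
      writer.processElement(element);
    }
    writer.finishBundle();
    // teardown() is deliberately never called, e.g. because the JVM was killed.
    return sink;
  }

  public static void main(String[] args) {
    System.out.println(runBundleWithoutTeardown(Arrays.asList("a", "b")));
  }
}
```

Had the flush lived in teardown instead, the simulated runner above would
have returned an empty sink.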

For long-lived resources, those will be bound to the life of the DoFn. In
general the runner is required to tear down an instance if it is going to
continue execution but discard that instance of the DoFn. However, that's
not the only way a runner can dispose of a DoFn - for example, it can
reclaim the container the DoFn executes on, or kill the JVM and restart it,
or just never discard it and use it forever, and be well-behaved.

The additional documentation exists to make it obvious that performing
anything based on elements within Teardown is extremely unsafe and is a
good way to produce inconsistent results or lose data.

I'll note as well that this is behavior that has always been the case -
StartBundle, ProcessElement, and FinishBundle will always be called
logically once per element, but Teardown was always called [0, 1] times per
element, and this is an update only to the documentation and not to the
actual behavior of any runner.
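
To make the difference concrete, here is a toy model in plain Java. It
deliberately does not use the Beam API: `BundleFlushModel`, `Sink`,
`BufferingFn`, and `runBundle` are made-up names, and the "runner" is just a
method that may or may not call teardown before dropping the instance. It
only illustrates why a buffer flushed in a teardown-style method can be lost
while one flushed in a finish-bundle-style method cannot.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the lifecycle described above. This is NOT the Beam API. */
public class BundleFlushModel {

    /** Stand-in for an external system; it records every write it receives. */
    static final class Sink {
        final List<String> written = new ArrayList<>();
    }

    /** Mimics a DoFn that buffers elements and flushes them in one chosen lifecycle method. */
    static final class BufferingFn {
        private final Sink sink;
        private final boolean flushInFinishBundle;
        private final List<String> buffer = new ArrayList<>();

        BufferingFn(Sink sink, boolean flushInFinishBundle) {
            this.sink = sink;
            this.flushInFinishBundle = flushInFinishBundle;
        }

        void processElement(String element) {
            buffer.add(element);
        }

        void finishBundle() {
            if (flushInFinishBundle) {
                flush();
            }
        }

        void teardown() {
            if (!flushInFinishBundle) {
                flush();
            }
        }

        private void flush() {
            sink.written.addAll(buffer);
            buffer.clear();
        }
    }

    /**
     * Pushes one bundle through a fresh instance and returns what reached the sink.
     * Once finishBundle returns, the model treats the bundle as committed, so its
     * elements are never retried. If runnerCallsTeardown is false, the instance is
     * simply dropped, as if the JVM had been killed.
     */
    static List<String> runBundle(
            List<String> bundle, boolean flushInFinishBundle, boolean runnerCallsTeardown) {
        Sink sink = new Sink();
        BufferingFn fn = new BufferingFn(sink, flushInFinishBundle);
        for (String element : bundle) {
            fn.processElement(element);
        }
        fn.finishBundle();
        if (runnerCallsTeardown) {
            fn.teardown();
        }
        return sink.written;
    }

    public static void main(String[] args) {
        List<String> bundle = Arrays.asList("a", "b", "c");
        // Flush in finishBundle, teardown never called: nothing is lost.
        System.out.println(runBundle(bundle, true, false));   // prints [a, b, c]
        // Flush in teardown, teardown never called: the buffer is silently dropped.
        System.out.println(runBundle(bundle, false, false));  // prints []
    }
}
```

In the second call the bundle has already been committed by the time the
instance disappears, so nothing would ever replay those elements.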



On Fri, Feb 16, 2018 at 8:44 AM Reuven Lax  wrote:

> +1 I think @FinishBundle is the right thing to look at here.
>
> On Fri, Feb 16, 2018, 8:41 AM Jean-Baptiste Onofré 
> wrote:
>
>> Hi Romain
>>
>> Is it not @FinishBundle your solution ?
>>
>> Regards
>> JB
>> On 16 Feb 2018, at 17:06, Romain Manni-Bucau wrote:
>>>
>>> I see Reuven, so it is actually a broken contract for end users more
>>> than a bug. Concretely a user must have a way to execute code once the
>>> teardown is no more used and a teardown is populated by the user in the
>>> context of an execution.
>>> It means that if the environment wants to pool (cache) the instances it
>>> must provide a postBorrowFromCache and preReturnToCache to let the user
>>> handle that - we'll get back to EJB and passivation ;).
>>>
>>> Personally I think it is fine to cache the instances for the duration of
>>> an execution but not across executions. Concretely if you check out the API
>>> it should just not be possible for a runner since the lifecycle is not
>>> covered and the fact teardown can not be called today is an implementation
>>> bug/leak surfacing the API.
>>>
>>> So I see 2 options:
>>>
>>> 1. make it mandatory and get rid of the caching - which shouldn't help
>>> much in current state in terms of perf
>>> 2. keep teardown a final release object (which is not that useful cause
>>> of the end of the sentence) and add a clean cache lifecycle management
>>>
>>> tempted to say 1 is saner short terms, in particular cause beam is 2.x
>>> and users already use it this way.
>>>
>>>
>>>
>>> Romain Manni-Bucau
>>> 
>>>
>>> 2018-02-16 16:53 GMT+01:00 Reuven Lax :
>>>
 So the concern 

Tracking Sickbayed tests in Jira

2018-01-31 Thread Thomas Groh
Hey everyone;

I've realized that although we tend to tag any test we suppress (due to
consistent flakiness) in the codebase, and file an associated JIRA issue
with the failure, we don't have any centralized way to track tests that
we're currently suppressing. To try and get more visibility into our
suppressed tests (without running `grep -r @Ignore ...` over the codebase
over and over), I've created a label for these tests, and applied it to all
of the issues that annotated `@Ignore` tests point to.

Ideally, all of our suppressed tests would be tagged with this label, so we
can get some visibility into which components we would normally expect to
have coverage but don't currently.

The search to look at all of these issues is
https://issues.apache.org/jira/browse/BEAM-3583?jql=project%20%3D%20BEAM%20AND%20labels%20%3D%20sickbay

If you're looking for something to do, or have other issues that should be
labelled, feel free to jump right in.

Yours,

Thomas


Re: A personal update

2017-12-13 Thread Thomas Groh
It's good to see you around. Welcome back.

On Wed, Dec 13, 2017 at 8:43 AM, Chamikara Jayalath 
wrote:

> Welcome back :)
>
> - Cham
>
> On Wed, Dec 13, 2017 at 8:41 AM Jason Kuster 
> wrote:
>
>> Glad to have you back. :)
>>
>> On Wed, Dec 13, 2017 at 8:32 AM, Eugene Kirpichov 
>> wrote:
>>
>>> Happy to see you return, and thank you again for all you've done so far!
>>>
>>> On Wed, Dec 13, 2017 at 10:24 AM Aljoscha Krettek 
>>> wrote:
>>>
 Welcome back! :-)

 > On 13. Dec 2017, at 15:42, Ismaël Mejía  wrote:
 >
 > Hello Davor, great to know you are going to continue contributing to
 > the project. Welcome back and best of wishes for this new phase !
 >
 > On Wed, Dec 13, 2017 at 3:12 PM, Kenneth Knowles 
 wrote:
 >> Great to have you back!
 >>
 >> On Tue, Dec 12, 2017 at 11:20 PM, Robert Bradshaw <
 rober...@google.com>
 >> wrote:
 >>>
 >>> Great to hear from you again, and really happy you're sticking
 around!
 >>>
 >>> - Robert
 >>>
 >>>
 >>> On Tue, Dec 12, 2017 at 10:47 PM, Ahmet Altay 
 wrote:
  Welcome back! Looking forward to your contributions.
 
  Ahmet
 
  On Tue, Dec 12, 2017 at 10:05 PM, Jesse Anderson
  
  wrote:
 >
 > Congrats!
 >
 >
 > On Wed, Dec 13, 2017, 5:54 AM Jean-Baptiste Onofré <
 j...@nanthrax.net>
 > wrote:
 >>
 >> Hi Davor,
 >>
 >> welcome back !!
 >>
 >> It's really great to see you back active in the Beam community.
 We
 >> really
 >> need you !
 >>
 >> I'm so happy !
 >>
 >> Regards
 >> JB
 >>
 >> On 12/13/2017 05:51 AM, Davor Bonaci wrote:
 >>> My dear friends,
 >>> As many of you have noticed, I’ve been visibly absent from the
 >>> project
 >>> for a
 >>> little while. During this time, a great number of you kept
 reaching
 >>> out, and for
 >>> that I’m deeply humbled and grateful to each and every one of
 you.
 >>>
 >>> I needed some time for personal reflection, which led to a
 >>> transition
 >>> in my
 >>> professional life. As things have settled, I’m happy to again be
 >>> working among
 >>> all of you, as we propel this project forward. I plan to be
 active
 >>> in
 >>> the
 >>> future, but perhaps not quite full-time as I was before.
 >>>
 >>> In the near term, I’m working on getting the report to the Board
 >>> completed, as
 >>> well as framing the discussion about the project state and
 vision
 >>> going
 >>> forwards. Additionally, I’ll make sure that we foster healthy
 >>> community
 >>> culture
 >>> and operate in the Apache Way.
 >>>
 >>> For those who are curious, I’m happy to say that I’m starting a
 >>> company
 >>> building
 >>> products related to Beam, along with several other members of
 this
 >>> community and
 >>> authors of this technology. I’ll share more on this next year,
 but
 >>> until then if
 >>> you have a data processing problem or an Apache Beam question,
 I’d
 >>> love
 >>> to hear
 >>> from you ;-).
 >>>
 >>> Thanks -- and so happy to be back!
 >>>
 >>> Davor
 >>
 >> --
 >> Jean-Baptiste Onofré
 >> jbono...@apache.org
 >> http://blog.nanthrax.net
 >> Talend - http://www.talend.com
 
 
 >>
 >>


>>
>>
>> --
>> ---
>> Jason Kuster
>> Apache Beam / Google Cloud Dataflow
>>
>


Re: Guarding against unsafe triggers at construction time

2017-12-04 Thread Thomas Groh
ion trigger of most triggers be the
>>"always-fire" trigger.* Seems that this should be the case for all
>>triggers except the watermark trigger. This will definitely increase
>>safety, but lead to more eager firing of downstream aggregations. It also
>>will violate a user's expectation that a fire-once trigger fires 
>> everything
>>downstream only once, but that expectation appears impossible to satisfy
>>safely.
>>- *Make the continuation trigger of some triggers be the "invalid"
>>trigger, *i.e. require the user to set it explicitly: there's in
>>general no good and safe way to infer what a trigger on a second GBK
>>"truly" should be, based on the trigger of the PCollection input into a
>>first GBK. This is especially true for terminating triggers.
>>- *Prohibit top-level terminating triggers entirely. *This will
>>ensure that the only data that ever gets dropped is "droppably late" data.
>>
>>
>> Do people think that these options are sensible?
>> +Kenn Knowles <k...@google.com> +Thomas Groh <tg...@google.com> +Ben
>> Chambers <bchamb...@google.com> is this a fair summary of our discussion?
>>
>> Thanks!
>>
>
>


Re: [VOTE] Use Gradle for Apache Beam developmental processes

2017-11-28 Thread Thomas Groh
+1

On Tue, Nov 28, 2017 at 10:04 AM, Valentyn Tymofieiev 
wrote:

> +1 I support the process change
>
>
> On Tue, Nov 28, 2017 at 9:56 AM, Kenneth Knowles  wrote:
>
>> +1 (binding)
>>
>> On Tue, Nov 28, 2017 at 9:55 AM, Lukasz Cwik  wrote:
>>
>>> This is a procedural vote for migrating to use Gradle for all our
>>> development related processes (building, testing, and releasing). A
>>> majority vote will signal that:
>>> * Gradle build files will be supported and maintained alongside any
>>> remaining Maven files.
>>> * Once Gradle is able to replace Maven in a specific process (or portion
>>> thereof), Maven will no longer be maintained for said process (or portion
>>> thereof) and will be removed.
>>>
>>> +1 I support the process change
>>> 0 I am indifferent to the process change
>>> -1 I would like to remain with our current processes
>>>
>>> 
>>> 
>>>
>>> Below is a summary of information contained in the discussion thread
>>> comparing Gradle and Maven:
>>> https://lists.apache.org/thread.html/225dddcfc78f39bbb296a0d2bbef1caf37e17677c7e5573f0b6fe253@%3Cdev.beam.apache.org%3E
>>>
>>> Gradle (mins)
>>> min: 25.04
>>> max: 160.14
>>> median: 45.78
>>> average: 52.19
>>> stdev: 30.80
>>>
>>> Maven (mins)
>>> min: 56.86
>>> max: 216.55 (actually > 240 mins because this data does not include
>>> timeouts)
>>> median: 87.93
>>> average: 109.10
>>> stdev: 48.01
>>>
>>> Maven
>>> Java Support: Mature
>>> Python Support: None (via mvn exec plugin)
>>> Go Support: Rudimentary (via mvn plugin)
>>> Protobuf Support: Rudimentary (via mvn plugin)
>>> Docker Support: Rudimentary (via mvn plugin)
>>> ASF Release Automation: Mature
>>> Jenkins Support: Mature
>>> Configuration Language: XML
>>> Multiple Java Versions: Yes
>>> Static Analysis Tools: Some
>>> ASF Release Audit Tool (RAT): Rudimentary (plugin complete and
>>> longstanding but poor)
>>> IntelliJ Integration: Mature
>>> Eclipse Integration: Mature
>>> Extensibility: Mature (updated per JB from discuss thread)
>>> Number of GitHub Projects Using It: 146k
>>> Continuous build daemon: None
>>> Incremental build support: None (note that this is not the same as
>>> incremental compile support offered by the compiler plugin)
>>> Intra-module dependencies: Rudimentary (requires the use of many
>>> profiles to get per runner dependencies)
>>>
>>> Gradle
>>> Java Support: Mature
>>> Python Support: Rudimentary (pygradle, lacks pypi support)
>>> Go Support: Rudimentary (gogradle plugin)
>>> Protobuf Support: Rudimentary (via protobuf plugin)
>>> Docker Support: Rudimentary (via docker plugin)
>>> ASF Release Automation: ?
>>> Jenkins Support: Mature
>>> Configuration Language: Groovy
>>> Multiple Java Versions: Yes
>>> Static Analysis Tools: Some
>>> ASF Release Audit Tool (RAT): Rudimentary (plugin just calls Apache
>>> Maven ANT plugin)
>>> IntelliJ Integration: Mature
>>> Eclipse Integration: Mature
>>> Extensibility: Mature
>>> Number of GitHub Projects Using It: 122k
>>> Continuous build daemon: Mature
>>> Incremental build support: Mature
>>> Intra-module dependencies: Mature (via configurations)
>>>
>>>
>>
>


Re: [DISCUSS] Updating contribution guide for gitbox

2017-11-28 Thread Thomas Groh
I am strongly in favor of (1); I have no strong feelings about (2); I agree
on (3), but in general am not hugely concerned, so long as back-references
to the original PR are maintained, which is where most of the context
lives. It is nice to have the change broken up into as many individually
useful parts as possible, so I wouldn't really choose (b) or (c).

Of note, (1) will not be possible if you would like another contributor to
review and they have not set up their gitbox account. Notably this is
always going to be the case for contributors who are not committers - we
should maintain use of the "R: @reviewer" comments in those cases.

On Tue, Nov 28, 2017 at 9:45 AM, Kenneth Knowles  wrote:

> Hi all,
>
> James brought up a great question in Slack, which was how should we use
> the merge button, illustrated [1]
>
> I want to broaden the discussion to talk about all the new capabilities:
>
> 1. Whether & how to use the "reviewer" field
> 2. Whether & how to use the "assignee" field
> 3. Whether & how to use the merge button
>
> My preferences are:
>
> 1. Use the reviewer field instead of "R:" comments.
> 2. Use the assignee field to keep track of who the review is blocked on
> (either the reviewer for more comments or the author for fixes)
> 3. Use merge commits, but editing the commit subject line
>
> To expand on part 3, GitHub's merge button has three options [1]. They are
> not described accurately in the UI, as they all say "merge" when only one
> of them performs a merge. They do the following:
>
> (a) Merge the branch with a merge commit
> (b) Squash all the commits, rebase and push
> (c) Rebase and push without squash
>
> Unlike our current guide, all of these result in a "merged" status for the
> PR, so we can correctly distinguish those PRs that were actually merged.
>
> My votes on these options are:
>
> (a) +1 this preserves the most information
> (b) -1 this erases the most information
> (c) -0 this is just sort of a middle ground; it breaks commit hashes, does
> not have a clear merge commit, but preserves other info
>
> Kenn
>
> [1] https://apachebeam.slack.com/messages/C1AAFJYMP/
>
>


Re: [DISCUSS] [Java] Private shaded dependency uber jars

2017-10-17 Thread Thomas Groh
+1 to the goal. I'm hugely in favor of not doing the same shading work
every time for dependencies we know we'll use.

This also means that if we end up pulling in transitive dependencies we
don't want in any particular module we can avoid having to adjust our
repackaging strategy for that module - which I have run into face-first in
the past.

On Tue, Oct 17, 2017 at 9:48 AM, Kenneth Knowles 
wrote:

> Hi all,
>
> Shading is a big part of how we keep our dependencies sane in Beam. But
> it has downsides: shading is super slow, causes massive jar bloat, and is
> kind of hard to get right because artifacts and namespaces are not 1-to-1.
>
> I know that some communities distribute their own shaded distributions of
> dependencies. I had a thought about doing something similar that I wanted
> to throw out there for people to poke holes in.
>
> To set the scene, here is how I view shading:
>
>  - A module has public dependencies and private dependencies.
>  - Public deps are used for data interchange; users must share these deps.
>  - Private deps are just functionality and can be hidden (in our case,
> relocated + bundled)
>  - It isn't necessarily that simple, because public and private deps might
> interact in higher-order ways ("public" is contagious)
>
> Shading is an implementation detail of expressing these characteristics. We
> use shading selectively because of its downsides I mentioned above.
>
> But what about this idea: Introduce shaded deps as a single separate
> artifact.
>
>  - sdks/java/private-deps: bundled uber jar with relocated versions of
> everything we want to shade
>
>  - sdks/java/core and sdks/java/harness: no relocation or bundling -
> depends on `beam-sdks-java-private-deps` and imports like
> `org.apache.beam.sdk.private.com.google.common` directly (this is what
> they are rewritten to)
>
> Some benefits
>
>  - much faster builds of other modules
>  - only one shaded uber jar
>  - rare/no rebuilds of the uber jar
>  - can use maven enforcer to forbid imports like com.google.common
>  - configuration all in one place
>  - no automated rewriting of our real code, which has led to some major
> confusion
>  - easy to implement incrementally
>
> Downsides:
>
>  - plenty of work to get there
>  - unclear how many different such deps modules we need; sharing them could
> get weird
>  - if we hit a roadblock, we will have committed a lot of time
>
> Just something I was musing as I spent another evening waiting for slow
> builds to try to confirm changes to brittle poms.
>
> Kenn
>


Re: [DISCUSS] Switch to gitbox

2017-10-09 Thread Thomas Groh
+1.

I do love myself a forcing function for passing tests.

On Mon, Oct 9, 2017 at 7:51 AM, Aljoscha Krettek 
wrote:

> +1
>
> > On 6. Oct 2017, at 18:57, Kenneth Knowles 
> wrote:
> >
> > Sounds great. If I recall correctly, it means we could also use
> > assignment / review requests to pass pull requests around, instead of
> > "R: foo" comments.
> >
> > On Fri, Oct 6, 2017 at 9:30 AM, Tyler Akidau  >
> > wrote:
> >
> >> +1
> >>
> >> On Fri, Oct 6, 2017 at 8:54 AM Reuven Lax 
> >> wrote:
> >>
> >>> +1
> >>>
> >>> On Oct 6, 2017 4:51 PM, "Lukasz Cwik" 
> wrote:
> >>>
>  I think it's a great idea and find that the mergebot works well on the
>  website.
>  Since gitbox enforces that the precommit checks pass, it would also be
> >> a
>  good forcing function for the community to maintain reliably passing
> >>> tests.
> 
>  On Fri, Oct 6, 2017 at 4:58 AM, Jean-Baptiste Onofré  >
>  wrote:
> 
> > Hi guys,
> >
> > We use Apache gitbox for the website and it works fine (as soon as
> >> you
> > linked your Apache and github with 2FA enabled).
> >
> > What do you think about moving to gitbox for the codebase itself ?
> >
> > It could speed up the review and merge for the PRs.
> >
> > Thoughts ?
> >
> > Regards
> > JB
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
> >
> 
> >>>
> >>
>
>


Re: Proposal: Unbreak Beam Python 2.1.0 with 2.1.1 bugfix release

2017-09-21 Thread Thomas Groh
+1 on cutting a release to fix this.

As an aside, if we later determine that we require a release that includes
Java, that release will be 2.1.2 (or equivalent) - the reason we aren't
releasing Java artifacts is a matter of convenience (they have the same
contents as the 2.1.0 release), not because the versions are diverging
within this minor version.

On Tue, Sep 19, 2017 at 5:59 PM, Ben Chambers  wrote:

> Any elaboration or jira issues describing what is broken? Any proposal for
> what changes need to happen to fix it?
>
> On Tue, Sep 19, 2017, 5:49 PM Chamikara Jayalath 
> wrote:
>
> > +1 for cutting 2.1.1 for Python SDK only.
> >
> > Thanks,
> > Cham
> >
> > On Tue, Sep 19, 2017 at 5:43 PM Robert Bradshaw
> > 
> > wrote:
> >
> > > +1. Right now anyone who follows our quickstart instructions or
> > > otherwise installs the latest release of apache_beam is broken.
> > >
> > > On Tue, Sep 19, 2017 at 2:05 PM, Charles Chen 
> > > wrote:
> > > > The latest version (2.1.0) of Beam Python (
> > > > https://pypi.python.org/pypi/apache-beam) is broken due to a change
> in
> > > the
> > > > "six" dependency (BEAM-2964
> > > > ).  For instance,
> > > > installing "apache-beam" in a clean environment and running "python
> -m
> > > > apache_beam.examples.wordcount" results in a failure.  This issue is
> > > fixed
> > > > at head with Robert's recent PR (
> > > https://github.com/apache/beam/pull/3865).
> > > >
> > > > I propose to cherry-pick this change on top of the 2.1.0 release
> branch
> > > (to
> > > > form a new 2.1.1 release branch) and call a vote to release version
> > 2.1.1
> > > > only for Beam Python.
> > > >
> > > > Alternatively, to preserve version alignment we could also re-release
> > > Beam
> > > > Java 2.1.1 with the same code as 2.1.0 modulo the version bump.
> > > Thoughts?
> > > >
> > > > Best,
> > > > Charles
> > >
> >
>


Re: Migration From 1.9.x to 2.1.0

2017-09-13 Thread Thomas Groh
For (1) and (4), the DoFn methods have been moved to be reflection based.
Instead of using `@Override` in your DoFns, you should annotate those
methods with `@StartBundle`, `@ProcessElement`, and `@FinishBundle`.

For (2), Aggregators have been removed. Our suggested replacement is the
use of the `Metrics` class - in this case, a Counter metric is appropriate.

For (3), `sideOutput` has been renamed to `output`; the use is otherwise
identical.

For (5), the pattern has changed from `TextIO.Read.from(...)` to
`TextIO.read().from(...)` (which should allow the remainder of the
PTransform to also be configured without having to specify a file pattern
up front).
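
As a rough sketch of points (1) through (4) taken together, assuming the
Beam 2.x Java SDK is on the classpath: the element types, the counter name
"duplicates", the constructor, and the `createParser` and `isDuplicate`
helpers are placeholders made up for illustration, based only on the
snippets quoted in the original question, and are not code from the actual
pipeline.

```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;

/** Hypothetical 2.x version of the Join DoFn; String element types are placeholders. */
class Join extends DoFn<String, String> {
  // (2) A Counter metric takes the place of the removed Aggregator.
  private final Counter duplicatesCount = Metrics.counter(Join.class, "duplicates");

  // (3) Tag for the additional output, supplied by whoever builds the pipeline.
  private final TupleTag<String> duplicatesTag;

  Join(TupleTag<String> duplicatesTag) {
    this.duplicatesTag = duplicatesTag;
  }

  // (1) Annotated rather than overridden, so there is no super.startBundle(...) call.
  @StartBundle
  public void startBundle() {
    createParser();
  }

  // (4) Annotated rather than overridden as well.
  @ProcessElement
  public void processElement(ProcessContext c) {
    String s = c.element();
    if (isDuplicate(s)) {
      duplicatesCount.inc();
      c.output(duplicatesTag, s); // (3) was c.sideOutput(duplicatesTag, s) in 1.9.x
    } else {
      c.output(s);
    }
  }

  // Placeholder for the parser setup from the original snippet.
  private void createParser() {}

  // Placeholder duplicate check; the real logic is not shown in the question.
  private boolean isDuplicate(String s) {
    return false;
  }
}
```

The methods no longer override anything on the superclass, which is why the
`@Override` annotations in the original code stop compiling.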

On Tue, Sep 12, 2017 at 8:39 PM, Arunkumar Santhanagopalan <
arunk...@gmail.com> wrote:

> Hi,
>
> We are trying to migrate from Dataflow 1.9.x to Dataflow 2.1.0
>
> I need help with the following changes
>
>
> 1.
> class Join extends DoFn {
> @Override
> public void startBundle(Context c) throws Exception {
> super.startBundle(c);
> createParser();
> }
>
> Method "startBundle" does not override method startBundle from its
> superclass
>
>
> 2.
> class Join extends DoFn{
>   private final Aggregator duplicatesCount =
> createAggregator(DUPLICATES_COUNTER, new Sum.SumLongFn());
>
> cannot resolve method 'createAggregator, Sum.SumLongFn has a private access
>
> 3.
> class Join extends DoFn{
>   public void processElement(ProcessContext c) {
>  c.sideOutput(duplicatesTag, s)
>   }
> cannot resolve method sideOutput(org.apache.beam.sdk.values.TupleTag)
>
>
> 4.
> public abstract class ReadCsv extends DoFn {
>
> @Override
> public final void processElement(ProcessContext c) throws Exception {
> T output = processElement(c.element(), c);
> if (output != null) {
> c.output(output);
> }
> }
> Method does not override method processElement from its superclass
>
>
> 5.
> import org.apache.beam.sdk.io.TextIO;
>
> TextIO.Read.from("gs://spins/data/part-*")
> Non-static method "from" cannot be referenced from static context
>


Re: Policy for stale PRs

2017-08-16 Thread Thomas Groh
JIRAs should only be closed if the issue that they track is no longer
relevant (either via being fixed or being determined to not be a problem).
If a JIRA isn't being meaningfully worked on, it should be unassigned (in
all cases, not just if there's an associated pull request that has not been
worked on).

+1 on closing PRs with no action from the original author after some
reasonable time frame (90 days is certainly reasonable; 30 might be too
short) if the author has not responded to actionable feedback.

On Wed, Aug 16, 2017 at 12:07 PM, Sourabh Bajaj <
sourabhba...@google.com.invalid> wrote:

> Some projects I have seen close stale PRs after 30 days, saying "Closing
> due to lack of activity, please feel free to re-open".
>
> On Wed, Aug 16, 2017 at 12:05 PM Ahmet Altay 
> wrote:
>
> > Sounds like we have consensus. Since this is a new policy, I would
> suggest
> > picking the most flexible option for now (90 days) and we can tighten it
> in
> > the future. To answer Kenn's question, I do not know, how other projects
> > handle this. I did a basic search but could not find a good answer.
> >
> > What mechanism can we use to close PRs, assuming that author will be out
> of
> > communication. We can push a commit with a "This closes #xyz #abc"
> message.
> > Is there another way to do this?
> >
> > Ahmet
> >
> > On Wed, Aug 16, 2017 at 4:32 AM, Aviem Zur  wrote:
> >
> > > Makes sense to close after a long time of inactivity and no response,
> and
> > > as Kenn mentioned they can always re-open.
> > >
> > > On Wed, Aug 16, 2017 at 12:20 AM Jean-Baptiste Onofré  >
> > > wrote:
> > >
> > > > If we consider the author, it makes sense.
> > > >
> > > > Regards
> > > > JB
> > > >
> > > > On Aug 15, 2017, 01:29, at 01:29, Ted Yu 
> wrote:
> > > > >The proposal makes sense.
> > > > >
> > > > >If the author of PR doesn't respond for 90 days, the PR is likely
> out
> > > > >of
> > > > >sync with current repo.
> > > > >
> > > > >Cheers
> > > > >
> > > > >On Mon, Aug 14, 2017 at 5:27 PM, Ahmet Altay
>  > >
> > > > >wrote:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> Do we have an existing policy for handling stale PRs? If not could
> > we
> > > > >come
> > > > >> up with one. We are getting close to 100 open PRs. Some of the
> open
> > > > >PRs
> > > > >> have not been touched for a while, and if we exclude the pings the
> > > > >number
> > > > >> will be higher.
> > > > >>
> > > > >> For example, we could close PRs that have not been updated by the
> > > > >original
> > > > >> author for 90 days even after multiple attempts to reach them
> (e.g.
> > > > >[1],
> > > > >> [2] are such PRs.)
> > > > >>
> > > > >> What do you think?
> > > > >>
> > > > >> Thank you,
> > > > >> Ahmet
> > > > >>
> > > > >> [1] https://github.com/apache/beam/pull/1464
> > > > >> [2] https://github.com/apache/beam/pull/2949
> > > > >>
> > > >
> > >
> >
>


Re: Adding back PipelineRunner#apply method

2017-08-15 Thread Thomas Groh
This style of method doesn't fit with the current approach of pipeline
construction, where the PipelineRunner need not be specified until the
pipeline is run; as such, the runner can't observe the construction of the
Pipeline, as it may not exist during the construction of the Pipeline.

On Tue, Aug 15, 2017 at 8:41 AM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> ... And remember it and make available inside PCollection (which
> application produced this collection).
>
> On Tue, Aug 15, 2017, 8:39 AM Eugene Kirpichov 
> wrote:
>
> > In general, no - but the implementation of PAssertionSite exemplifies the
> > approach. I guess it could be useful to make this a general beam feature
> > and remember it for all transforms. It would probably be best to
> implement
> > inside Pipeline.apply().
> >
> > On Tue, Aug 15, 2017, 7:02 AM Shen Li  wrote:
> >
> >> Hi Eugene,
> >>
> >> Thanks for sharing the info. That PAssertionSite tracks where an
> assertion
> >> error occurred. Do you know if it is possible to get the class name and
> >> line number where a PTransform was added?
> >>
> >> Thanks,
> >> Shen
> >>
> >> On Mon, Aug 14, 2017 at 10:54 PM, Eugene Kirpichov <
> >> kirpic...@google.com.invalid> wrote:
> >>
> >> > Hi Shen,
> >> > Responding just to one part of your message - "remember the line at
> >> which
> >> > the PTransform was added": take a look at
> >> > https://github.com/apache/beam/pull/2247 which does this for PAssert.
> >> >
> >> > On Mon, Aug 14, 2017 at 7:32 PM Shen Li  wrote:
> >> >
> >> > > In 0.5.0 or earlier releases, PipelineRunner provides an
> >> > > apply(PTransform, InputT) method which allows
> runner
> >> > > implementation to perform actions when the PTransform is added to
> the
> >> > > pipeline. In later releases, that apply(...) method has been
> replaced
> >> by
> >> > a
> >> > > Pipeline#replaceAll() method, where the runner can only get involved
> >> > after
> >> > > the pipeline has been fully constructed. In terms of override
> >> > PTransforms,
> >> > > these two APIs are identical. But, the early API could still be
> >> helpful.
> >> > > For example, the runner could remember the line at which the
> >> PTransform
> >> > was
> >> > > added, and provide that info to users to assist debugging. Is it
> >> possible
> >> > > to add that API back? Or is there any other way to involve the
> runner
> >> > when
> >> > > a PTransform is added?
> >> > >
> >> > > Thanks,
> >> > > Shen
> >> > >
> >> >
> >>
> >
>


Re: [VOTE] Release 2.1.0, release candidate #3

2017-08-11 Thread Thomas Groh
+1

Verified:
* java examples, examples-java8 generation with the archetype plugin
* WordCount on the Java DirectRunner
* WordCount on the Java DataflowRunner
* Complete Game Examples on the Java DataflowRunner
* Streaming Game Examples on the Java DirectRunner


On Fri, Aug 11, 2017 at 10:21 AM, Mingmin Xu  wrote:

> +1 thanks JB!
>
> verified:
> 1. migrate two applications from version 2.0.0 to 2.1.0 (FlinkRunner on
> YARN)
> 2. mvn clean install pass locally;
> 3. run wordcount with DirectRunner/FlinkRunner;
>
>
> On Thu, Aug 10, 2017 at 10:14 PM, Jean-Baptiste Onofré 
> wrote:
>
> > Gentle reminder on this thread.
> >
> > Thanks !
> > Regards
> > JB
> >
> >
> > On 08/09/2017 07:08 AM, Jean-Baptiste Onofré wrote:
> >
> >> Hi everyone,
> >>
> >> Please review and vote on the release candidate #3 for the version
> 2.1.0,
> >> as follows:
> >>
> >> [ ] +1, Approve the release
> >> [ ] -1, Do not approve the release (please provide specific comments)
> >>
> >>
> >> The complete staging area is available for your review, which includes:
> >> * JIRA release notes [1],
> >> * the official Apache source release to be deployed to dist.apache.org
> >> [2], which is signed with the key with fingerprint C8282E76 [3],
> >> * all artifacts to be deployed to the Maven Central Repository [4],
> >> * source code tag "v2.1.0-RC3" [5],
> >> * website pull request listing the release and publishing the API
> >> reference manual [6].
> >> * Python artifacts are deployed along with the source release to the
> >> dist.apache.org [2].
> >>
> >> The vote will be open for at least 72 hours. It is adopted by majority
> >> approval, with at least 3 PMC affirmative votes.
> >>
> >> Thanks,
> >> JB
> >>
> >> [1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12340528
> >> [2] https://dist.apache.org/repos/dist/dev/beam/2.1.0/
> >> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
> >> [4] https://repository.apache.org/content/repositories/orgapachebeam-1020/
> >> [5] https://github.com/apache/beam/tree/v2.1.0-RC3
> >> [6] https://github.com/apache/beam-site/pull/270
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>
>
>
> --
> 
> Mingmin
>


Re: [PROPOSAL] "Requires deterministic input"

2017-08-10 Thread Thomas Groh
ppen as close to the side-effecting
> operation
> > as
> > > > > possible, since upstream transforms within a composite could
> > introduce
> > > > > non-determinism. So it's the primitive transform that should own
> the
> > > > > requirement.
> > > > >
> > > > > Are there other primitives that should be annotated? 'Combine' is
> > > > > interesting because it optimized in Dataflow (and perhaps other
> > > runners)
> > > > to
> > > > > partially apply before a GroupByKey.
> > > > >
> > > > > On Thu, Aug 10, 2017 at 9:01 AM Tyler Akidau
> > > <taki...@google.com.invalid
> > > > >
> > > > > wrote:
> > > > >
> > > > > > +1 to the annotation idea, and to having it on processTimer.
> > > > > >
> > > > > > -Tyler
> > > > > >
> > > > > > On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek <
> > > aljos...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > +1 to the annotation approach. I outlined how implementing this
> > > would
> > > > > > work
> > > > > > > in the Flink runner in the Thread about the exactly-once Kafka
> > > Sink.
> > > > > > >
> > > > > > > > On 9. Aug 2017, at 23:03, Reuven Lax
> <re...@google.com.INVALID
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > Yes - I don't think we should try and make any deterministic
> > > > > guarantees
> > > > > > > > about what is in a bundle. Stability guarantees are per
> element
> > > > only.
> > > > > > > >
> > > > > > > > On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh
> > > > <tg...@google.com.invalid
> > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> +1 to the annotation-on-ProcessElement approach.
> > ProcessElement
> > > is
> > > > > the
> > > > > > > >> minimum implementation requirement of a DoFn, and should be
> > > where
> > > > > the
> > > > > > > >> processing logic which depends on characteristics of the
> > inputs
> > > > lie.
> > > > > > > It's a
> > > > > > > >> good way of signalling the requirements of the Fn, and
> letting
> > > the
> > > > > > > runner
> > > > > > > >> decide.
> > > > > > > >>
> > > > > > > >> I have a minor concern that this may not work as expected
> for
> > > > users
> > > > > > that
> > > > > > > >> try to batch remote calls in `FinishBundle` - we should make
> > > sure
> > > > we
> > > > > > > >> document that it is explicitly the input elements that will
> be
> > > > > > replayed,
> > > > > > > > >> and bundles and other operational concerns are still arbitrary.
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax
> > > > > <re...@google.com.invalid
> > > > > > >
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >>> I think deterministic here means deterministically
> > replayable.
> > > > i.e.
> > > > > > no
> > > > > > > >>> matter how many times the element is retried, it will
> always
> > be
> > > > the
> > > > > > > same.
> > > > > > > >>>
> > > > > > > >>> I think we should also allow specifying this on
> processTimer.
> > > > This
> > > > > > > would
> > > > > > > >>> mean that any keyed state written in a previous
> > processElement
> > > > must
> > > > > > be
> > > > > > > >>> guaranteed durable before processTimer is called.
> > > > > > > > >>>

Re: [PROPOSAL] "Requires deterministic input"

2017-08-09 Thread Thomas Groh
As I said, a minor concern; we should be explicit in our documentation that
it is only the input _elements_ that are
deterministic/stable/replayable/etc, and not operational concerns
surrounding them (such as bundling). I'd generally avoid making the actual
annotation more verbose.

On Wed, Aug 9, 2017 at 1:49 PM, Kenneth Knowles <k...@google.com.invalid>
wrote:

> On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh <tg...@google.com.invalid>
> wrote:
>
> > I have a minor concern that this may not work as expected for users that
> > try to batch remote calls in `FinishBundle` - we should make sure we
> > document that it is explicitly the input elements that will be replayed,
> > and bundles and other operational concerns are still arbitrary.
> >
>
> I think it is safe since writing has to be idempotent per-element anyhow,
> and a DoFn must be discarded on failure and a new version used for any
> retries.
>
> Was your concern something else? Were you concerned that the phrasing may
> imply a deterministic ordering of calls to the DoFn's methods? Perhaps
> @RequiresStableInputContents to make clear it has nothing to do with
> replaying method calls?
>
> Kenn
>
>
> >
> >
> > On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax <re...@google.com.invalid>
> > wrote:
> >
> > > I think deterministic here means deterministically replayable. i.e. no
> > > matter how many times the element is retried, it will always be the
> same.
> > >
> > > I think we should also allow specifying this on processTimer. This
> would
> > > mean that any keyed state written in a previous processElement must be
> > > guaranteed durable before processTimer is called.
> > >
> > >
> > > On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org>
> > > wrote:
> > >
> > > > I strongly agree with this proposal. I think moving away from "just
> > > insert
> > > > a GroupByKey for one of the 3 different reasons you may want it"
> > towards
> > > > APIs that allow code to express the requirements they have and the
> > runner
> > > > to choose the best way to meet this is a major step forwards in terms
> > of
> > > > portability.
> > > >
> > > > I think "deterministic" may be misleading. The actual contents of the
> > > > collection aren't deterministic if upstream computations aren't. The
> > > > property we really need is that once an input may have been observed
> by
> > > the
> > > > side-effecting code it will never be observed with a different value.
> > > >
> > > > I would propose something like RequiresStableInput, to indicate that the
> > input
> > > > must be stable as observed by the function. I could also see
> something
> > > > hinting at the fact we don't recompute the input, such as
> > > > RequiresMemoizedInput or RequiresNoRecomputation.
> > > >
> > > > -- Ben
> > > >
> > > > P.S For anyone interested other uses of GroupByKey that we may want
> to
> > > > discuss APIs for would be preventing retry across steps (eg.,
> > preventing
> > > > fusion) and redistributing inputs across workers.
> > > >
> > > > On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles
> <k...@google.com.invalid
> > >
> > > > wrote:
> > > >
> > > > > This came up again, so I wanted to push it along by proposing a
> > > specific
> > > > > API for Java that could have a derived API in Python. I am writing
> > this
> > > > > quickly to get something out there, so I welcome suggestions for
> > > > revision.
> > > > >
> > > > > Today a DoFn has a @ProcessElement annotated method with various
> > > > automated
> > > > > parameters, but most fundamentally this:
> > > > >
> > > > > @ProcessElement
> > > > > public void process(ProcessContext ctx) {
> > > > >   ctx.element() // to access the current input element
> > > > >   ctx.output(something) // to write to default output collection
> > > > > >   ctx.output(tag, something) // to write to other output collections
> > > > > }
> > > > >
> > > > > For some time, we have hoped to unpack the context - it is a
> > > > > backwards-compatibility pattern made obsolete by the new DoFn
> design.
> > > So
> > > > >

Re: [PROPOSAL] "Requires deterministic input"

2017-08-09 Thread Thomas Groh
+1 to the annotation-on-ProcessElement approach. ProcessElement is the
minimum implementation requirement of a DoFn, and should be where the
processing logic which depends on characteristics of the inputs lies. It's a
good way of signalling the requirements of the Fn, and letting the runner
decide.

I have a minor concern that this may not work as expected for users that
try to batch remote calls in `FinishBundle` - we should make sure we
document that it is explicitly the input elements that will be replayed,
and bundles and other operational concerns are still arbitrary.
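
As a rough illustration of "signalling the requirements of the Fn, and
letting the runner decide", here is a minimal sketch in plain Java. It does
not use the Beam SDK: the annotation, the two Fn classes and the
requiresStableInput helper are all hypothetical stand-ins, and the
annotation name is simply one of the names floated in this thread.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class StableInputSketch {
  // Hypothetical marker annotation (not the Beam SDK's); RUNTIME retention
  // is needed so that it is visible to reflection.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface RequiresStableInput {}

  // Stand-in for a DoFn whose per-element side effect must see stable input.
  static class WriteToRemoteFn {
    @RequiresStableInput
    public void processElement(String element) {
      // e.g. an idempotent write keyed by the element
    }
  }

  // Stand-in for a DoFn with no such requirement.
  static class FormatFn {
    public void processElement(String element) {}
  }

  // What a runner could do at translation time: inspect the method and
  // decide whether the input must be made durable before invoking it.
  static boolean requiresStableInput(Class<?> fnClass) {
    for (Method m : fnClass.getDeclaredMethods()) {
      if (m.getName().equals("processElement")
          && m.isAnnotationPresent(RequiresStableInput.class)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(requiresStableInput(WriteToRemoteFn.class));
    System.out.println(requiresStableInput(FormatFn.class));
  }
}
```

Note that the annotation says nothing about bundling: in this sketch, as in
the proposal, only the elements passed to processElement are covered.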



On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax 
wrote:

> I think deterministic here means deterministically replayable. i.e. no
> matter how many times the element is retried, it will always be the same.
>
> I think we should also allow specifying this on processTimer. This would
> mean that any keyed state written in a previous processElement must be
> guaranteed durable before processTimer is called.
>
>
> On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers 
> wrote:
>
> > I strongly agree with this proposal. I think moving away from "just
> insert
> > a GroupByKey for one of the 3 different reasons you may want it" towards
> > APIs that allow code to express the requirements they have and the runner
> > to choose the best way to meet this is a major step forwards in terms of
> > portability.
> >
> > I think "deterministic" may be misleading. The actual contents of the
> > collection aren't deterministic if upstream computations aren't. The
> > property we really need is that once an input may have been observed by
> the
> > side-effecting code it will never be observed with a different value.
> >
> > I would propose something like RequiresStableInput, to indicate that the input
> > must be stable as observed by the function. I could also see something
> > hinting at the fact we don't recompute the input, such as
> > RequiresMemoizedInput or RequiresNoRecomputation.
> >
> > -- Ben
> >
> > P.S For anyone interested other uses of GroupByKey that we may want to
> > discuss APIs for would be preventing retry across steps (eg., preventing
> > fusion) and redistributing inputs across workers.
> >
> > On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles 
> > wrote:
> >
> > > This came up again, so I wanted to push it along by proposing a
> specific
> > > API for Java that could have a derived API in Python. I am writing this
> > > quickly to get something out there, so I welcome suggestions for
> > revision.
> > >
> > > Today a DoFn has a @ProcessElement annotated method with various
> > automated
> > > parameters, but most fundamentally this:
> > >
> > > @ProcessElement
> > > public void process(ProcessContext ctx) {
> > >   ctx.element() // to access the current input element
> > >   ctx.output(something) // to write to default output collection
> > >   ctx.output(tag, something) // to write to other output collections
> > > }
> > >
> > > For some time, we have hoped to unpack the context - it is a
> > > backwards-compatibility pattern made obsolete by the new DoFn design.
> So
> > > instead it would look like this:
> > >
> > > @ProcessElement
> > > public void process(Element element, MainOutput mainOutput, ...) {
> > >   element.get() // to access the current input element
> > >   mainOutput.output(something) // to write to the default output collection
> > >   other.output(something) // to write to other output collection
> > > }
> > >
> > > I've deliberately left out the undecided syntax for side outputs. But
> it
> > > would be nice for the tag to be built in to the parameter so it doesn't
> > > have to be used when calling output().
> > >
> > > One way to enhance this to deterministic input would just be this:
> > >
> > > @ProcessElement
> > > @RequiresDeterministicInput
> > > public void process(Element element, MainOutput mainOutput, ...) {
> > >   element.get() // to access the current input element
> > >   mainOutput.output(something) // to write to the default output collection
> > >   other.output(something) // to write to other output collection
> > > }
> > >
> > > There are really a lot of places where we could put an annotation or
> > change
> > > a type to indicate that the input PCollection should be
> > > well-defined/deterministically-replayable. I don't have a really
> strong
> > > opinion.
> > >
> > > Kenn
> > >
> > > On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers
> > > wrote:
> > >
> > > > Allowing an annotation on DoFn's that produce deterministic output
> > could
> > > be
> > > > added in the future, but doesn't seem like a great option.
> > > >
> > > > 1. It is a correctness issue to assume a DoFn is deterministic and be
> > > > wrong, so we would need to assume all transform outputs are
> > > > non-deterministic unless annotated. Getting this correct is difficult
> > > (for
> > > > example, GBK is surprisingly 

Re: Style of messages for checkArgument/checkNotNull in IOs

2017-07-28 Thread Thomas Groh
I'm in favor of the wording in the style of the first: it's an immediately
actionable message that will have an associated stack trace, but will
provide the parameter in plaintext, so the caller doesn't have to dig
through the invoked code; they can just look at the documentation.

I've recently been convinced that all input validation should go through
`checkArgument` (including for nulls) rather than `checkNotNull`, due to
the type of exception thrown (IllegalArgumentException rather than
NullPointerException), so I'd usually prefer using that as the
`Preconditions` method. Beyond that, +1
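
A minimal sketch of what this preference looks like in practice, assuming
style-1 messages. To keep it dependency-free, checkArgument below is a small
local stand-in for the Guava method, and withUsername/withBatchSize are
hypothetical setters, not code from any Beam IO.

```java
public class ValidationSketch {
  // Local stand-in for Guava's Preconditions.checkArgument, so the sketch
  // compiles without any dependency.
  static void checkArgument(boolean condition, String template, Object... args) {
    if (!condition) {
      throw new IllegalArgumentException(String.format(template, args));
    }
  }

  // Null and empty checks both go through checkArgument, so the caller
  // always sees IllegalArgumentException for bad input.
  static String withUsername(String username) {
    checkArgument(username != null, "username can not be null");
    checkArgument(!username.isEmpty(), "username can not be empty");
    return username;
  }

  static int withBatchSize(int batchSize) {
    checkArgument(batchSize > 0, "batchSize must be positive, but was: %s", batchSize);
    return batchSize;
  }

  public static void main(String[] args) {
    try {
      withUsername(null);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
    try {
      withBatchSize(0);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```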

On Fri, Jul 28, 2017 at 11:17 AM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> Hey all,
>
> I think this has been discussed before on a JIRA issue but I can't find it,
> so raising again on the mailing list.
>
> Various IO (and non-IO) transforms validate their builder parameters using
> Preconditions.checkArgument/checkNotNull, and use different styles for
> error messages. There are 2 major styles:
>
> 1) style such as:
> checkNotNull(username, "username"), or checkArgument(username != null,
> "username can not be null") or checkArgument(username != null,
> "username must be set");
> checkArgument(batchSize > 0, "batchSize must be positive, but was: %s",
> batchSize)
>
> 2) style such as:
>   checkArgument(
>       username != null,
>       "ConnectionConfiguration.create().withBasicCredentials(username, password) "
>           + "called with null username");
>   checkArgument(
>       !username.isEmpty(),
>       "ConnectionConfiguration.create().withBasicCredentials(username, password) "
>           + "called with empty username");
>
> Style 2 is recommended by the PTransform Style Guide:
> https://beam.apache.org/contribute/ptransform-style-guide/#transform-configuration-errors
>
> However:
> 1) The usage of these two styles is not consistent - both are used in about
> the same amounts in Beam IOs.
> 2) Style 2 seems unnecessarily verbose to me. The exception thrown from a
> checkArgument or checkNotNull already includes the method being called into
> the stack trace, so I don't think the message needs to include the method.
> 3) Beam is not the first Java project to have validation of configuration
> parameters of something or another, and I don't think I've seen something
> as verbose as style 2 used anywhere else in my experience of writing Java.
>
> What do people think about changing the guidance in favor of style 1?
>
> Specifically change the following example:
>
> public Twiddle withMoo(int moo) {
>   checkArgument(moo >= 0 && moo < 100,
>   "Thumbs.Twiddle.withMoo() called with an invalid moo of %s. "
>   + "Valid values are 0 (inclusive) to 100 (exclusive)",
>   moo);
>   return toBuilder().setMoo(moo).build();
> }
>
> into the following:
>
> public Twiddle withMoo(int moo) {
>   checkArgument(moo >= 0 && moo < 100,
>   "Valid values for moo are 0 (inclusive) to 100 (exclusive), "
>   + "but was: %s",
>   moo);
>   return toBuilder().setMoo(moo).build();
> }
>
>
> And in simpler cases such as non-null checks:
> public Twiddle withUsername(String username) {
>   checkNotNull(username, "username");
>   checkArgument(!username.isEmpty(), "username can not be empty");
>   ...
> }
>


Re: Requiring PTransform to set a coder on its resulting collections

2017-07-27 Thread Thomas Groh
I think there's a simpler solution than encoding to byte[]: introduce a
new, specialized type to represent the restricted
(alternatively-distributed?) data. The TypeDescriptor for this type can map
to the specialized coder, without having to perform a significant degree of
potentially wasted encoding work, plus it includes the assumptions that are
being made about the distribution of data.
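
To make the idea concrete, here is a small sketch in plain Java under stated
assumptions. PrefixedString, the PREFIX value and the ENCODERS map are
hypothetical; the map is only a toy stand-in for a type-to-coder lookup and
is not the Beam CoderRegistry API.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SpecializedTypeSketch {
  static final String PREFIX = "user:"; // hypothetical shared prefix

  // Hypothetical wrapper type: holding one records the assumption that the
  // value starts with PREFIX.
  static final class PrefixedString {
    final String value;

    PrefixedString(String value) {
      if (!value.startsWith(PREFIX)) {
        throw new IllegalArgumentException("value must start with " + PREFIX);
      }
      this.value = value;
    }
  }

  // Toy registry from type to encoder; the specialized type selects the
  // cheaper encoding without the user encoding to byte[] by hand.
  static final Map<Class<?>, Function<Object, byte[]>> ENCODERS = new HashMap<>();

  static {
    ENCODERS.put(String.class, o -> ((String) o).getBytes(StandardCharsets.UTF_8));
    ENCODERS.put(
        PrefixedString.class,
        o -> ((PrefixedString) o)
            .value.substring(PREFIX.length())
            .getBytes(StandardCharsets.UTF_8));
  }

  static byte[] encode(Object o) {
    return ENCODERS.get(o.getClass()).apply(o);
  }

  public static void main(String[] args) {
    System.out.println(encode("user:42").length);
    System.out.println(encode(new PrefixedString("user:42")).length);
  }
}
```

The plain String keeps the general-purpose encoding, while the wrapper type
drops the shared prefix; which encoding applies is decided by the type alone.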

On Thu, Jul 27, 2017 at 11:04 AM, Reuven Lax <re...@google.com.invalid>
wrote:

> I tend to agree with Robert - it would be unfortunate if a single
> TypeDescriptor was forced to have the same encoding all through the
> pipeline. However it's also unfortunate if this flexibility impacted every
> part of the programming model. I also think that our experience has been
> that "large scale where tiny incremental
> optimization represents a lot of cost" is far more common than people
> expect, especially since coding/decoding is often a dominant cost for such
> pipelines.
>
> On Thu, Jul 27, 2017 at 11:00 AM, Thomas Groh <tg...@google.com.invalid>
> wrote:
>
> > +1 on getting rid of setCoder; just from a Java SDK perspective, my ideal
> > world contains PCollections which don't have a user-visible way to mutate
> > them.
> >
> > My preference would be to use TypeDescriptors everywhere within Pipeline
> > construction (where possible), and utilize the CoderRegistry everywhere
> to
> > actually extract the appropriate type. The unfortunate difficulty of
> having
> > to encode a union type and the lack of variable-length generics does
> > complicate that. We could consider some way of constructing coders in the
> > registry from a collection of type descriptors (which should be
> accessible
> > from the point the union-type is being constructed), e.g. something like
> > `getCoder(TypeDescriptor output, TypeDescriptor... components)` - that
> does
> > only permit a single flat level (but since this is being invoked by the
> SDK
> > during construction it could also pass Coder...).
> >
> >
> >
> > On Thu, Jul 27, 2017 at 10:22 AM, Robert Bradshaw <
> > rober...@google.com.invalid> wrote:
> >
> > > On Thu, Jul 27, 2017 at 10:04 AM, Kenneth Knowles
> > > <k...@google.com.invalid> wrote:
> > > > On Thu, Jul 27, 2017 at 2:22 AM, Lukasz Cwik
> <lc...@google.com.invalid
> > >
> > > > wrote:
> > > >>
> > > >> Ken/Robert, I believe users will want the ability to set the output
> > > coder
> > > >> because coders may have intrinsic properties where the type isn't
> > enough
> > > >> information to fully specify what I want as a user. Some cases I can
> > see
> > > >> are:
> > > >> 1) I have a cheap and fast non-deterministic coder but a different
> > > slower
> > > >> coder when I want to use it as the key to a GBK, For example with a
> > set
> > > >> coder, it would need to consistently order the values of the set
> when
> > > used
> > > >> as the key.
> > > >> 2) I know a property of the data which allows me to have a cheaper
> > > >> encoding. Imagine I know that all the strings have a common prefix
> or
> > > >> integers that are in a certain range, or that a matrix is
> > sparse/dense.
> > > Not
> > > >> all PCollections of strings / integers / matrices in the pipeline
> will
> > > have
> > > >> this property, just some.
> > > >> 3) Sorting comes up occasionally, traditionally in Google this was
> > done
> > > by
> > > >> sorting the encoded version of the object lexicographically during a
> > > GBK.
> > > >> There are good lexicographical byte representations for ASCII
> strings,
> > > >> integers, and for some IEEE number representations which could be
> done
> > > by
> > > >> the use of a special coder.
> > > >>
> > > >
> > > > Items (1) and (3) do not require special knowledge from the user.
> They
> > > are
> > > > easily observed properties of a pipeline. My proposal included full
> > > > automation for both. The suggestion is new methods
> > > > .getDeterministicCoder(TypeDescriptor) and
> > > > .getLexicographicCoder(TypeDescriptor).
> > >
> > > Completely agree--usecases (1) and (3) are an indirect use of Coders
> > > that are used to achieve an effect that would be better expressed
> > > directly.
> > >
> > > > (2)

Re: Requiring PTransform to set a coder on its resulting collections

2017-07-27 Thread Thomas Groh
+1 on getting rid of setCoder; just from a Java SDK perspective, my ideal
world contains PCollections which don't have a user-visible way to mutate
them.

My preference would be to use TypeDescriptors everywhere within Pipeline
construction (where possible), and utilize the CoderRegistry everywhere to
actually extract the appropriate coder. The unfortunate difficulty of having
to encode a union type and the lack of variable-length generics do
complicate that. We could consider some way of constructing coders in the
registry from a collection of type descriptors (which should be accessible
from the point the union-type is being constructed), e.g. something like
`getCoder(TypeDescriptor output, TypeDescriptor... components)` - that does
only permit a single flat level (but since this is being invoked by the SDK
during construction it could also pass Coder...).



On Thu, Jul 27, 2017 at 10:22 AM, Robert Bradshaw <
rober...@google.com.invalid> wrote:

> On Thu, Jul 27, 2017 at 10:04 AM, Kenneth Knowles
>  wrote:
> > On Thu, Jul 27, 2017 at 2:22 AM, Lukasz Cwik 
> > wrote:
> >>
> >> Ken/Robert, I believe users will want the ability to set the output
> coder
> >> because coders may have intrinsic properties where the type isn't enough
> >> information to fully specify what I want as a user. Some cases I can see
> >> are:
> >> 1) I have a cheap and fast non-deterministic coder but a different
> slower
> >> coder when I want to use it as the key to a GBK, For example with a set
> >> coder, it would need to consistently order the values of the set when
> used
> >> as the key.
> >> 2) I know a property of the data which allows me to have a cheaper
> >> encoding. Imagine I know that all the strings have a common prefix or
> >> integers that are in a certain range, or that a matrix is sparse/dense.
> Not
> >> all PCollections of strings / integers / matrices in the pipeline will
> have
> >> this property, just some.
> >> 3) Sorting comes up occasionally, traditionally in Google this was done
> by
> >> sorting the encoded version of the object lexicographically during a
> GBK.
> >> There are good lexicographical byte representations for ASCII strings,
> >> integers, and for some IEEE number representations which could be done
> by
> >> the use of a special coder.
> >>
> >
> > Items (1) and (3) do not require special knowledge from the user. They
> are
> > easily observed properties of a pipeline. My proposal included full
> > automation for both. The suggestion is new methods
> > .getDeterministicCoder(TypeDescriptor) and
> > .getLexicographicCoder(TypeDescriptor).
>
> Completely agree--usecases (1) and (3) are an indirect use of Coders
> that are used to achieve an effect that would be better expressed
> directly.
>
> > (2) is an interesting hypothetical for massive scale where tiny
> incremental
> > optimization represents a lot of cost _and_ your data has sufficient
> > structure to realize a benefit _and_ it needs to be pinpointed to just
> some
> > PCollections. I think our experience with coders so far is that their
> > existence is almost entirely negative. It would be nice to support this
> > vanishingly rare case without inflicting a terrible pain point on the
> model
> > and all other users.
>
> (2) is not just about cheapness, sometimes there's other structure in
> the data we can leverage. Consider the UnionCoder used in
> CoGBK--RawUnionValue has an integer value that indicates the
> type of its raw Object field. Unless we want to extend the type
> language, there's not a sufficient type descriptor that can be used to
> infer the coder. I'm dubious that going down the road of adding special
> cases is the right thing here.
>
> > For example, in those cases you could encode in your
> > DoFn so the type descriptor would just be byte[].
>
> As well as being an extremely cumbersome API, this would incur the
> cost of coding/decoding at that DoFn boundary even if it is fused
> away.
>
> >> On Thu, Jul 27, 2017 at 1:34 AM, Jean-Baptiste Onofré 
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > That's an interesting thread and I was wondering the relationship
> between
> >> > type descriptor and coder for a while ;)
> >> >
> >> > Today, in a PCollection, we can set the coder and we also have a
> >> > getTypeDescriptor(). It sounds weird to me: it should be one or the
> >> other.
> >> >
> >> > Basically, if the Coder is not used to define the type, then I fully
> >> > agree with Eugene.
> >> >
> >> > Basically, the PCollection should define only the type descriptor, not
> >> the
> >> > coder by itself: the coder can be found using the type descriptor.
> >> >
> >> > With both coder and type descriptor on the PCollection, it sounds a bit
> >> > "decoupled" to me and it would be possible to have a coder on the
> >> > PCollection that doesn't match the type descriptor.
> >> >
> >> > I think PCollection type descriptor should be defined, and the 

Re: Passing pipeline options into PTransforms and Filesystems in Python

2017-07-11 Thread Thomas Groh
We'd like to avoid giving PTransforms access to the pipeline options during
pipeline construction. There are a few compelling reasons for doing so. The
biggest one is that the context in which the pipeline is constructed and
the context in which it executes may not be the same.

As an example, if I am executing a pipeline on some remote execution
engine, I may want to operate with more limited (or broader) permissions
than I have available locally. Those permissions should be determined at
the time the pipeline is executed, as attempting to make decisions based on
the construction time environment may prevent the pipeline from performing
correctly.

Credentials are an especially thorny issue, as some pipelines might want to
have steps that execute with different credentials (to make sure the
permissions for reading and writing are minimally scoped, for example).

Transforms that require objects or configuration that is passed via
pipeline options should generally take the argument they require explicitly
rather than by some invisible channel. We had this capability prior to
version 2.0, and removed the ability to access those options in favor of
explicit configuration.

Access to pipeline options at execution time is usually reasonable, though
if there's a way of passing more explicit configuration it would likely be
preferred. Most of the objects under discussion should be
instantiated during execution with the appropriate pipeline options as an
argument, or have it available as a context parameter when invoked.
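
The distinction between construction time and execution time can be sketched
as follows. This is plain Java, not the Beam API (the thread concerns the
Python SDK, but the idea is the same): WriteToStore, its credentialName
argument and the executionOptions map are all hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;

public class ExplicitConfigSketch {
  // Stand-in for a transform that receives what it needs as an explicit
  // constructor argument instead of reading global options.
  static final class WriteToStore {
    private final String credentialName;

    WriteToStore(String credentialName) {
      this.credentialName = credentialName;
    }

    // Called at execution time; executionOptions stands in for options
    // resolved in the execution environment, not where the pipeline was built.
    String open(Map<String, String> executionOptions) {
      String secret = executionOptions.get(credentialName);
      if (secret == null) {
        throw new IllegalStateException("no credential named " + credentialName);
      }
      return "connected as " + credentialName;
    }
  }

  public static void main(String[] args) {
    // Construction: only the name of the credential is fixed, no secret is read.
    WriteToStore write = new WriteToStore("writer");

    // Execution: the worker environment supplies the actual values, so a
    // reading step and a writing step can be scoped to different credentials.
    Map<String, String> workerOptions = new HashMap<>();
    workerOptions.put("writer", "s3cr3t");
    workerOptions.put("reader", "0th3r");
    System.out.println(write.open(workerOptions));
  }
}
```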

On Tue, Jul 11, 2017 at 3:03 PM, Dmitry Demeshchuk 
wrote:

> Hi folks,
>
> Sometimes, it would be very useful if PTransforms had access to global
> pipeline options, such as various credentials, settings and so on.
>
> Per conversation in https://issues.apache.org/jira/browse/BEAM-2572, I'd
> like to kick off a discussion about that.
>
> This would be beneficial for at least one major use case: support for
> different cloud providers (AWS, Azure, etc) and an ability to specify each
> provider's credentials just once in the pipeline options.
>
> It looks like the trickiest part is not to make the PTransform objects have
> access to pipeline options (we could possibly just modify the
> Pipeline.apply
>  apache_beam/pipeline.py#L355>
> method), but to actually pass these options down the road, such as to DoFn
> objects and FileSystem objects.
>
> I'm still in the process of reading the code and understanding of what this
> could look like, so any input would be really appreciated.
>
> Thank you.
>
> --
> Best regards,
> Dmitry Demeshchuk.
>


Mixed-Language Pipelines

2017-07-10 Thread Thomas Groh
Hey everyone;

I've been working on a design for implementing multi-language pipelines
within the Beam SDKs (also known as mix-and-match). This kind of pipeline
lets us reuse transforms written in one language in any other language that
supports the Runner API and the Fn API. Being able to write a transform once
and run it everywhere is pretty exciting to me.

The document is available at
https://s.apache.org/beam-mixed-language-pipelines. Comments and questions
are welcome, and I'm looking forward to any feedback.

Thanks,

Thomas


Re: [Proposal] Submitting pipelines to Runners in another language

2017-07-07 Thread Thomas Groh
I left a couple of comments.

I'm looking forward to this - it's going to be a good step towards being
able to execute any pipeline on any runner.

On Thu, Jul 6, 2017 at 5:11 PM, Ahmet Altay 
wrote:

> Thank you Sourabh. I added my comments as well and +1 to Kenn.
>
> On Thu, Jul 6, 2017 at 2:21 PM, Kenneth Knowles 
> wrote:
>
> > I added a few detailed comments. I definitely think we should move
> forward
> > on this to get Python pipelines running on all of our runners, and
> > hopefully that gets us ready for any future SDKs too.
> >
> > On Wed, Jul 5, 2017 at 2:21 PM, Sourabh Bajaj <
> > sourabhba...@google.com.invalid> wrote:
> >
> > > Hi,
> > >
> > > I wanted to share a proposal for submitting pipelines from SDK X
> > > (Python/Go) to runners written in another language Y (Java) (Flink /
> > Spark
> > > / Apex) using the Runner API. Please find the doc here
> > >  > > UVoH5BsFUofZuo/edit#>
> > > .
> > >
> > > As always comments and feedback are welcome.
> > >
> > > Thanks
> > > Sourabh
> > >
> >
>


Re: MergeBot is here!

2017-07-07 Thread Thomas Groh
Super duper cool. Very exciting.

On Fri, Jul 7, 2017 at 1:40 PM, Ted Yu  wrote:

> For https://gitbox.apache.org/setup/ , after completing the first two
> steps, is there any action needed for "MFA Status" box ?
>
> Cheers
>
> On Fri, Jul 7, 2017 at 1:37 PM, Lukasz Cwik 
> wrote:
>
> > for i in range(0, inf): +1
> >
> > Note that the URL for gitbox linking is:
> > https://gitbox.apache.org/setup/ (above
> > URL was missing '/' and was giving 404)
> >
> >
> > On Fri, Jul 7, 2017 at 1:21 PM, Jason Kuster wrote:
> >
> > > Hi Beam Community,
> > >
> > > Early on in the project, we had a number of discussions about creating
> an
> > > automated tool for merging pull requests. I’m happy to announce that
> > we’ve
> > > developed such a tool and it is ready for experimental usage in Beam!
> > >
> > > The tool, MergeBot, works in conjunction with ASF’s existing GitBox
> tool,
> > > providing numerous benefits:
> > > * Automating the merge process -- instead of many manual steps with
> > > multiple Git remotes, merging is as simple as commenting a specific
> > command
> > > in GitHub.
> > > * Automatic verification of each pull request against the latest master
> > > code before merge.
> > > * Merge queue enforces an ordering of pull requests, which ensures that
> > > pull requests that have bad interactions don’t get merged at the same
> > time.
> > > * GitBox-enabled features such as reviewers, assignees, and labels.
> > > * Enabling enhanced use of tools like reviewable.io.
> > >
> > > If you are a committer, the first step is to link your Apache and
> GitHub
> > > accounts at http://gitbox.apache.org/setup. Once the accounts are
> > linked,
> > > you should have immediate access to new GitHub features like labels,
> > > assignees, etc., as well as the ability to merge pull requests by
> simply
> > > commenting “@asfgit merge” on the pull request. MergeBot will
> communicate
> > > its status back to you via the same mechanism used already by Jenkins.
> > >
> > > This functionality is currently enabled for the “beam-site” repository
> > only.
> > > In this phase, we’d like to gather feedback and improve the user
> > experience
> > > -- so please comment back early and often. Once we are happy with the
> > > experience, we’ll deploy it on the main Beam repository, and recommend
> it
> > > for wider adoption.
> > >
> > > I’d like to give a huge thank you to the Apache Infrastructure team,
> > > especially Daniel Pono Takamori, Daniel Gruno, and Chris Thistlethwaite
> > who
> > > were instrumental in bringing this project to fruition. Additionally,
> > this
> > > could not have happened without the extensive work Davor put in to keep
> > > things moving along. Thank you Davor.
> > >
> > > Looking forward to hearing your comments and feedback. Thanks.
> > >
> > > Jason
> > >
> > > --
> > > ---
> > > Jason Kuster
> > > Apache Beam / Google Cloud Dataflow
> > >
> >
>


Re: How can I disable running Python SDK tests when testing my Java change?

2017-05-18 Thread Thomas Groh
Generally I pass "-am -amd -pl sdks/java/core" to my maven invocation. -pl
is the module to build, -am indicates to also make all modules my target
depends upon, and -amd indicates to also make all of the modules that depend
on my target; so if you're only modifying java, that should hit everything.
If you're making another module, you can specify that as the -pl target, and
if you 'install' instead of 'verify' you can resume arbitrarily.

On Thu, May 18, 2017 at 4:29 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> I've noticed that when I run "mvn verify", most of the time when I look at
> the screen it's running Python tests.
>
> Indeed, the Reactor Summary says:
> ...
> [INFO] Apache Beam :: SDKs :: Python .. SUCCESS [11:56
> min]
> ...
> [INFO] Total time: 12:03 min (Wall Clock)
>
> i.e. it's clearly on the critical path. The longest other project is
> 02:17min (Runners::Spark).
>
> Are our .pom files customizable with an option to run only Java tests? (or,
> respectively, only Python tests)
>
> Thanks.
>


Re: [VOTE] First stable release: release candidate #4

2017-05-15 Thread Thomas Groh
+1

Since the last candidate, I've also run the game examples for a few hours
on the DirectRunner and all's well.

On Mon, May 15, 2017 at 9:16 AM, Lukasz Cwik 
wrote:

> +1 (binding)
>
> Pei, I filed https://issues.apache.org/jira/browse/BEAM-2283 about using a
> consistent strategy when dealing with URIs/string like paths in our APIs
> and related the bugs that you filed to it.
>
>
> On Mon, May 15, 2017 at 6:56 AM, Pei HE  wrote:
>
> > Currently, several unit tests fail in Windows OS, and the beam repo fails
> > to build. (tested in Windows 7)
> >
> > (Then, I built the jar manually in mac, and copied it to Windows OS)
> >
> > Found WordCount also doesn't work with Windows OS local files.
> >
> > Filed two jira:
> > https://issues.apache.org/jira/browse/BEAM-2298
> > https://issues.apache.org/jira/browse/BEAM-2299
> >
> >
> > On Mon, May 15, 2017 at 6:30 AM, Aljoscha Krettek 
> > wrote:
> >
> > > +1
> > >
> > > Verified signatures
> > > Ran Quickstart code (WordCount, HourlyTeamScore, UserScore) on Flink on
> > > YARN on Dataproc
> > >
> > > > On 14. May 2017, at 21:06, Chamikara Jayalath 
> > > wrote:
> > > >
> > > > +1
> > > >
> > > > Verified Python SDK examples (WordCount, BigQuery tornadoes,
> UserScore,
> > > and
> > > > HourlyTeamScore)  on Windows for DirectRunner and DataflowRunner.
> > > >
> > > > Verified all checksums and signatures.
> > > >
> > > > Thanks,
> > > > Cham
> > > >
> > > >
> > > > On Sun, May 14, 2017 at 3:15 AM Ismaël Mejía 
> > wrote:
> > > >
> > > >> +1 (non-binding)
> > > >>
> > > >> Validated signatures OK
> > > >> Run mvn clean verify -Prelease OK
> > > >> Executed Nexmark with Direct/Spark/Flink/Apex runners in local mode
> > > >> (temporarily downgraded to 2.0.0 to validate the version). OK
> > > >>
> > > >> This is looking great now. As Robert said, a release to be proud of.
> > > >>
> > > >> On Sun, May 14, 2017 at 8:25 AM, Robert Bradshaw
> > > >>  wrote:
> > > >>> +1
> > > >>>
> > > >>> Verified all the checksums and signatures. (Now that both md5 and
> > sha1
> > > >>> are broken, we should probably provide sha-256 as well.)
> > > >>>
> > > >>> Spot checked the site and documentation, left comments on the PR.
> The
> > > >>> main landing page has nothing about the Beam stable release, and
> the
> > > >>> top blog entry (right in the center) mentions 0.6.0 which catches
> the
> > > >>> eye. I assume a 2.0.0 blog will be here shortly?
> > > >>>
> > > >>> Ran a couple of trivial but novel direct-runner pipelines (Python
> and
> > > >> Java).
> > > >>>
> > > >>> https://github.com/tensorflow/transform is pinning 0.6.0, so we
> > won't
> > > >>> break them (though hopefully they'll upgrade to >=2.0.0 shortly
> after
> > > >>> the release).
> > > >>>
> > > >>> The Python zipfile at
> > > >>> https://dist.apache.org/repos/dist/dev/beam/2.0.0-RC4/ is missing
> > > >>> sdks/python/apache_beam/transforms/trigger_transcripts.yaml. This
> > will
> > > >>> cause some tests to be skipped (but no failure). However, I don't
> > > >>> think it's worth cutting another release candidate for.
> > > >>>
> > > >>> Everything else is looking great. This is a release to be proud of!
> > > >>>
> > > >>> - Robert
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Sat, May 13, 2017 at 8:40 PM, Mingmin Xu 
> > > wrote:
> > >  +1
> > > 
> > >  Test beam-examples with FlinkRunner, and several cases of
> > > >> KafkaIO/JdbcIO.
> > > 
> > >  Thanks!
> > >  Mingmin
> > > 
> > > >  On Sat, May 13, 2017 at 7:38 PM, Ahmet Altay wrote:
> > > 
> > > > +1
> > > >
> > > > - Tested Python wordcount with DirectRunner & DataflowRunner on
> > > > Windows/Mac/Linux, and python mobile gaming examples with
> > > DirectRunner
> > > >> &
> > > > DataflowRunner on Mac/Linux
> > > > - Verified that generated pydocs are accurate.
> > > > - Python zip file has valid metadata and contains LICENSE, NOTICE
> > and
> > > > README.
> > > >
> > > > Ahmet
> > > >
> > > > On Sat, May 13, 2017 at 1:12 AM, María García Herrero <
> > > > mari...@google.com.invalid> wrote:
> > > >
> > > >> +1 -- validated python quickstart and mobile game for
> DirectRunner
> > > >> and
> > > >> DataflowRunner on Linux (RC3) and Mac (RC4).
> > > >>
> > > >> Go Beam!
> > > >>
> > > >> On Fri, May 12, 2017 at 11:12 PM, Jean-Baptiste Onofré <
> > > >> j...@nanthrax.net>
> > > >> wrote:
> > > >>
> > > >>> +1 (binding)
> > > >>>
> > > >>> Tested on beam-samples, especially focus on HDFS support, etc.
> > > >>>
> > > >>> Thanks !
> > > >>> Regards
> > > >>> JB
> > > >>>
> > > >>>
> > > >>> On 05/13/2017 06:47 AM, Davor Bonaci wrote:
> > > >>>
> > >  Hi everyone --
> > >  After 

Re: First stable release: Acceptance criteria

2017-05-11 Thread Thomas Groh
I'm making sure the direct runner plays nice in a variety of scenarios
(primarily the game examples, at the moment. Been a couple of hours and
still going strong in streaming)

On Thu, May 11, 2017 at 3:09 PM, Dan Halperin 
wrote:

> I'm focusing on:
>
> * user reported bugs (Avro, TextIO, MongoDb)
> * the actual Apache Release criteria (licensing, dependencies, etc.)
>
> On Thu, May 11, 2017 at 3:04 PM, Lukasz Cwik 
> wrote:
>
> > I have been trying out various Python scenarios on Windows.
> >
> > On Thu, May 11, 2017 at 3:01 PM, Jason Kuster <
> > jasonkus...@google.com.invalid> wrote:
> >
> > > I'll try to get wordcount running against a Spark cluster.
> > >
> > > On Wed, May 10, 2017 at 10:32 PM, Davor Bonaci 
> wrote:
> > >
> > > > > Just a quick reminder to consider contributing here.
> > > >
> > > > We are now at 6 criteria -- thanks!
> > > >
> > > > On Tue, May 9, 2017 at 2:29 AM, Aljoscha Krettek <
> aljos...@apache.org>
> > > > wrote:
> > > >
> > > > > Thanks for starting this document!
> > > > >
> > > > > I added a criterion and also verified it on the current RC.
> > > > >
> > > > > Best,
> > > > > Aljoscha
> > > > >
> > > > > > On 8. May 2017, at 22:48, Davor Bonaci  wrote:
> > > > > >
> > > > > > Based on the process previously discussed [1], I've seeded the
> > > > acceptance
> > > > > > criteria document [2].
> > > > > >
> > > > > > Please consider contributing to this effort by:
> > > > > > * proposing additional acceptance criteria, and/or
> > > > > > * supporting criteria proposed by others, and/or
> > > > > > > * validating a criterion.
> > > > > > >
> > > > > > > Please note that acceptance criteria shouldn't be too deep or
> too
> > > > broad
> > > > > > -- those are covered by automated tests and hackathon we had
> > earlier.
> > > > > This
> > > > > > should be "sanity-check"-type of criteria: simple, surface-level
> > > > things.
> > > > > >
> > > > > > > If you discover issues while validating a criterion, please:
> > > > > > * file a new JIRA issue, tag it as Fix Versions: “2.0.0”, and
> > > > > > * post on the dev@ mailing list on the thread about that
> specific
> > > > > release
> > > > > > candidate.
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > > > > Davor
> > > > > >
> > > > > > [1]
> > > > > > > https://lists.apache.org/thread.html/37caa5a94cec1405638410857f489d7cf7fa12bbe3c36e9925b2d6e2@%3Cdev.beam.apache.org%3E
> > > > > > > [2]
> > > > > > > https://docs.google.com/document/d/1XwojJ4Mj3wSlnBO1YlBs51P8kuGygYRj2lrNrqmAUvo/
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > ---
> > > Jason Kuster
> > > Apache Beam / Google Cloud Dataflow
> > >
> >
>


Re: Direct runner doesn't seem to finalize checkpoint "quickly"

2017-05-10 Thread Thomas Groh
I'm going to start with number two, because it's got an easy answer:
When performing an unbounded read, the DirectRunner will finalize a
checkpoint after it has completed a subsequent read from that split where
at least one element was read. A bounded read from an unbounded source will
never be finalized by any runner.
See
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L221

For number one:
  having a checkpoint contain unacked messages is reasonable (required).
Acking those messages when a checkpoint is finalized is also reasonable.
  Checkpoints must not track any records produced after the call to
checkpoint() that they were produced in. If they do, they will improperly
finalize messages that have not been committed. Creating a new checkpoint
whenever a reader is started or a checkpoint is taken and storing state in
them is a suitable way to ensure this.

You will likely need the reader to maintain its own view of pending unacked
messages, which finalizeCheckpoint can update (in a thread-safe manner,
guarding against the reader no longer being present). You may be able to
track these at the per-checkpoint level, where a finalized checkpoint is
removed from the collection of things that hold the watermark.
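
A minimal sketch of the per-checkpoint bookkeeping described above, written as plain Java rather than against the Beam UnboundedSource API. PendingTracker, Checkpoint, and the acked list are hypothetical names used only for illustration; messages are represented by their timestamps, and "acking" just appends to a list where a real reader would ack against the broker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a reader that tracks unacked messages per checkpoint.
// Not the Beam API: each message is represented only by its timestamp.
class PendingTracker {
  // A checkpoint owns exactly the messages read before checkpoint() was called.
  static final class Checkpoint {
    private final List<Long> timestamps;
    private final PendingTracker owner;

    Checkpoint(List<Long> timestamps, PendingTracker owner) {
      this.timestamps = timestamps;
      this.owner = owner;
    }

    // Acks only the messages captured by this checkpoint.
    void finalizeCheckpoint() {
      owner.ack(this);
    }
  }

  private List<Long> current = new ArrayList<>();            // read since the last checkpoint
  private final List<Checkpoint> unfinalized = new ArrayList<>();
  final List<Long> acked = new ArrayList<>();                // stands in for broker acks

  synchronized void read(long timestamp) {
    current.add(timestamp);
  }

  // Hands the current batch to a new checkpoint and starts a fresh batch,
  // so later reads can never leak into an older checkpoint.
  synchronized Checkpoint checkpoint() {
    Checkpoint mark = new Checkpoint(current, this);
    unfinalized.add(mark);
    current = new ArrayList<>();
    return mark;
  }

  private synchronized void ack(Checkpoint mark) {
    if (unfinalized.remove(mark)) { // a repeated finalize is a no-op
      acked.addAll(mark.timestamps);
    }
  }

  // Oldest timestamp still holding the watermark; Long.MAX_VALUE if none.
  synchronized long oldestPending() {
    long oldest = Long.MAX_VALUE;
    for (long ts : current) {
      oldest = Math.min(oldest, ts);
    }
    for (Checkpoint c : unfinalized) {
      for (long ts : c.timestamps) {
        oldest = Math.min(oldest, ts);
      }
    }
    return oldest;
  }
}
```

Finalizing a checkpoint removes it from the set of things that hold the watermark, while messages read after checkpoint() stay pending until a later checkpoint is finalized.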


On Wed, May 10, 2017 at 8:51 AM, Jean-Baptiste Onofré 
wrote:

> Hi Beamers,
>
> I'm working on some fixes in the JmsIO and MqttIO.
>
> Those two IOs behave in a similar way on the reading side:
>
> - they consume messages from a JMS or MQTT broker
> - the "pending" messages are stored in the checkpoint mark. When a new
> message is added to the checkpoint, we compare the timestamp of the message
> with the oldest pending message timestamp. It advances the watermark: so
> the watermark is basically the timestamp of the oldest pending message.
> - when the runner calls finalize on the checkpoint, then, we ack the
> messages.
>
> Testing this with direct runner, it seems the finalize is never called on
> checkpoints. So, basically, it means that the messages are not fully
> consumed from the broker (as the ack is not sent).
>
> I tried with a fair volume of messages (100) and the checkpoint is not
> finalized.
>
> Basically, I have two questions:
> 1. what do you think about this approach: storing pending messages and
> advancing the watermark this way ?
> 2. any idea when the direct runner will call the finalize on the
> checkpoint ?
>
> Thanks !
> Regards
> JB
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Pipeline termination in the unified Beam model

2017-05-10 Thread Thomas Groh
I think that generally this is actually less of a big deal than suggested,
for a pretty simple reason:

All bounded pipelines are expected to terminate when they are complete.
Almost all unbounded pipelines are expected to run until explicitly shut
down.

As a result, shutting down an unbounded pipeline when the watermark reaches
positive infinity (in a production environment) will occur in practice
extremely rarely. I'm comfortable saying that the model says that such a
pipeline should terminate, and only support this in some runners.

I'm also going to copy-paste why I think it's correct to shut down, from
earlier in the thread:

"I think it's a fair claim that a PCollection is "done" when its watermark
reaches positive infinity, and then it's easy to claim that a Pipeline is
"done" when all of its PCollections are done. Completion is an especially
reasonable claim if we consider positive infinity to be an actual infinity
- so long as allowed lateness is a finite value, elements that arrive
whenever a watermark is at positive infinity will be "infinitely" late, and
thus can be dropped by the runner."
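
As a rough illustration of that argument, here is a small plain-Java model. It is not runner code: Termination and its methods are hypothetical names, watermarks are millisecond values, and Long.MAX_VALUE stands in for positive infinity.

```java
import java.util.Collection;

// Hypothetical model of the termination rule; not taken from any runner.
// Assumes non-negative timestamps and a non-negative allowed lateness.
class Termination {
  static final long POSITIVE_INFINITY = Long.MAX_VALUE;

  // A PCollection is done once its watermark has reached positive infinity.
  static boolean isDone(long watermark) {
    return watermark == POSITIVE_INFINITY;
  }

  // A pipeline is done when every one of its PCollections is done.
  static boolean isPipelineDone(Collection<Long> watermarks) {
    for (long w : watermarks) {
      if (!isDone(w)) {
        return false;
      }
    }
    return true;
  }

  // An element is droppable once the watermark is more than the allowed
  // lateness past the end of the element's window.
  static boolean isDroppable(long windowEnd, long allowedLateness, long watermark) {
    return watermark - allowedLateness > windowEnd;
  }
}
```

With any finite allowed lateness, isDroppable is true for every finite window end once the watermark is at POSITIVE_INFINITY, which is the sense in which late elements are "infinitely" late.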


On Wed, May 10, 2017 at 2:26 AM, Aljoscha Krettek 
wrote:

> Hi,
> A bit of clarification, the Flink Runner does not terminate a Job when the
> timeout is reached in waitUntilFinish(Duration). When we reach the timeout
> we simply return and keep the job running. I thought that was the expected
> behaviour.
>
> Regarding job termination, I think it’s easy to change the Flink Runner to
> terminate if the watermark reaches +Inf. We would simply set running to
> false in the UnboundedSourceWrapper when the watermark reaches +Inf: [1]
>
> Best,
> Aljoscha
>
> [1] https://github.com/apache/beam/blob/cec71028ff63c7e1b1565c013ae0e378608cb5f9/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L428-L428
>
> > On 10. May 2017, at 10:58, Jean-Baptiste Onofré  wrote:
> >
> > OK, now I understand: you are talking about waitUntilFinish(), whereas I
> was thinking about a simple run().
> >
> > IMHO spark and flink sound like the most logical behavior for a streaming
> pipeline.
> >
> > Regards
> > JB
> >
> > On 05/10/2017 10:20 AM, Etienne Chauchot wrote:
> >> Hi everyone,
> >>
> >> I'm reopening this subject because, IMHO, it is important to unify
> pipelines
> >> termination semantics in the model. Here are the differences I have
> observed in
> >> streaming pipelines termination:
> >>
> >> - direct runner: when the output watermarks of all of its PCollections
> progress
> >> to +infinity
> >>
> >> - apex runner: when the output watermarks of all of its PCollections
> progress to
> >> +infinity
> >>
> >> - dataflow runner: when the output watermarks of all of its PCollections
> >> progress to +infinity
> >>
> >> - spark runner: streaming pipelines do not terminate unless timeout is
> set in
> >> pipelineResult.waitUntilFinish()
> >>
> >> - flink runner: streaming pipelines do not terminate unless timeout is
> set in
> >> pipelineResult.waitUntilFinish() (thanks to Aljoscha for timeout
> support PR
> >> https://github.com/apache/beam/pull/2915#pullrequestreview-37090326)
> >>
> >>
> >> => Is the direct/apex/dataflow behavior the correct "beam model"
> behavior?
> >>
> >>
> >> I know that, at least for spark (mails in this thread), there is no
> easy way to
> >> know that we're done reading a source, so it might be very difficult
> (at least
> >> for this runner) to unify toward +infinity behavior if it is chosen as
> the
> >> standard behavior.
> >>
> >> Best
> >>
> >> Etienne
> >>
> >> Le 18/04/2017 à 12:05, Etienne Chauchot a écrit :
> >>> +1 on "runners really terminate in a timely manner to easily
> programmatically
> >>> orchestrate Beam pipelines in a portable way, you do need to know
> whether
> >>> the pipeline will finish without thinking about the specific runner
> and its
> >>> options"
> >>>
> >>> As an example, in Nexmark, we have streaming mode tests, and for the
> >>> benchmark, we need all the queries to behave the same between runners
> towards
> >>> termination.
> >>>
> >>> For now, to have the consistent behavior, in this mode we need to set a
> >>> timeout (a bit random and flaky) on waitUntilFinish() for spark but
> this
> >>> timeout is not needed for direct runner.
> >>>
> >>> Etienne
> >>>
> >>> Le 02/03/2017 à 19:27, Kenneth Knowles a écrit :
>  Isn't this already the case? I think semantically it is an unavoidable
>  conclusion, so certainly +1 to that.
> 
>  The DirectRunner and TestDataflowRunner both have this behavior
> already.
>  I've always considered that a streaming job running forever is just
> [very]
>  suboptimal shutdown latency 

Re: Process for getting the first stable release out

2017-05-05 Thread Thomas Groh
I'm also +1 on the branch. It'll help us make sure that what we're getting
in is what we need for the FSR.

On Fri, May 5, 2017 at 12:41 PM, Dan Halperin  wrote:

> I am +1 on cutting the branch, and the sentiment that we expect the first
> pancake will not be ready to serve customers.
>
> On Fri, May 5, 2017 at 11:40 AM, Kenneth Knowles 
> wrote:
>
> > On Thu, May 4, 2017 at 12:07 PM, Davor Bonaci  wrote:
> >
> > > I'd like to propose the following (tweaked) process for this special
> > > release:
> > >
> > > * Create a release branch, and start building release candidates *now*
> > > This would accelerate branch creation compared to the normal process,
> but
> > > would separate the first stable release from other development on the
> > > master branch. This yields stability and avoids unnecessary churn.
> > >
> >
> > +1 to cutting a release branch now.
> >
> > This sounds compatible with the release process [1] to me, actually. This
> > thread seems like the dev@ thread where we "decide to release" and I
> agree
> > that we should decide to release. Certainly `master` is not ready nor is
> > the web site - there are ~29 issues as I write this though many are not
> > really significant code changes. But we should never wait until `master`
> is
> > "ready".
> >
> > We know what we want to get done, and there are no radical changes, so I
> > think that makes this the right time to branch. We can easily cherry pick
> > fixes for our burndown list to ensure we don't introduce additional
> > blockers.
> >
> > Some of the burndown list are of the form "investigate if this suspected
> > bug still repros" and a release candidate is the perfect thing to use for
> > that.
> >
> > [1] https://beam.apache.org/contribute/release-guide/#decide-to-release
> >
>


Re: Future processing time timers and final watermark

2017-05-04 Thread Thomas Groh
Generally you shouldn't need to hold the watermark. The fact that the input
watermark of the DoFn has advanced to the final watermark (i.e. positive
infinity) means that all of the windows expire. At this point, any window
with buffered data that is not finished should have its remaining elements
output. The remaining processing time timers should have nothing to do at
that point, and can be safely dropped.
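
A rough plain-Java sketch of that handling follows. It models the behavior only and is not Apex or Beam runner code; FinalWatermarkHandler and its fields are hypothetical, windows are keyed by their end timestamp, and Long.MAX_VALUE stands in for the final watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a stateful step reacting to watermark advances.
class FinalWatermarkHandler {
  static final long FINAL_WATERMARK = Long.MAX_VALUE;

  // Window end timestamp -> elements buffered for that window.
  final TreeMap<Long, List<String>> buffered = new TreeMap<>();
  // Processing-time timers that are still set for the future.
  final List<Long> processingTimeTimers = new ArrayList<>();
  final List<String> output = new ArrayList<>();

  void onWatermark(long inputWatermark) {
    // Emit every buffered window whose end is at or before the watermark.
    Map<Long, List<String>> expired = buffered.headMap(inputWatermark, true);
    for (List<String> elements : expired.values()) {
      output.addAll(elements);
    }
    expired.clear(); // the view is backed by the map, so this removes them

    // At the final watermark nothing is left for the timers to do.
    if (inputWatermark == FINAL_WATERMARK) {
      processingTimeTimers.clear();
    }
  }
}
```

Because every window end is at or before the final watermark, all remaining buffered data is emitted before the timers are discarded.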

On Tue, May 2, 2017 at 9:57 PM, Thomas Weise  wrote:

> Hi,
>
> While working on SDF support in the Apex runner, I see the scenario where
> processing time timers are set in the future. These never trigger, the
> topology exits with the final watermark before.
>
> What is the correct way to handle this? Should the final watermark be held
> until all processing time timers are cleared?
>
> Thanks,
> Thomas
>


Re: Congratulations Davor!

2017-05-04 Thread Thomas Groh
Congratulations!

On Thu, May 4, 2017 at 7:56 AM, Thomas Weise  wrote:

> Congrats!
>
>
> On Thu, May 4, 2017 at 7:53 AM, Sourabh Bajaj <
> sourabhba...@google.com.invalid> wrote:
>
> > Congrats!!
> > On Thu, May 4, 2017 at 7:48 AM Mingmin Xu  wrote:
> >
> > > Congratulations @Davor!
> > >
> > >
> > > > On May 4, 2017, at 7:08 AM, Amit Sela  wrote:
> > > >
> > > > Congratulations Davor!
> > > >
> > > >> On Thu, May 4, 2017, 10:02 JingsongLee 
> > wrote:
> > > >>
> > > >> Congratulations!
> > > >> --
> > > >> From:Jesse Anderson 
> > > >> Time:2017 May 4 (Thu) 21:36
> > > >> To:dev 
> > > >> Subject:Re: Congratulations Davor!
> > > >> Congrats!
> > > >>
> > > >>> On Thu, May 4, 2017, 6:20 AM Aljoscha Krettek  >
> > > wrote:
> > > >>>
> > > >>> Congrats! :-)
> > >  On 4. May 2017, at 14:34, Kenneth Knowles  >
> > > >>> wrote:
> > > 
> > >  Awesome!
> > > 
> > > > On Thu, May 4, 2017 at 1:19 AM, Ted Yu 
> > wrote:
> > > >
> > > > Congratulations, Davor!
> > > >
> > > > On Thu, May 4, 2017 at 12:45 AM, Aviem Zur  > > >>> wrote:
> > > >
> > > >> Congrats Davor! :)
> > > >>
> > > >> On Thu, May 4, 2017 at 10:42 AM Jean-Baptiste Onofré <
> > > >> j...@nanthrax.net>
> > > >> wrote:
> > > >>
> > > >>> Congrats ! Well deserved ;)
> > > >>>
> > > >>> Regards
> > > >>> JB
> > > >>>
> > >  On 05/04/2017 09:30 AM, Jason Kuster wrote:
> > >  Hi all,
> > > 
> > >  The ASF has just published a blog post[1] welcoming new
> members
> > of
> > > > the
> > >  Apache Software Foundation, and our own Davor Bonaci is among
> > > them!
> > >  Congratulations and thank you to Davor for all of your work
> for
> > > the
> > > >> Beam
> > >  community, and the ASF at large. Well deserved.
> > > 
> > >  Best,
> > > 
> > >  Jason
> > > 
> > > >  [1] https://blogs.apache.org/foundation/entry/the-apache-software-foundation-welcomes
> > > 
> > >  P.S. I dug through the list to make sure I wasn't missing any
> > > other
> > > >> Beam
> > >  community members; if I have, my sincerest apologies and
> please
> > > >> recognize
> > >  them on this or a new thread.
> > > 
> > > >>>
> > > >>> --
> > > >>> Jean-Baptiste Onofré
> > > >>> jbono...@apache.org
> > > >>> http://blog.nanthrax.net
> > > >>> Talend - http://www.talend.com
> > > >>>
> > > >>
> > > >
> > > >>>
> > > >>> --
> > > >> Thanks,
> > > >>
> > > >> Jesse
> > > >>
> > > >>
> > >
> >
>


Re: How to control watermark when using BoundedSource

2017-04-28 Thread Thomas Groh
You can't directly control the watermark that a BoundedSource emits.
Windowing into FixedWindows will still work as you expect, however: your
elements will be assigned to their windows based on the time the event
occurred. Depending on the runner, triggers may be run either "when
available" or after all the work is completed, but your output data will be
as if you had a perfect watermark.
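
To illustrate why the result does not depend on the watermark, here is a small sketch of event-time window assignment in plain Java. FixedWindowAssigner is a hypothetical helper, not Beam's FixedWindows class, and timestamps are plain milliseconds.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper showing fixed-window assignment by event time.
class FixedWindowAssigner {
  // Start (inclusive) of the fixed window that contains the timestamp.
  static long windowStart(long eventTimeMillis, long windowSizeMillis) {
    return eventTimeMillis - Math.floorMod(eventTimeMillis, windowSizeMillis);
  }

  // Counts elements per window; the result depends only on the timestamps,
  // not on the order in which the elements are read.
  static Map<Long, Integer> countPerWindow(List<Long> eventTimes, long windowSizeMillis) {
    Map<Long, Integer> counts = new TreeMap<>();
    for (long t : eventTimes) {
      counts.merge(windowStart(t, windowSizeMillis), 1, Integer::sum);
    }
    return counts;
  }
}
```

Replaying the same trace in a different order yields the same per-window counts, since each element carries its own event time.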

On Fri, Apr 28, 2017 at 10:09 AM, Shen Li  wrote:

> Hi,
>
> Say I want to replay a data trace of last week using fixed windows. The
> data trace is read from a file using TextIO. In order to trigger windows at
> right times, how can I control the watermark emitted by the BoundedSource?
>
> Thanks,
>
> Shen
>


Re: Can application specify how watermarks should be generated?

2017-04-25 Thread Thomas Groh
getCurrentTimestamp returns the timestamp of the current element. Both
Bounded and Unbounded Readers have this method.

For a bounded source, this is safe - the source watermark can be held to
negative infinity while elements remain in the source and advance to
infinity after all elements are read, and elements can be arbitrarily
shifted forwards in time later in the pipeline (for example, via a
"WithTimestamps" transform or a DoFn that uses "outputWithTimestamp"). It's
not safe to output elements at negative infinity when there is a watermark
that may drop elements, as is the case for unbounded sources.
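
The bounded case can be modeled in a few lines of plain Java. This is a sketch under assumptions, not the Beam reader interface: BoundedReaderModel is hypothetical, and Long.MIN_VALUE and Long.MAX_VALUE stand in for negative and positive infinity.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical model of a bounded reader and the watermark held for it.
class BoundedReaderModel {
  static final long MIN_TIMESTAMP = Long.MIN_VALUE;
  static final long MAX_TIMESTAMP = Long.MAX_VALUE;

  private final Iterator<String> elements;
  private boolean exhausted;

  BoundedReaderModel(List<String> elements) {
    this.elements = elements.iterator();
  }

  // Returns the next element, or null once the source is exhausted.
  String advance() {
    if (elements.hasNext()) {
      return elements.next();
    }
    exhausted = true;
    return null;
  }

  // Default behavior: every element gets the minimum possible timestamp.
  long getCurrentTimestamp() {
    return MIN_TIMESTAMP;
  }

  // Held at the minimum while elements may remain, then jumps to the maximum.
  long watermark() {
    return exhausted ? MAX_TIMESTAMP : MIN_TIMESTAMP;
  }
}
```

While the reader still has elements, no element timestamp is ever behind the watermark, so nothing can be considered late; an unbounded source whose watermark advances during reading has no such guarantee.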

On Fri, Apr 21, 2017 at 8:44 AM, Shen Li  wrote:

> Hi,
>
> A follow-up question. I found that the getWatermark() API is only available
> for UnboundedSource. BoundedSource provides a getCurrentTimestamp() API
> with comments "By default, returns the minimum possible timestamp", which
> sounds like a watermark. Any reason for the difference in method names?
>
> Shen
>
> On Thu, Apr 20, 2017 at 11:46 PM, Shen Li  wrote:
>
> > Thanks!
> >
> > Shen
> >
> >
> > On Thu, Apr 20, 2017 at 8:07 PM, Mingmin Xu  wrote:
> >
> >> In KafkaIO, it's possible to provide customized watermark function, to
> >> control how to advance current watermark. I'm not familiar with other
> >> unbounded IOs, assume they should support it as getWatermark() is
> defined
> >> in org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.
> >>
> >> A quick example to hold watermark 10 seconds earlier than processing
> time,
> >> you can have more complex logic based on KafkaRecord content.
> >> // K and V stand for the topic's key and value types.
> >> KafkaIO.<K, V>read()
> >>   .withWatermarkFn2(new SerializableFunction<KafkaRecord<K, V>, Instant>() {
> >>     @Override
> >>     public Instant apply(KafkaRecord<K, V> input) {
> >>       return new Instant().minus(Duration.standardSeconds(10));
> >>     }
> >>   })
> >>
> >>
> >> On Thu, Apr 20, 2017 at 2:44 PM, Kenneth Knowles  >
> >> wrote:
> >>
> >> > You want to use an existing source but just change the watermark
> >> tracking?
> >> > You can't do this in your pipeline right now, but you could probably
> >> easily
> >> > wrap a source and proxy every method except getWatermark, though I
> have
> >> > never tried.
> >> >
> >> > The general feature that might address this is discussed a little on
> >> > https://issues.apache.org/jira/browse/BEAM-644
> >> >
> >> > There are also related ideas in the discussions about Splittable DoFn.
> >> >
> >> > Kenn
> >> >
> >> > On Thu, Apr 20, 2017 at 1:42 PM, Shen Li  wrote:
> >> >
> >> > > Hi,
> >> > >
> >> > > Can application developers provide classes/methods to specify how to
> >> > > generate watermarks from sources, and how to aggregate watermarks
> from
> >> > > multiple input PCollections? Say, emit at most 1 watermark per
> >> second, or
> >> > > create watermarks that are 5 seconds older than the latest tuple's
> >> > > timestamp?
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Shen
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> 
> >> Mingmin
> >>
> >
> >
>


Re: [PROPOSAL] Remove KeyedCombineFn

2017-04-21 Thread Thomas Groh
A happy +1. This simplifies the code base, and if we find a compelling use,
it shouldn't be too bad to add it back in.

On Fri, Apr 21, 2017 at 10:24 AM, Kenneth Knowles 
wrote:

> Hi all,
>
> I propose that we remove KeyedCombineFn before the first stable release.
>
> I don't think it adds enough value for the complexity it adds to e.g.
> CombineWithContext [1] and state [2, 3], and it doesn't seem to me that
> users really use it when we might expect. I am happy to be demonstrated
> wrong.
>
> It is very likely that you have never written [4, 5] or thought about
> KeyedCombineFn. So for context, here are excerpts from signatures just to
> show the difference from CombineFn:
>
> CombineFn<InputT, AccumT, OutputT> {
>   AccumT createAccumulator();
>   AccumT addInput(AccumT accum, InputT input);
>   AccumT mergeAccumulators(Iterable<AccumT> accums);
>   OutputT extractOutput(AccumT accum);
> }
>
> KeyedCombineFn<K, InputT, AccumT, OutputT> {
>   AccumT createAccumulator(K key);
>   AccumT addInput(K key, AccumT accum, InputT input);
>   AccumT mergeAccumulators(K key, Iterable<AccumT> accums);
>   OutputT extractOutput(K key, AccumT accum);
> }
>
> So what are the particular reasons for this, versus a CombineFn that has
> KVs as its input and accumulator types?
>
>  - There are some performance improvements potentially from not passing
> keys around, based on the assumption they are always available.
>
>  - There is also a spec difference because it only has to be associative
> and commutative per key, cannot be applied in a global combine, and
> addInput is automatically key preserving.
>
> But in fact, in all of my code crawling the class is almost never used
> (even over the course of its history at Google) and even the few uses I
> found were often mistakes where the key is totally ignored, probably
> because a user thinks "I am doing a keyed combine so I need a keyed combine
> function". So the number of users actually affected is about zero.
>
> I would be curious if anyone has a compelling case for keeping
> KeyedCombineFn.
>
> Kenn
>
> [1]
> https://github.com/yafengguo/Apache-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
> [2] https://issues.apache.org/jira/browse/BEAM-1336
> [3] https://github.com/apache/beam/pull/2627
> [4]
> https://github.com/search?l=Java=KeyedCombineFn=
> advsearch=Code=%E2%9C%93
> [5] https://www.google.com/search?q=KeyedCombineFn
>
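
For anyone who does need the key, the alternative mentioned in the proposal (a CombineFn whose input and accumulator are key/value pairs) might look roughly like the sketch below. MiniCombineFn is a hypothetical stand-in that only copies the method shapes quoted above, it is not Beam's class, and KeyedSumFn is an invented example.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical stand-in for the CombineFn shape quoted above (not Beam's class).
interface MiniCombineFn<InputT, AccumT, OutputT> {
  AccumT createAccumulator();
  AccumT addInput(AccumT accum, InputT input);
  AccumT mergeAccumulators(Iterable<AccumT> accums);
  OutputT extractOutput(AccumT accum);
}

// Sums values while carrying the key through the accumulator, which is what
// a KeyedCombineFn would otherwise receive as an extra argument.
class KeyedSumFn
    implements MiniCombineFn<Map.Entry<String, Integer>, Map.Entry<String, Integer>, String> {
  @Override
  public Map.Entry<String, Integer> createAccumulator() {
    return new SimpleEntry<>(null, 0); // the key is unknown until the first input
  }

  @Override
  public Map.Entry<String, Integer> addInput(
      Map.Entry<String, Integer> accum, Map.Entry<String, Integer> input) {
    return new SimpleEntry<>(input.getKey(), accum.getValue() + input.getValue());
  }

  @Override
  public Map.Entry<String, Integer> mergeAccumulators(
      Iterable<Map.Entry<String, Integer>> accums) {
    String key = null;
    int sum = 0;
    for (Map.Entry<String, Integer> a : accums) {
      if (a.getKey() != null) {
        key = a.getKey(); // empty accumulators carry no key
      }
      sum += a.getValue();
    }
    return new SimpleEntry<>(key, sum);
  }

  @Override
  public String extractOutput(Map.Entry<String, Integer> accum) {
    return accum.getKey() + "=" + accum.getValue();
  }
}
```

The cost of this approach is the one named in the proposal: the key travels with every input and accumulator instead of being supplied by the runner.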


Re: Renaming SideOutput

2017-04-12 Thread Thomas Groh
Cool! I've filed https://issues.apache.org/jira/browse/BEAM-1949 and
authored https://github.com/apache/beam/pull/2512 to make this change.

On Tue, Apr 11, 2017 at 11:33 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> +1
>
> > On Apr 11, 2017, at 5:34 PM, Thomas Groh <tg...@google.com.INVALID>
> wrote:
> >
> > I think that's a good idea. I would call the outputs of a ParDo the "Main
> > Output" and "Additional Outputs" - it seems like an easy way to make it
> > clear that there's one output that is always expected, and there may be
> > more.
> >
> > On Tue, Apr 11, 2017 at 5:29 PM, Robert Bradshaw <
> > rober...@google.com.invalid> wrote:
> >
> >> We should do some renaming in Python too. Right now we have
> >> SideOutputValue which I'd propose naming TaggedOutput or something
> >> like that.
> >>
> >> Should the docs change too?
> >> https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> >>
> >> On Tue, Apr 11, 2017 at 5:25 PM, Kenneth Knowles <k...@google.com.invalid
> >
> >> wrote:
> >>> +1 ditto about sideInput and sideOutput not actually being related
> >>>
> >>> On Tue, Apr 11, 2017 at 3:52 PM, Robert Bradshaw <
> >>> rober...@google.com.invalid> wrote:
> >>>
> >>>> +1, I think this is a lot clearer.
> >>>>
> >>>> On Tue, Apr 11, 2017 at 2:24 PM, Stephen Sisk <s...@google.com.invalid
> >
> >>>> wrote:
> >>>>> strong +1 for changing the name away from sideOutput - the fact that
> >>>>> sideInput and sideOutput are not really related was definitely a
> >> source
> >>>> of
> >>>>> confusion for me when learning beam.
> >>>>>
> >>>>> S
> >>>>>
> >>>>> On Tue, Apr 11, 2017 at 1:56 PM Thomas Groh <tg...@google.com.invalid
> >>>
> >>>>> wrote:
> >>>>>
> >>>>>> Hey everyone:
> >>>>>>
> >>>>>> I'd like to rename DoFn.Context#sideOutput to #output (in the Java
> >> SDK).
> >>>>>>
> >>>>>> Having two methods, both named output, one which takes the "main
> >> output
> >>>>>> type" and one that takes a tag to specify the type more clearly
> >>>>>> communicates the actual behavior - sideOutput isn't a "special" way
> >> to
> >>>>>> output, it's the same as output(T), just to a specified PCollection.
> >>>> This
> >>>>>> will help pipeline authors understand the actual behavior of
> >> outputting
> >>>> to
> >>>>>> a tag, and detangle it from "sideInput", which is a special way to
> >>>> receive
> >>>>>> input. Giving them the same name means that it's not even strange to
> >>>> call
> >>>>>> output and provide the main output type, which is what we want -
> >> it's a
> >>>>>> more specific way to output, but does not have different
> >> restrictions or
> >>>>>> capabilities.
> >>>>>>
> >>>>>> This is also a pretty small change within the SDK - it touches about
> >> 20
> >>>>>> files, and the changes are pretty automatic.
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Thomas
> >>
>


Re: Renaming SideOutput

2017-04-11 Thread Thomas Groh
I think that's a good idea. I would call the outputs of a ParDo the "Main
Output" and "Additional Outputs" - it seems like an easy way to make it
clear that there's one output that is always expected, and there may be
more.

On Tue, Apr 11, 2017 at 5:29 PM, Robert Bradshaw <
rober...@google.com.invalid> wrote:

> We should do some renaming in Python too. Right now we have
> SideOutputValue which I'd propose naming TaggedOutput or something
> like that.
>
> Should the docs change too?
> https://beam.apache.org/documentation/programming-guide/#transforms-sideio
>
> On Tue, Apr 11, 2017 at 5:25 PM, Kenneth Knowles <k...@google.com.invalid>
> wrote:
> > +1 ditto about sideInput and sideOutput not actually being related
> >
> > On Tue, Apr 11, 2017 at 3:52 PM, Robert Bradshaw <
> > rober...@google.com.invalid> wrote:
> >
> >> +1, I think this is a lot clearer.
> >>
> >> On Tue, Apr 11, 2017 at 2:24 PM, Stephen Sisk <s...@google.com.invalid>
> >> wrote:
> >> > strong +1 for changing the name away from sideOutput - the fact that
> >> > sideInput and sideOutput are not really related was definitely a
> source
> >> of
> >> > confusion for me when learning beam.
> >> >
> >> > S
> >> >
> >> > On Tue, Apr 11, 2017 at 1:56 PM Thomas Groh <tg...@google.com.invalid
> >
> >> > wrote:
> >> >
> >> >> Hey everyone:
> >> >>
> >> >> I'd like to rename DoFn.Context#sideOutput to #output (in the Java
> SDK).
> >> >>
> >> >> Having two methods, both named output, one which takes the "main
> output
> >> >> type" and one that takes a tag to specify the type more clearly
> >> >> communicates the actual behavior - sideOutput isn't a "special" way
> to
> >> >> output, it's the same as output(T), just to a specified PCollection.
> >> This
> >> >> will help pipeline authors understand the actual behavior of
> outputting
> >> to
> >> >> a tag, and detangle it from "sideInput", which is a special way to
> >> receive
> >> >> input. Giving them the same name means that it's not even strange to
> >> call
> >> >> output and provide the main output type, which is what we want -
> it's a
> >> >> more specific way to output, but does not have different
> restrictions or
> >> >> capabilities.
> >> >>
> >> >> This is also a pretty small change within the SDK - it touches about
> 20
> >> >> files, and the changes are pretty automatic.
> >> >>
> >> >> Thanks,
> >> >>
> >> >> Thomas
> >> >>
> >>
>


Renaming SideOutput

2017-04-11 Thread Thomas Groh
Hey everyone:

I'd like to rename DoFn.Context#sideOutput to #output (in the Java SDK).

Having two methods, both named output, one which takes the "main output
type" and one that takes a tag to specify the type more clearly
communicates the actual behavior - sideOutput isn't a "special" way to
output, it's the same as output(T), just to a specified PCollection. This
will help pipeline authors understand the actual behavior of outputting to
a tag, and detangle it from "sideInput", which is a special way to receive
input. Giving them the same name means that it's not even strange to call
output and provide the main output type, which is what we want - it's a
more specific way to output, but does not have different restrictions or
capabilities.
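
Roughly, the shape I have in mind looks like the plain-Java model below. It is only a sketch: Tag and ModelContext are hypothetical names used for illustration and are not the SDK's classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical typed tag identifying an output PCollection.
final class Tag<T> {
  final String id;

  Tag(String id) {
    this.id = id;
  }
}

// Hypothetical model of a context with two methods that are both named output.
class ModelContext<MainT> {
  final List<MainT> mainOutput = new ArrayList<>();
  final Map<String, List<Object>> taggedOutputs = new HashMap<>();

  // Outputs to the main output.
  void output(MainT value) {
    mainOutput.add(value);
  }

  // The same operation, directed at the output identified by the tag.
  <T> void output(Tag<T> tag, T value) {
    taggedOutputs.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
  }
}
```

Both calls read the same at the call site, e.g. ctx.output("hello") and ctx.output(lengths, 5); the tag only selects which collection receives the element.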

This is also a pretty small change within the SDK - it touches about 20
files, and the changes are pretty automatic.

Thanks,

Thomas


Re: Combine.Global

2017-04-10 Thread Thomas Groh
This looks like it might be because the output coder cannot be determined.
It looks like the registry understands that it must build a KvCoder, but
cannot infer the coder for OutputT. More specifically, within the stack
trace, the following line occurs:

"Unable to provide a default Coder for java.lang.Object. Correct one of the
following root causes:"

CombineFn provides a `getDefaultOutputCoder(CoderRegistry, Coder<InputT>)`
method which may be suitable here for producing the coder for your outputs.

(I can produce a very similar stack trace:
https://gist.github.com/tgroh/04d4b638e7fabf8a03187760ddb26eef)
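
As a rough illustration of why the lookup falls through to java.lang.Object and how an explicit hook avoids it, here is a JDK-only toy model. ToyCoderRegistry, ToyCombineFn and the coder names are all made up for the example; the real CoderRegistry and CombineFn are considerably more involved.

```java
import java.util.HashMap;
import java.util.Map;

// Toy registry mapping a class to the name of a coder. Not the Beam CoderRegistry.
final class ToyCoderRegistry {
  private final Map<Class<?>, String> coders = new HashMap<>();

  void register(Class<?> type, String coderName) {
    coders.put(type, coderName);
  }

  String defaultCoderFor(Class<?> type) {
    String coder = coders.get(type);
    if (coder == null) {
      throw new IllegalStateException(
          "Unable to provide a default Coder for " + type.getName());
    }
    return coder;
  }
}

// Toy combine function; outputClass() models what inference can see of OutputT.
abstract class ToyCombineFn {
  // With an unresolved type variable, inference only sees java.lang.Object.
  Class<?> outputClass() {
    return Object.class;
  }

  // Hook in the spirit of getDefaultOutputCoder: the default is a registry
  // lookup, and a subclass may override it to name the output coder directly.
  String defaultOutputCoder(ToyCoderRegistry registry, String inputCoder) {
    return registry.defaultCoderFor(outputClass());
  }
}
```

A subclass that overrides defaultOutputCoder never reaches the registry lookup, so the missing entry for Object stops mattering.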

On Fri, Apr 7, 2017 at 9:46 PM, Aviem Zur  wrote:

> I wasn't able to reproduce the issue you're experiencing.
> I've created a gist with an example that works and is similar to what you
> have described.
> Please help us make tweaks to the gist to reproduce your problem:
> https://gist.github.com/aviemzur/ba213d98b4484492099b3cf709ddded0
>
> On Fri, Apr 7, 2017 at 7:25 PM Paul Gerver  wrote:
>
> > Yes, the pipeline is quite small:
> >
> > pipeline.apply("source", Read.from(new CustomSource()))
> >     .setCoder(CustomSource.coder)
> >     .apply("GlobalCombine", Combine.globally(new CustomCombineFn()))
> >     .setCoder(CustomTuple.coder);
> >
> >
> > The InputT is not the same as OutputT, so the input coder can't be used.
> >
> > On 2017-04-07 08:58 (-0500), Aviem Zur  wrote:
> > > Have you set the coder for your input PCollection? The one on which you
> > > perform the Combine?
> > >
> > > On Fri, Apr 7, 2017 at 4:24 PM Paul Gerver  wrote:
> > >
> > > > Hello All,
> > > >
> > > > I'm trying to test out a Combine.Globally transform which takes in a
> > > > small custom class (CustomA) and outputs a secondary custom class
> > > > (CustomB). I have set the coder for the resulting PCollection, but Beam
> > > > is arguing that a coder for a KV type is missing (see output at bottom).
> > > >
> > > > Since this is a global combine, neither the input nor the output is of
> > > > KV type, so I decided to take a look at the Combine code.
> > > > Combine.Globally.expand() performs a perKey and groupedValues underneath
> > > > the covers, which requires making an intermediate PCollection KV
> > > > which--according to the docs--is inferred from the CombineFn.
> > > >
> > > > I believe I could work around this by registering a KvCoder with the
> > > > CoderRegistry, but that's not intuitive. Is there a better way to
> > > > address this currently, or should something be added to the CombineFn
> > > > area for setting an output coder, similar to PCollection?
> > > >
> > > >
> > > > Output:
> > > > Exception in thread "main" java.lang.IllegalStateException: Unable
> to
> > > > return a default Coder for
> > > >
> > > >
> > GlobalCombine/Combine.perKey(CustomTuple)/Combine.
> GroupedValues/ParDo(Anonymous).out
> > > > [Class]. Correct one of the following root causes:
> > > >   No Coder has been manually specified;  you may do so using
> > .setCoder().
> > > >   Inferring a Coder from the CoderRegistry failed: Unable to provide
> a
> > > > default Coder for org.apache.beam.sdk.values.KV. Correct
> > one of
> > > > the following root causes:
> > > >   Building a Coder using a registered CoderFactory failed: Cannot
> > provide
> > > > coder for parameterized type org.apache.beam.sdk.values.KV OutputT>:
> > > > Unable to provide a default Coder for java.lang.Object. Correct one
> of
> > the
> > > > following root causes:
> > > >
> > > >
> > > > Stack:
> > > > at
> > > >
> > > >
> > org.apache.beam.sdk.repackaged.com.google.common.
> base.Preconditions.checkState(Preconditions.java:174)
> > > > at
> > > > org.apache.beam.sdk.values.TypedPValue.getCoder(TypedPValue.java:51)
> > > > at
> > > > org.apache.beam.sdk.values.PCollection.getCoder(
> PCollection.java:130)
> > > > at
> > > >
> > > >
> > org.apache.beam.sdk.values.TypedPValue.finishSpecifying(
> TypedPValue.java:90)
> > > > at
> > > >
> > > >
> > org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(
> TransformHierarchy.java:95)
> > > > at
> > org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:386)
> > > > at
> > org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:302)
> > > > at
> > > > org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
> > > > at
> > > >
> > org.apache.beam.sdk.transforms.Combine$Globally.
> expand(Combine.java:1460)
> > > > at
> > > >
> > org.apache.beam.sdk.transforms.Combine$Globally.
> expand(Combine.java:1337)
> > > > at
> > > >
> > org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
> > > > at
> > > >
> > org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:296)
> > > > at
> > 

Re: Spec cleanup for Finalize Checkpoint

2017-03-29 Thread Thomas Groh
(Short URL:
https://s.apache.org/FIWQ)

On Wed, Mar 29, 2017 at 1:15 PM, Thomas Groh <tg...@google.com> wrote:

> Hey everyone,
>
> We've had a few bugs recently in the DirectRunner based around finalizing
> checkpoints, as well as a bit of confusion on what should be permitted from
> within a checkpoint. Those caused some revisiting of the checkpoint spec,
> both to make sure we have written down what a runner is meant to do, and
> what that requires from a user. I've summarized as many existing problems
> as I could find and have a couple of suggested updates for the
> documentation. Most of these are already implemented for all of the runners
> and UnboundedSources, and are just to make the documentation more precise.
> Take a look if you're interested.
>
> https://docs.google.com/document/d/1G-WcPQNLHGmgH-RKpqN4G33P
> iJMaqGFmKksES6u_aK4/edit?usp=sharing
>
> Thanks,
>
> Thomas
>


Spec cleanup for Finalize Checkpoint

2017-03-29 Thread Thomas Groh
Hey everyone,

We've had a few bugs recently in the DirectRunner based around finalizing
checkpoints, as well as a bit of confusion on what should be permitted from
within a checkpoint. Those caused some revisiting of the checkpoint spec,
both to make sure we have written down what a runner is meant to do, and
what that requires from a user. I've summarized as many existing problems
as I could find and have a couple of suggested updates for the
documentation. Most of these are already implemented for all of the runners
and UnboundedSources, and are just to make the documentation more precise.
Take a look if you're interested.

https://docs.google.com/document/d/1G-WcPQNLHGmgH-
RKpqN4G33PiJMaqGFmKksES6u_aK4/edit?usp=sharing

Thanks,

Thomas


Re: [PROPOSAL] @OnWindowExpiration

2017-03-29 Thread Thomas Groh
+1

The fact that we have this ability already (including all of the required
information), just in a roundabout way by manually dredging in the allowed
lateness, means that this isn't a huge burden to implement on an SDK or
runner side; meanwhile, this much more strongly communicates what a user is
trying to accomplish (in the general case, flush anything left over).

I think having this annotation present and available also makes it more
obvious that if there's no window-expiration cleanup then any remaining
buffered state will be lost, and that there's a recommended way to flush
any remaining state.
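
A rough JDK-only sketch of the "flush anything left over" case, following the runner-side ordering Kenn lists in the quoted message (expire the window, fire the callback, delete the state). ToyExpiringBuffer is a made-up name and models only the bookkeeping, not the annotation itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Toy runner-side bookkeeping for one stateful step; all names are hypothetical.
final class ToyExpiringBuffer {
  private final Map<Long, List<String>> stateByWindow = new HashMap<>();
  private final Set<Long> expired = new HashSet<>();
  private final Consumer<List<String>> onWindowExpiration;

  ToyExpiringBuffer(Consumer<List<String>> onWindowExpiration) {
    this.onWindowExpiration = onWindowExpiration;
  }

  // Buffers an element; returns false if the window already expired and the
  // input was dropped.
  boolean process(long windowEnd, String element) {
    if (expired.contains(windowEnd)) {
      return false;
    }
    stateByWindow.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(element);
    return true;
  }

  // Called once the watermark has passed the end of the window plus allowed lateness.
  void expire(long windowEnd) {
    // 1. Expire the window and start dropping any more input (at most once).
    if (!expired.add(windowEnd)) {
      return;
    }
    // 2. Fire the user's expiration callback with whatever is still buffered.
    List<String> leftover = stateByWindow.getOrDefault(windowEnd, new ArrayList<>());
    onWindowExpiration.accept(new ArrayList<>(leftover));
    // 3. Delete the state for the window.
    stateByWindow.remove(windowEnd);
  }

  boolean hasState(long windowEnd) {
    return stateByWindow.containsKey(windowEnd);
  }
}
```

Without a callback in step 2, step 3 would silently discard the buffered elements, which is the loss the annotation is meant to make visible.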

On Wed, Mar 29, 2017 at 9:14 AM, Kenneth Knowles 
wrote:

> On Wed, Mar 29, 2017 at 12:16 AM, JingsongLee 
> wrote:
>
> > If a user has a WordCount StatefulDoFn, the result of
> > counts is always changing before the expiration of the window.
> > Maybe the user wants a signal to know the count is the final value
> > and then archive the value to the timing database or somewhere else.
> > best,
> > JingsongLee
> >
>
> This is a good point to bring up, but actually already required to be
> handled by the runner. This issue exists with timers already. The runner
> must sequence these:
>
> 1. Expire the window and start dropping any more input
> 2. Fire the user's expiration callback
> 3. Delete the state for the window
>
> This actually made me think of a special property of @OnWindowExpiration:
> we can forbid Timer parameters. If we followed Robert's idea we could do
> static analysis and enforce the same thing.
>
> This is a pretty good motivation for the special feature. It is more than
> convenience.
>
> Kenn
>
>
> > ------------------------------------------------------------------
> > From: Kenneth Knowles
> > Time: 2017 Mar 29 (Wed) 09:07
> > To: dev <dev@beam.apache.org>
> > Subject: Re: [PROPOSAL] @OnWindowExpiration
> > On Tue, Mar 28, 2017 at 2:47 PM, Eugene Kirpichov <
> > kirpic...@google.com.invalid> wrote:
> >
> > > Kenn, can you quote some use cases for this, to make it more clear what
> > > the consequences are of having this API in this form?
> > >
> > > I recall that one of the main use cases was batching DoFn, right?
> > >
> >
> > I believe every stateful DoFn where the data stored in state represents
> > some accumulation of the input and/or buffering of output requires this.
> > So, yes:
> >
> >  - batching DoFn and the many variants that may spring up
> >  - combine-like stateful DoFns that require state, like blended
> > accumulation modes or selective composed combines
> >  - trigger-like stateful DoFns that output based on some complex
> > user-defined criteria
> >
> > The stateful DoFns that do not require such a timer are those where the
> > stored data is a phase transition or side-input-like enrichment, and I
> > think also common join algorithms.
> >
> > I don't have a sense of which of these will be more prevalent. Both
> > categories represent common user needs.
> >
> > Kenn
> >
> >
> > > > On Tue, Mar 28, 2017 at 1:37 PM Kenneth Knowles wrote:
> > >
> > > > On Tue, Mar 28, 2017 at 1:32 PM, Robert Bradshaw <
> > > > rober...@google.com.invalid> wrote:
> > > >
> > > > > Another alternative is to be able to set special timers, e.g. end
> of
> > > > window
> > > > > and expiration of window. That at least addresses (2).
> > > > >
> > > >
> > > > Potentially a tangent, but that would perhaps fit in with the idea of
> > > > removing TimeDomain from user APIs (
> > > > https://issues.apache.org/jira/browse/BEAM-1308) and instead having
> > > > TimerSpecs.eventTimeTimer(), TimerSpecs.processingTimeTimer(),
> > > > TimerSpecs.windowExpirationTimer() that each yield distinct sorts of
> > > > parameters in @ProcessElement.
> > > >
> > > > A bit more heavyweight, syntactically.
> > > >
> > > > Kenn
> > > >
> > > >
> > > > >
> > > > > > On Tue, Mar 28, 2017 at 1:27 PM, Kenneth Knowles wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > > I have a little extension to the stateful DoFn annotations to
> > > > > > > circulate for feedback: Allow a method to be annotated with
> > > > > > > @OnWindowExpiration to automatically get a callback at some point
> > > > > > > after the window has expired, but before the state for the window
> > > > > > > has been cleared.
> > > > > >
> > > > > > Today, a user can pretty easily get the same effect by setting a
> > > timer
> > > > > for
> > > > > > the end of the window + allowed lateness in their @ProcessElement
> > > > calls.
> > > > > > But having just one annotation for it has a couple nice benefits:
> > > > > >
> > > > > > 1. Some users assume a naive implementation so they are concerned
> > > that
> > > > > > setting a timer repeatedly is costly. This
> > eliminates the cause for
> > > > user
> > > > > > alarm and allows a runner to do a better job in case it didn't
> > > already
> > > > 

Side-Channel Inputs and an SDK

2017-03-24 Thread Thomas Groh
Hey everyone;

I have a quick one-pager on PTransform capabilities and the ability for a
PTransform to receive inputs via a Side-channel. This is a low-impact
change to any existing user and runner author, but since it hits every
PTransform and is required for the runner API, I thought you all might be
interested.

Thanks,

Thomas

https://docs.google.com/document/d/1e_-MenoW2cQ-6-EGVVqfOR-B9FovVXqXyUm4-ZwlgKA/edit?usp=sharing


Re: [ANNOUNCEMENT] New committers, March 2017 edition!

2017-03-17 Thread Thomas Groh
Well done, congratulations, and welcome, everyone!

On Fri, Mar 17, 2017 at 2:13 PM, Davor Bonaci  wrote:

> Please join me and the rest of Beam PMC in welcoming the following
> contributors as our newest committers. They have significantly contributed
> to the project in different ways, and we look forward to many more
> contributions in the future.
>
> * Chamikara Jayalath
> Chamikara has been contributing to Beam since inception, and previously to
> Google Cloud Dataflow, accumulating a total of 51 commits (8,301 ++ / 3,892
> --) since February 2016 [1]. He contributed broadly to the project, but
> most significantly to the Python SDK, building the IO framework in this SDK
> [2], [3].
>
> * Eugene Kirpichov
> Eugene has been contributing to Beam since inception, and previously to
> Google Cloud Dataflow, accumulating a total of 95 commits (22,122 ++ /
> 18,407 --) since February 2016 [1]. In recent months, he’s been driving the
> Splittable DoFn effort [4]. A true expert on IO subsystem, Eugene has
> reviewed nearly every IO contributed to Beam. Finally, Eugene contributed
> the Beam Style Guide, and is championing it across the project.
>
> * Ismaël Mejia
> Ismaël has been contributing to Beam since mid-2016, accumulating a total
> of 35 commits (3,137 ++ / 1,328 --) [1]. He authored the HBaseIO connector,
> helped on the Spark runner, and contributed in other areas as well,
> including cross-project collaboration with Apache Zeppelin. Ismaël reported
> 24 Jira issues.
>
> * Aviem Zur
> Aviem has been contributing to Beam since early fall, accumulating a total
> of 49 commits (6,471 ++ / 3,185 --) [1]. He reported 43 Jira issues, and
> resolved ~30 issues. Aviem improved the stability of the Spark runner a
> lot, and introduced support for metrics. Finally, Aviem is championing
> dependency management across the project.
>
> Congratulations to all four! Welcome!
>
> Davor
>
> [1]
> https://github.com/apache/beam/graphs/contributors?from=
> 2016-02-01=2017-03-17=c
> [2]
> https://github.com/apache/beam/blob/v0.6.0/sdks/python/
> apache_beam/io/iobase.py#L70
> [3]
> https://github.com/apache/beam/blob/v0.6.0/sdks/python/
> apache_beam/io/iobase.py#L561
> [4] https://s.apache.org/splittable-do-fn
>


Re: Pipeline termination in the unified Beam model

2017-03-01 Thread Thomas Groh
+1

I think it's a fair claim that a PCollection is "done" when its watermark
reaches positive infinity, and then it's easy to claim that a Pipeline is
"done" when all of its PCollections are done. Completion is an especially
reasonable claim if we consider positive infinity to be an actual infinity
- so long as allowed lateness is a finite value, elements that arrive
whenever a watermark is at positive infinity will be "infinitely" late, and
thus can be dropped by the runner.

As an aside, this is only about "finishing because the pipeline is
complete" - it's unrelated to "finished because of an unrecoverable error"
or similar reasons pipelines can stop running, yes?
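
A minimal JDK-only sketch of that definition, assuming Long.MAX_VALUE and Long.MIN_VALUE as stand-ins for positive and negative infinity (the SDK has its own bounds). ToyPipelineProgress is a made-up name.

```java
import java.util.HashMap;
import java.util.Map;

// Toy tracker of per-PCollection watermarks; all names are hypothetical.
final class ToyPipelineProgress {
  // Stand-ins for the model's positive and negative infinity.
  static final long POSITIVE_INFINITY = Long.MAX_VALUE;
  static final long NEGATIVE_INFINITY = Long.MIN_VALUE;

  private final Map<String, Long> watermarks = new HashMap<>();

  // A collection that has not produced anything yet sits at negative infinity.
  void addCollection(String name) {
    watermarks.put(name, NEGATIVE_INFINITY);
  }

  // Watermarks only move forwards, so keep the larger of the old and new value.
  void advance(String name, long newWatermark) {
    watermarks.merge(name, newWatermark, Long::max);
  }

  // A PCollection is done when its watermark has reached positive infinity.
  boolean isDone(String name) {
    return watermarks.getOrDefault(name, NEGATIVE_INFINITY) == POSITIVE_INFINITY;
  }

  // A pipeline is done when all of its PCollections are done.
  boolean isPipelineDone() {
    return watermarks.values().stream().allMatch(w -> w == POSITIVE_INFINITY);
  }
}
```

Note that this only models completion; a pipeline that stops because of an unrecoverable error would need a separate terminal state.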

On Wed, Mar 1, 2017 at 5:54 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> Raising this onto the mailing list from
> https://issues.apache.org/jira/browse/BEAM-849
>
> The issue came up: what does it mean for a pipeline to finish, in the Beam
> model?
>
> Note that I am deliberately not talking about "batch" and "streaming"
> pipelines, because this distinction does not exist in the model. Several
> runners have batch/streaming *modes*, which implement the same semantics
> (potentially different subsets: in batch mode typically a runner will
> reject pipelines that have at least one unbounded PCollection) but in an
> operationally different way. However we should define pipeline termination
> at the level of the unified model, and then make sure that all runners in
> all modes implement that properly.
>
> One natural way is to say "a pipeline terminates when the output watermarks
> of all of its PCollections progress to +infinity". (Note: this can be
> generalized, I guess, to having partial executions of a pipeline: if you're
> interested in the full contents of only some collections, then you wait
> until only the watermarks of those collections progress to infinity)
>
> A typical "batch" runner mode does not implement watermarks - we can think
> of it as assigning watermark -infinity to an output of a transform that
> hasn't started executing yet, and +infinity to output of a transform that
> has finished executing. This is consistent with how such runners implement
> termination in practice.
>
> Dataflow streaming runner additionally implements such termination for
> pipeline drain operation: it has 2 parts: 1) stop consuming input from the
> sources, and 2) wait until all watermarks progress to infinity.
>
> Let us fill the gap by making this part of the Beam model and declaring
> that all runners should implement this behavior. This will give nice
> properties, e.g.:
> - A pipeline that has only bounded collections can be run by any runner in
> any mode, with the same results and termination behavior (this is actually
> my motivating example for raising this issue: I was running Splittable
> DoFn tests
>  core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
> with the streaming Dataflow runner - these tests produce only bounded
> collections - and noticed that they wouldn't terminate even though all data
> was processed)
> - It will be possible to implement pipelines that stream data for a while
> and then eventually successfully terminate based on some condition. E.g. a
> pipeline that watches a continuously growing file until it is marked
> read-only, or a pipeline that reads a Kafka topic partition until it
> receives a "poison pill" message. This seems handy.
>


Re: [DISCUSS] Per-Key Watermark Maintenance

2017-02-28 Thread Thomas Groh
I think that a per-key watermark is not just consistent with the model, but
also there's an argument to be made that it is the correct way to conceive
of watermarks in Beam. The way we currently hold watermarks inside of
ReduceFnRunner is via a WatermarkHold, which is set per-key. As a result,
the downstream watermarks for any key can be meaningfully understood from
the upstream watermark + the hold of only the key. When elements are
rekeyed this gets much more complicated, of course - hence, I imagine, the
concepts of a "key lineage" or merely key-preserving transforms.

I think as well it's reasonable to claim that in the absence of more
information the output watermark of a key-unaware producer is the minimum
of the union of all of its input watermarks + holds (effectively what the
current watermark representations use) - but it's possible for downstream
steps to gain information (e.g. the input watermark for a GroupByKey is
positive infinity, the output watermark per-key can advance after those
elements are output, independently of other keys).
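
Roughly, in JDK-only terms (ToyKeyedWatermarks is a made-up name, and a single upstream watermark is assumed for simplicity): the per-key view only consults that key's hold, while the key-unaware view takes the minimum over every hold.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of watermark holds set per key; all names are hypothetical.
final class ToyKeyedWatermarks {
  private final Map<String, Long> holds = new HashMap<>();
  private long upstream = Long.MIN_VALUE;

  void setUpstream(long watermark) {
    upstream = watermark;
  }

  void setHold(String key, long timestamp) {
    holds.put(key, timestamp);
  }

  void clearHold(String key) {
    holds.remove(key);
  }

  // Per-key view: the upstream watermark limited only by this key's hold.
  long outputWatermarkFor(String key) {
    long hold = holds.getOrDefault(key, Long.MAX_VALUE);
    return Math.min(upstream, hold);
  }

  // Key-unaware view: the minimum of the upstream watermark and every hold.
  long outputWatermark() {
    long min = upstream;
    for (long hold : holds.values()) {
      min = Math.min(min, hold);
    }
    return min;
  }
}
```

The key-unaware value is never ahead of any per-key value, which is why a per-PCollection watermark can stand in as a conservative per-key watermark.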

On Tue, Feb 28, 2017 at 1:58 PM, Ben Chambers 
wrote:

> Following up on what Kenn said, it seems like the idea of a per-key
> watermark is logically consistent with Beam model. Each runner already
> chooses how to approximate the model-level concept of the per-key
> watermark.
>
> The list Kenn had could be extended with things like:
> 4. A runner that assumes the watermark for all keys is at -\infty until all
> data is processed and then jumps to \infty. This would be similar to a
> Batch runner, for instance.
>
> How watermarks are tracked may even depend on the pipeline -- if there is
> no shuffling of data between keys it may be easier to support a per-key
> watermark.
>
> On Tue, Feb 28, 2017 at 1:50 PM Kenneth Knowles 
> wrote:
>
> > This is a really interesting topic. I think Beam is a good place to have
> a
> > broader cross-runner discussion of this.
> >
> > I know that you are not the only person skeptical due to
> > trickiness/costliness of implementation.
> >
> > On the other hand, at a semantic level, I think any correct definition
> will
> > allow a per-PCollection watermark to serve as just a "very bad" per-key
> > watermark. So I would target a definition whereby any of these is
> > acceptable:
> >
> > 1. a runner with no awareness of per-key watermarks
> > 2. a runner that chooses only sometimes to use them (there might be some
> > user-facing cost/latency tradeoff a la triggers here)
> > 3. a runner that chooses a coarse approximation for tracking key lineage
> >
> > Can you come up with an example where this is impossible?
> >
> > Developing advanced model features whether or not all runners do (or can)
> > support them is exactly the spirit of Beam, to me, so I am really glad
> you
> > brought this up here.
> >
> > On Mon, Feb 27, 2017 at 5:59 AM, Aljoscha Krettek 
> > wrote:
> >
> > > We recently started a discussion on the Flink ML about this topic: [1]
> > >
> > > The gist of it is that for some use cases tracking the watermark per-key
> > > instead of globally (or rather per partition) can be useful.
> > > Think, for example, of tracking some user data off mobile phones where
> > the
> > > user-id/phone number is the key. For these cases, one slow key, i.e. a
> > > disconnected phone, would slow down the watermark.
> > >
> > > I'm not saying that this is a good idea, I'm actually skeptical because
> > the
> > > implementation seems quite complicated/costly to me and I have some
> > doubts
> > > about being able to track the watermark per-key in the sources. It's
> just
> > > that more people seem to be asking about this lately and I would like
> to
> > > have a discussion with the clever Beam people because we claim to be at
> > the
> > > forefront of parallel data processing APIs. :-)
> > >
> > > Also note that I'm aware that in the example given above it would be
> > > difficult to find out if one key is slow in the first place and
> therefore
> > > hold the watermark for that key.
> > >
> > > If you're interested, please have a look at the discussion on the Flink
> > ML
> > > that I linked to above. It's only 4 mails so far and Jamie gives a
> nicer
> > > explanation of the possible use cases than I'm doing here. Note, that
> my
> > > discussion of feasibility and APIs/key lineage should also apply to
> Beam.
> > >
> > > What do you think?
> > >
> > > [1]
> > > https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef
> > > 424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
> > >
> >
>


Re: We've hit 2000 PRs!

2017-02-16 Thread Thomas Groh
Impressive work everyone. Very cool.

On Thu, Feb 16, 2017 at 8:05 AM, Dan Halperin 
wrote:

> Checking my previous claims:
>
> PR #1: Feb 26, 2016
> PR #1000: Sep 24, 2016 (211 days later)
> PR #2000: Feb 13, 2017 (142 days later) Yep -- much quicker!
>
> I'm excited to see this community growing and innovating as we march
> towards the true Beam Technical Vision, a first major release, and really
> empowering users to build portable, long-lived, fast data processing
> pipelines.
>
> Thanks everyone for making this community and keeping this project really
> fun :)
>
> Dan
>
> On Mon, Sep 26, 2016 at 2:47 PM, Dan Halperin  wrote:
>
> > Hey folks!
> >
> > Just wanted to send out a note -- we've hit 1000 PRs in GitHub as of
> > Saturday! That's a tremendous amount of work for the 7 months since PR#1.
> >
> > I bet we hit 2000 in much fewer than 7 months ;)
> >
> > Dan
> >
>


Re: [ANNOUNCEMENT] New committers, January 2017 edition!

2017-01-27 Thread Thomas Groh
Congratulations all!

On Fri, Jan 27, 2017 at 9:34 AM, Chamikara Jayalath 
wrote:

> Congrats all !! :)
>
> - Cham
>
> On Fri, Jan 27, 2017 at 4:13 AM Stas Levin  wrote:
>
> > Thanks all, glad to be joining!
> >
> > On Fri, Jan 27, 2017, 13:07 Aljoscha Krettek 
> wrote:
> >
> > > Welcome aboard! :-)
> > >
> > > On Fri, 27 Jan 2017 at 11:27 Ismaël Mejía  wrote:
> > >
> > > > Congratulations, well deserved guys !
> > > >
> > > >
> > > > On Fri, Jan 27, 2017 at 9:28 AM, Amit Sela 
> > wrote:
> > > >
> > > > > Welcome and congratulations to all!
> > > > >
> > > > > On Fri, Jan 27, 2017, 10:12 Ahmet Altay 
> > > > wrote:
> > > > >
> > > > > > Thank you all! And congratulations to other new committers.
> > > > > >
> > > > > > Ahmet
> > > > > >
> > > > > > On Thu, Jan 26, 2017 at 9:45 PM, Kobi Salant <
> > kobi.sal...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Congrats! Well deserved Stas
> > > > > > >
> > > > > > > > On Jan 27, 2017 7:26, "Frances Perry" <fran...@apache.org> wrote:
> > > > > > >
> > > > > > > > Woohoo! Congrats ;-)
> > > > > > > >
> > > > > > > > On Thu, Jan 26, 2017 at 9:05 PM, Jean-Baptiste Onofré <
> > > > > j...@nanthrax.net
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > > Welcome aboard!
> > > > > > > > >
> > > > > > > > > Regards
> > > > > > > > > JB
> > > > > > > > >
> > > > > > > > > On Jan 27, 2017, 01:27, at 01:27, Davor Bonaci <
> > > da...@apache.org
> > > > >
> > > > > > > wrote:
> > > > > > > > > >Please join me and the rest of Beam PMC in welcoming the
> > > > following
> > > > > > > > > >contributors as our newest committers. They have
> > significantly
> > > > > > > > > >contributed
> > > > > > > > > >to the project in different ways, and we look forward to
> > many
> > > > more
> > > > > > > > > >contributions in the future.
> > > > > > > > > >
> > > > > > > > > >* Stas Levin
> > > > > > > > > >Stas has contributed across the breadth of the project,
> from
> > > the
> > > > > > Spark
> > > > > > > > > >runner to the core pieces and Java SDK. Looking at code
> > > > > > contributions
> > > > > > > > > >alone, he authored 43 commits and reported 25 issues. Stas
> > is
> > > > very
> > > > > > > > > >active
> > > > > > > > > >on the mailing lists too, contributing to good discussions
> > and
> > > > > > > > > >proposing
> > > > > > > > > >improvements to the Beam model.
> > > > > > > > > >
> > > > > > > > > >* Ahmet Altay
> > > > > > > > > >Ahmet is a major contributor to the Python SDK, both in
> > terms
> > > of
> > > > > > > design
> > > > > > > > > >and
> > > > > > > > > >code contribution. Looking at code contributions alone, he
> > > > > authored
> > > > > > 98
> > > > > > > > > >commits and reviewed dozens of pull requests. With Python
> > > SDK’s
> > > > > > > > > >imminent
> > > > > > > > > >merge to the master branch, Ahmet contributed towards
> > > > > establishing a
> > > > > > > > > >new
> > > > > > > > > >major component in Beam.
> > > > > > > > > >
> > > > > > > > > >* Pei He
> > > > > > > > > >Pei has been contributing to Beam since its inception,
> > > > > accumulating
> > > > > > a
> > > > > > > > > >total
> > > > > > > > > >of 118 commits since February. He has made several major
> > > > > > > contributions,
> > > > > > > > > >most recently by redesigning IOChannelFactory / FileSystem
> > > APIs
> > > > > (in
> > > > > > > > > >progress), which would extend Beam’s portability to many
> > > > > additional
> > > > > > > > > >file
> > > > > > > > > >systems and cloud providers.
> > > > > > > > > >
> > > > > > > > > >Congratulations to all three! Welcome!
> > > > > > > > > >
> > > > > > > > > >Davor
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: Default Timestamp and Watermark

2017-01-26 Thread Thomas Groh
The default timestamp should be BoundedWindow.TIMESTAMP_MIN_VALUE, which is
equivalent to -2**63 microseconds. We also occasionally refer to this
timestamp as "negative infinity".

The default watermark policy for a bounded source should be negative
infinity until all of the data is read, then positive infinity. There isn't
really a default watermark policy for an unbounded source - this is
dependent on the data that hasn't been read from that source, so it's
dependent on where you're reading from.

Currently, modifying the timestamp of an element from within a DoFn does
not modify the watermark; modifying a timestamp forwards in time is
generally "safe", as it can't cause data to move to behind the watermark -
this is why moving elements backwards in time requires setting
"withAllowedTimestampSkew" (which also doesn't modify the watermark, which
means that elements that are moved backwards in time can become late and be
dropped by a runner). I don't think we currently have any changes in-flight
to make this configurable.
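
A small JDK-only sketch of the three points above. ToyTimestamps is a made-up name, the millisecond constants are my assumption about how the microsecond bounds are represented, and the checks are simplified models rather than the SDK's actual code.

```java
import java.util.concurrent.TimeUnit;

// Toy model of default timestamps, bounded-source watermarks and timestamp skew.
final class ToyTimestamps {
  // Assumption: the bounds are -2**63 and 2**63 - 1 microseconds, held as milliseconds.
  static final long MIN_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE);
  static final long MAX_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);

  // Bounded source: negative infinity until all data is read, then positive infinity.
  static long boundedSourceWatermark(boolean allDataRead) {
    return allDataRead ? MAX_MILLIS : MIN_MILLIS;
  }

  // Moving a timestamp forwards is always accepted; moving it backwards is
  // accepted only if the distance is within the allowed skew.
  static boolean outputTimestampAllowed(long elementTs, long outputTs, long allowedSkew) {
    return outputTs >= elementTs || elementTs - outputTs <= allowedSkew;
  }

  // The watermark is not adjusted in either case, so an element moved
  // backwards can end up behind it.
  static boolean isLate(long outputTs, long watermark) {
    return outputTs < watermark;
  }
}
```

In this model an element at 1000 moved back to 900 with a skew of 100 is accepted, but it is late if the watermark has already reached 950.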

On Wed, Jan 25, 2017 at 9:24 PM, Shen Li  wrote:

> Hi,
>
> When reading from a source with no timestamp specified on elements, what
> should be the default timestamp? I presume that it should be 0 as I saw
> PAssertTest trying to set timestamps to very small values with 0 allowed
> timestamp skew. Is that right?
>
> What about the default watermark policy?
>
> If a ParDo modifies the timestamp using
> DoFnProcessContext.outputWithTimestamp, how should that affect the output
> watermark? Say the ParDo adds 100 seconds to the timestamp of each element
> in processElement, how could the runner know it should also add 100 seconds
> to output timestamps?
>
> Thanks,
>
> Shen
>


Re: Conceptually, what are bundles?

2017-01-25 Thread Thomas Groh
I have a couple of points in addition to what Robert said:

Runners are permitted to determine bundle sizes as appropriate to their
implementation, so long as bundles are atomically committed. The contents
of a PCollection are independent of the bundling of that PCollection.

Runners can process each element within its own bundle (e.g.
https://github.com/apache/beam/blob/a6810372b003adf24bdbe34ed764a6
3841af9b99/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/
translation/wrappers/streaming/DoFnOperator.java#L289), the entire input
data as a single bundle, or anywhere in between.
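
A rough JDK-only sketch of "atomically committed", assuming a made-up ToyBundleProcessor with a simple retry limit: outputs are buffered per attempt and reach downstream only if the whole bundle succeeds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model of bundle commitment; the name and the retry policy are hypothetical.
final class ToyBundleProcessor {
  // Processes one bundle atomically. Outputs are appended to downstream only if
  // every element succeeds; otherwise nothing is emitted and the whole bundle
  // is retried, up to maxAttempts times. Returns whether the bundle committed.
  static <I, O> boolean processBundle(
      List<I> bundle, Function<I, O> fn, List<O> downstream, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      List<O> uncommitted = new ArrayList<>();
      try {
        for (I element : bundle) {
          uncommitted.add(fn.apply(element));
        }
        // Commit: all results of the bundle go downstream together.
        downstream.addAll(uncommitted);
        return true;
      } catch (RuntimeException e) {
        // Failure: the uncommitted results are discarded and the bundle is retried.
      }
    }
    return false;
  }
}
```

Because partial results are never emitted, splitting the same input into different bundles changes how much work a retry repeats, not what ends up downstream.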

On Wed, Jan 25, 2017 at 10:05 AM, Robert Bradshaw <
rober...@google.com.invalid> wrote:

> Bundles are simply the unit of commitment (retry) in the Beam SDK.
> They're not really a model concept, but do leak from the
> implementation into the API as it's not feasible to checkpoint every
> individual process call, and this allows some state/compute/... to be
> safely amortized across elements (either the results of all processed
> elements in a bundle are sent downstream, or none are and the entire
> bundle is retried).
>
> On Wed, Jan 25, 2017 at 9:36 AM, Matthew Jadczak  wrote:
> > Hi,
> >
> > I’m a finalist CompSci student at the University of Cambridge, and for
> my final project/dissertation I am writing an implementation of the Beam
> SDK in Elixir [1]. Given that the Beam project is obviously still very much
> WIP, it’s still somewhat difficult to find good conceptual overviews of
> parts of the system, which is crucial when translating the OOP architecture
> to something completely different. However I have found many of the design
> docs scattered around the JIRA and here very helpful. (Incidentally,
> perhaps it would be helpful to maintain a list of them, to help any
> contributors acquaint themselves with the conceptual vision of the
> implementation?)
> >
> > One thing which I have not yet been able to work out is the significance
> of “bundles” in the SDK. On the one hand, it seems that they are simply an
> implementation detail, effectively a way to do micro-batch processing
> efficiently, and indeed they are not mentioned at all in the original
> Dataflow paper or anywhere in the Beam docs (except in passing). On the
> other hand, it seems most of the key transforms in the SDK core have a
> concept of bundles and operate in their terms in practice, while all
> conceptually being described as just operating on elements.
> >
> > Do bundles have semantic meaning in the Beam Model? Are there any
> guidelines as to how a given transform should split its output up into
> bundles? Should any runner/SDK implementing the Model have that concept,
> even when other primitives for streaming data processing including things
> like efficiently transmitting individual elements between stages with
> backpressure are available in the language/standard libraries? Are there
> any insights here that I am missing, i.e. were problems present in early
> versions of the runners solved by adding the concept of bundles?
> >
> > Thanks so much,
> > Matt
> >
> > [1] http://elixir-lang.org/
>