Re: [DISCUSS] ExecIO

2016-12-08 Thread Ben Chambers
I think I agree with Robert (unless I'm misunderstanding his point).

I think that shell commands are going to be most useful if it is possible
to take the elements in an input PCollection, construct a shell command
depending on those elements, and then execute it. Doing so in a fully
general manner outside of a DoFn will be difficult. If instead we made it
easier to declare a DoFn as having requirements on the environment (e.g.,
these programs must be available in the shell) and easier to execute shell
commands within a DoFn, I think that covers many more use cases.
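
For example (a sketch only -- the script path, shell invocation, and error
handling are placeholders, not a proposed API), a per-element shell call
inside a DoFn could look like:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.beam.sdk.transforms.DoFn;

public class RunCommandPerElementFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    // Build a command from the element itself; this is the part that is hard
    // to express if the command has to be fixed before the pipeline runs.
    ProcessBuilder pb =
        new ProcessBuilder("/bin/sh", "-c", "/path/to/myscript.sh " + c.element());
    pb.redirectErrorStream(true);
    Process process = pb.start();
    // Emit each line of the command's combined stdout/stderr as an output element.
    try (BufferedReader reader =
        new BufferedReader(new InputStreamReader(process.getInputStream()))) {
      String line;
      while ((line = reader.readLine()) != null) {
        c.output(line);
      }
    }
    if (process.waitFor() != 0) {
      throw new IOException("Command failed for element: " + c.element());
    }
  }
}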

On Thu, Dec 8, 2016 at 12:23 PM Robert Bradshaw 
wrote:

> On Wed, Dec 7, 2016 at 1:32 AM, Jean-Baptiste Onofré 
> wrote:
> > By the way, just to elaborate a bit why I provided as an IO:
> >
> > 1. From an user experience perspective, I think we have to provide
> > convenient way to write pipeline. Any syntax simplifying this is
> valuable.
> > I think it's easier to write:
> >
> > pipeline.apply(ExecIO.read().withCommand("foo"))
> >
> > than:
> >
> > pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn());
>
> Slightly. Still, when I see
>
> pipeline.apply(ExecIO.read().withCommand("foo"))
>
> I am surprised to get a PCollection with a single element...
>
> > 2. For me (maybe I'm wrong ;)), an IO is an extension dedicated for
> > "connector": reading/writing from/to a data source. So, even without the
> IO
> > "wrapping" (by wrapping, I mean the Read and Write), I think Exec
> extension
> > should be in IO as it's a source/write of data.
>
> To clarify, if you wrote a DoFn that, say, did lookups against a MySQL
> database, you would consider this an IO? For me, IO denotes
> input/output, i.e. the roots and leaves of a pipeline.
>
> > Regards
> > JB
> >
> > On 12/07/2016 08:37 AM, Robert Bradshaw wrote:
> >>
> >> I don't mean to derail the tricky environment questions, but I'm not
> >> seeing why this is bundled as an IO rather than a plain DoFn (which
> >> can be applied to a PCollection of one or more commands, yielding
> >> their outputs). Especially for the case of a Read, which in this case
> >> is not splittable (initially or dynamically) and always produces a
> >> single element--feels much more like a Map to me.
> >>
> >> On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov
> >>  wrote:
> >>>
> >>> Ben - the issues of "things aren't hung, there is a shell command
> >>> running",
> >>> aren't they general to all DoFn's? i.e. I don't see why the runner
> would
> >>> need to know that a shell command is running, but not that, say, a
> heavy
> >>> monolithic computation is running. What's the benefit to the runner in
> >>> knowing that the DoFn contains a shell command?
> >>>
> >>> By saying "making sure that all shell commands finish", I suppose
> you're
> >>> referring to the possibility of leaks if the user initiates a shell
> >>> command
> >>> and forgets to wait for it? I think that should be solvable again
> without
> >>> Beam intervention, by making a utility class for running shell commands
> >>> which implements AutoCloseable, and document that you have to use it
> that
> >>> way.
> >>>
> >>> Ken - I think the question here is: are we ok with a situation where
> the
> >>> runner doesn't check or care whether the shell command can run, and the
> >>> user accepts this risk and studies what commands will be available on
> the
> >>> worker environment provided by the runner they use in production,
> before
> >>> productionizing a pipeline with those commands.
> >>>
> >>> Upon some thought I think it's ok. Of course, this carries an
> obligation
> >>> for runners to document their worker environment and its changes across
> >>> versions. Though for many runners such documentation may be trivial:
> >>> "whatever your YARN cluster has, the runner doesn't change it in any
> way"
> >>> and it may be good enough for users. And for other runners, like
> >>> Dataflow,
> >>> such documentation may also be trivial: "no guarantees whatsoever, only
> >>> what you stage in --filesToStage is available".
> >>>
> >>> I can also see Beam develop to a point where we'd want all runners to
> be
> >>> able to run your DoFn in a user-specified Docker container, and manage
> >>> those intelligently - but I think that's quite a while away and it
> >>> doesn't
> >>> have to block work on a utility for executing shell commands. Though
> it'd
> >>> be nice if the utility was forward-compatible with that future world.
> >>>
> >>> On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré 
> >>> wrote:
> >>>
>  Hi Eugene,
> 
>  thanks for the extended questions.
> 
>  I think we have two levels of expectations here:
>  - end-user responsibility
>  - worker/runner responsibility
> 
>  1/ From a end-user perspective, the end-user has to know that using a
>  system command (via ExecIO) and more generally speaking anything which
>  relay on worker resources (for instance a local filesystem directory
>  availab

Re: [DISCUSS] [BEAM-438] Rename one of PTransform.apply or PInput.apply

2016-12-07 Thread Ben Chambers
+1 to pushing all remaining "major" (read: likely to affect everyone) breaks
through in a single release.

On Wed, Dec 7, 2016 at 3:56 PM Dan Halperin 
wrote:

> +user@, because this is a user-impacting change and they might not all be
> paying attention to the dev@ list.
>
> +1
>
> I'm mildly reluctant because this will break all users that have written
> composite transforms -- and I'm the jerk that filed the issue (a few times
> now, on different iterations of the SDKs). But, like Ben said, I can't
> think of a different way to do this that doesn't break users.
>
> Hopefully, with breaking changes pushed to DoFn and to PTransform, the
> worst user churn would be over. I can't think of anything quite so
> invasive.
>
> IMO: if there is, we should try to push all the remaining "major" breaks
> through in the same release.
>
> Dan
>
> On Thu, Dec 8, 2016 at 7:48 AM, Aljoscha Krettek 
> wrote:
>
> > +1
> >
> > I've seen this mistake myself in some PRs.
> >
> > On Thu, 8 Dec 2016 at 06:10 Ben Chambers 
> > wrote:
> >
> > > +1 -- This seems like the best option. It's a mechanical change, and
> the
> > > compiler will let users know it needs to be made. It will make the
> > mistake
> > > much less common, and when it occurs it will be much clearer what is
> > wrong.
> > >
> > > It would be great if we could make the mis-use a compiler problem or a
> > > pipeline construction time error without changing the names, but both
> of
> > > these options are not practical. We can't hide the expansion method,
> > since
> > > it is what PTransform implementations need to override. We can't make
> > this
> > > a construction time exception since it would require adding code to
> every
> > > PTransform implementation.
> > >
> > > On Wed, Dec 7, 2016 at 1:55 PM Thomas Groh 
> > > wrote:
> > >
> > > > +1; This is probably the best way to make sure users don't reverse
> the
> > > > polarity of the PCollection flow.
> > > >
> > > > This also brings PInput.expand(), POutput.expand(), and
> > > > PTransform.expand(PInput) into line - namely, for some composite
> thing,
> > > > "represent yourself as some collection of primitives" (potentially
> > > > recursively).
> > > >
> > > > On Wed, Dec 7, 2016 at 1:37 PM, Kenneth Knowles
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I want to bring up another major backwards-incompatible change
> before
> > > it
> > > > is
> > > > > too late, to resolve [BEAM-438].
> > > > >
> > > > > Summary: Leave PInput.apply the same but rename PTransform.apply to
> > > > > PTransform.expand. I have opened [PR #1538] just for reference (it
> > took
> > > > 30
> > > > > seconds using IDE automated refactor)
> > > > >
> > > > > This change affects *PTransform authors* but does *not* affect
> > pipeline
> > > > > authors.
> > > > >
> > > > > This issue was filed a long time ago. It has been a problem many
> > times
> > > > with
> > > > > actual users since before Beam started incubating. This is what
> goes
> > > > wrong
> > > > > (often):
> > > > >
> > > > >    PCollection<...> input = ...
> > > > >    PTransform<PCollection<...>, ...> transform = ...
> > > > >
> > > > >transform.apply(input)
> > > > >
> > > > > This type checks and even looks perfectly normal. Do you see the
> > error?
> > > > >
> > > > > ... what we need the user to write is:
> > > > >
> > > > > input.apply(transform)
> > > > >
> > > > > What a confusing difference! After all, the first one type-checks
> and
> > > the
> > > > > first one is how you apply a Function or Predicate or
> > > > SerializableFunction,
> > > > > etc. But it is broken. With transform.apply(input) the transform is
> > not
> > > > > registered with the pipeline at all.
> > > > >
> > > > > We obviously can't (and don't want to) change the most core way
> that
> > > > > pipeline authors use Beam, so PInput.apply (aka PCollecti

Re: [DISCUSS] [BEAM-438] Rename one of PTransform.apply or PInput.apply

2016-12-07 Thread Ben Chambers
+1 -- This seems like the best option. It's a mechanical change, and the
compiler will let users know it needs to be made. It will make the mistake
much less common, and when it occurs it will be much clearer what is wrong.

It would be great if we could make the mis-use a compiler problem or a
pipeline construction time error without changing the names, but both of
these options are not practical. We can't hide the expansion method, since
it is what PTransform implementations need to override. We can't make this
a construction time exception since it would require adding code to every
PTransform implementation.
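
For PTransform authors the change is mechanical: the overridden method
changes its name, while pipeline authors keep calling input.apply(...). A
minimal sketch (the transform and its logic are illustrative, not from the
PR):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class DropEmptyLines extends PTransform<PCollection<String>, PCollection<String>> {
  // Before this change the method below was named apply(); afterwards,
  // composite authors override expand() instead.
  @Override
  public PCollection<String> expand(PCollection<String> input) {
    return input.apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (!c.element().isEmpty()) {
          c.output(c.element());
        }
      }
    }));
  }
}

// Pipeline authors are unaffected:
// PCollection<String> nonEmpty = lines.apply(new DropEmptyLines());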

On Wed, Dec 7, 2016 at 1:55 PM Thomas Groh  wrote:

> +1; This is probably the best way to make sure users don't reverse the
> polarity of the PCollection flow.
>
> This also brings PInput.expand(), POutput.expand(), and
> PTransform.expand(PInput) into line - namely, for some composite thing,
> "represent yourself as some collection of primitives" (potentially
> recursively).
>
> On Wed, Dec 7, 2016 at 1:37 PM, Kenneth Knowles 
> wrote:
>
> > Hi all,
> >
> > I want to bring up another major backwards-incompatible change before it
> is
> > too late, to resolve [BEAM-438].
> >
> > Summary: Leave PInput.apply the same but rename PTransform.apply to
> > PTransform.expand. I have opened [PR #1538] just for reference (it took
> 30
> > seconds using IDE automated refactor)
> >
> > This change affects *PTransform authors* but does *not* affect pipeline
> > authors.
> >
> > This issue was filed a long time ago. It has been a problem many times
> with
> > actual users since before Beam started incubating. This is what goes
> wrong
> > (often):
> >
> >    PCollection<...> input = ...
> >    PTransform<PCollection<...>, ...> transform = ...
> >
> >transform.apply(input)
> >
> > This type checks and even looks perfectly normal. Do you see the error?
> >
> > ... what we need the user to write is:
> >
> > input.apply(transform)
> >
> > What a confusing difference! After all, the first one type-checks and the
> > first one is how you apply a Function or Predicate or
> SerializableFunction,
> > etc. But it is broken. With transform.apply(input) the transform is not
> > registered with the pipeline at all.
> >
> > We obviously can't (and don't want to) change the most core way that
> > pipeline authors use Beam, so PInput.apply (aka PCollection.apply) must
> > remain the same. But we do need a way to make it impossible to mix these
> > up.
> >
> > The simplest way I can think of is to choose a new name for the other
> > method involved. Users probably won't write transform.expand(input) since
> > they will never have seen it in any examples, etc. This will just make
> > PTransform authors need to do a global rename, and the type system will
> > direct them to all cases so there is no silent failure possible.
> >
> > What do you think?
> >
> > Kenn
> >
> > [BEAM-438] https://issues.apache.org/jira/browse/BEAM-438
> > [PR #1538] https://github.com/apache/incubator-beam/pull/1538
> >
> > p.s. there is a really amusing and confusing call chain:
> PCollection.apply
> > -> Pipeline.applyTransform -> Pipeline.applyInternal ->
> > PipelineRunner.apply -> PTransform.apply
> >
> > After this change and work to get the runner out of the loop, it becomes
> > PCollection.apply -> Pipeline.applyTransform -> PTransform.expand
> >
>


Re: [DISCUSS] ExecIO

2016-12-05 Thread Ben Chambers
The problem with not integrating with Beam at all is that the runner doesn't
know about any of these callouts. So it can't report "things aren't hung,
there is a shell command running", etc.

But the integration doesn't need to be particularly deep. Imagine that you
can just pass the ProcessBuilder to the "ShellExecutor". It could also deal
with managing thread pools and adjusting parallelism, making sure all the
shell commands finish (even if they are async) before processing of the
element ends, etc.
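
One way to picture that shallow integration (just a sketch -- ShellCommands
and its methods are made up for illustration; only the JDK pieces are real)
is an AutoCloseable helper that owns the thread pool and refuses to close
until every submitted command has completed:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ShellCommands implements AutoCloseable {
  private final ExecutorService pool = Executors.newFixedThreadPool(4);
  private final List<Future<Integer>> pending = new ArrayList<>();

  // Submit a command asynchronously; the future completes with its exit code.
  public Future<Integer> executeAsync(String... command) {
    Future<Integer> result = pool.submit(() -> {
      Process p = new ProcessBuilder(command).inheritIO().start();
      return p.waitFor();
    });
    pending.add(result);
    return result;
  }

  // Closing waits for every submitted command, so a DoFn that uses
  // try-with-resources cannot leak a still-running process.
  @Override
  public void close() throws IOException {
    try {
      for (Future<Integer> f : pending) {
        f.get();
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IOException("Shell command did not complete cleanly", e);
    } finally {
      pool.shutdown();
    }
  }
}

A runner-aware version of the same idea could additionally surface the
pending futures so the runner can report on, or time out, hung commands.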

On Mon, Dec 5, 2016 at 1:39 PM Eugene Kirpichov
 wrote:

> @Kenn - Would you suggest that all runners need to support running code in
> a user-specified container?
> @Ben - Hmm, the features you're suggesting don't seem like they require
> deep integration into Beam itself, but can be accomplished by separate
> utility functions (or perhaps regular language-specific facilities like
> java's ProcessBuilder).
>
> On Mon, Dec 5, 2016 at 1:21 PM Ben Chambers  wrote:
>
> One option would be to use the reflective DoFn approach to this. Imagine
> something like:
>
> public class MyExternalFn extends DoFn<String, String> {
>   @ProcessElement
>   // Existence of ShellExecutor indicates the code shells out.
>   public void processElement(ProcessContext c, ShellExecutor shell) {
>     ...
>     Future<String> result = shell.executeAsync("...");
>     ...
>     c.output(result.get());
>   }
> }
>
> The API for the shell can include non-future methods, but this allows the
> runners to know what commands interact with the shell, but also to report
> things like (1) shell process fails (2) shell process hangs forever, better
> indicate that upwards and (3) it allows the runner to manage parallelism
> interacting with the shell.
>
> Requirements for the executor can be specified with an annotation on the
> parameter or via an annotation within the DoFn.
>
> On Mon, Dec 5, 2016 at 1:15 PM Kenneth Knowles 
> wrote:
>
> > I would like the runner-independent, language-independent graph to have a
> > way to specify requirements on the environment that a DoFn runs in. This
> > would provide a natural way to talk about installed libraries,
> containers,
> > external services that are accessed, etc, and I think the requirement of
> a
> > particular OS with tools installed fits right in. At the crudest level,
> > this could be limited to a container URL.
> >
> > Then the Java SDK needs a way to express these requirements. They will
> > generally probably be properties of a DoFn instance rather than a DoFn
> > class, since they may vary with instantiation parameters.
> >
> > On Mon, Dec 5, 2016 at 11:51 AM, Eugene Kirpichov <
> > kirpic...@google.com.invalid> wrote:
> >
> > > Hi JB,
> > >
> > > Thanks for bringing this to the mailing list. I also think that this is
> > > useful in general (and that use cases for Beam are more than just
> classic
> > > bigdata), and that there are interesting questions here at different
> > levels
> > > about how to do it right.
> > >
> > > I suggest to start with the highest-level question [and discuss the
> > > particular API only after agreeing on this, possibly in a separate
> > thread]:
> > > how to deal with the fact that Beam gives no guarantees about the
> > > environment on workers, e.g. which commands are available, which shell
> or
> > > even OS is being used, etc. Particularly:
> > >
> > > - Obviously different runners will have a different environment, e.g.
> > > Dataflow workers are not going to have Hadoop commands available
> because
> > > they are not running on a Hadoop cluster. So, pipelines and transforms
> > > developed using this connector will be necessarily non-portable between
> > > different runners. Maybe this is ok? But we need to give users a clear
> > > expectation about this. How do we phrase this expectation and where do
> we
> > > put it in the docs?
> > >
> > > - I'm concerned that this puts additional compatibility requirements on
> > > runners - it becomes necessary for a runner to document the environment
> > of
> > > its workers (OS, shell, privileges, guaranteed-installed packages,
> access
> > > to other things on the host machine e.g. whether or not the worker runs
> > in
> > > its own container, etc.) and to keep it stable - otherwise transforms
> and
> > > pipelines with this connector will be non-portable between runner
> > versions
> > > either.
> > >
> > > Another way to deal with this is to give u

Re: [DISCUSS] ExecIO

2016-12-05 Thread Ben Chambers
One option would be to use the reflective DoFn approach to this. Imagine
something like:

public class MyExternalFn extends DoFn<String, String> {
  @ProcessElement
  // Existence of ShellExecutor indicates the code shells out.
  public void processElement(ProcessContext c, ShellExecutor shell) {
    ...
    Future<String> result = shell.executeAsync("...");
    ...
    c.output(result.get());
  }
}

The API for the shell can include non-future methods, but this allows the
runners to know what commands interact with the shell, but also to report
things like (1) shell process fails (2) shell process hangs forever, better
indicate that upwards and (3) it allows the runner to manage parallelism
interacting with the shell.

Requirements for the executor can be specified with an annotation on the
parameter or via an annotation within the DoFn.
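
Purely as a sketch of what the annotation option could look like (none of
these annotations exist today; the names are made up for illustration
alongside the hypothetical ShellExecutor above):

// Hypothetical annotations -- nothing here is an existing Beam API.
@RequiresEnvironment(programs = {"grep", "ffmpeg"})          // on the DoFn class
public class MyExternalFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(
      ProcessContext c,
      @RequiresPrograms({"ffmpeg"}) ShellExecutor shell) {   // or on the parameter
    // A runner could inspect these annotations at pipeline construction time
    // and fail fast, or choose a worker image that satisfies the requirements.
  }
}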

On Mon, Dec 5, 2016 at 1:15 PM Kenneth Knowles 
wrote:

> I would like the runner-independent, language-independent graph to have a
> way to specify requirements on the environment that a DoFn runs in. This
> would provide a natural way to talk about installed libraries, containers,
> external services that are accessed, etc, and I think the requirement of a
> particular OS with tools installed fits right in. At the crudest level,
> this could be limited to a container URL.
>
> Then the Java SDK needs a way to express these requirements. They will
> generally probably be properties of a DoFn instance rather than a DoFn
> class, since they may vary with instantiation parameters.
>
> On Mon, Dec 5, 2016 at 11:51 AM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > Hi JB,
> >
> > Thanks for bringing this to the mailing list. I also think that this is
> > useful in general (and that use cases for Beam are more than just classic
> > bigdata), and that there are interesting questions here at different
> levels
> > about how to do it right.
> >
> > I suggest to start with the highest-level question [and discuss the
> > particular API only after agreeing on this, possibly in a separate
> thread]:
> > how to deal with the fact that Beam gives no guarantees about the
> > environment on workers, e.g. which commands are available, which shell or
> > even OS is being used, etc. Particularly:
> >
> > - Obviously different runners will have a different environment, e.g.
> > Dataflow workers are not going to have Hadoop commands available because
> > they are not running on a Hadoop cluster. So, pipelines and transforms
> > developed using this connector will be necessarily non-portable between
> > different runners. Maybe this is ok? But we need to give users a clear
> > expectation about this. How do we phrase this expectation and where do we
> > put it in the docs?
> >
> > - I'm concerned that this puts additional compatibility requirements on
> > runners - it becomes necessary for a runner to document the environment
> of
> > its workers (OS, shell, privileges, guaranteed-installed packages, access
> > to other things on the host machine e.g. whether or not the worker runs
> in
> > its own container, etc.) and to keep it stable - otherwise transforms and
> > pipelines with this connector will be non-portable between runner
> versions
> > either.
> >
> > Another way to deal with this is to give up and say "the environment on
> the
> > workers is outside the scope of Beam; consult your runner's documentation
> > or use your best judgment as to what the environment will be, and use
> this
> > at your own risk".
> >
> > What do others think?
> >
> > On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré 
> > wrote:
> >
> > Hi beamers,
> >
> > Today, Beam is mainly focused on data processing.
> > Since the beginning of the project, we are discussing about extending
> > the use cases coverage via DSLs and extensions (like for machine
> > learning), or via IO.
> >
> > Especially for the IO, we can see Beam use for data integration and data
> > ingestion.
> >
> > In this area, I'm proposing a first IO: ExecIO:
> >
> > https://issues.apache.org/jira/browse/BEAM-1059
> > https://github.com/apache/incubator-beam/pull/1451
> >
> > Actually, this IO is mainly an ExecFn that executes system commands
> > (again, keep in mind we are discussing about data integration/ingestion
> > and not data processing).
> >
> > For convenience, this ExecFn is wrapped in Read and Write (as a regular
> > IO).
> >
> > Clearly, this IO/Fn depends of the worker where it runs. But it's under
> > the user responsibility.
> >
> > During the review, Eugene and I discussed about:
> > - is it an IO or just a fn ?
> > - is it OK to have worker specific IO ?
> >
> > IMHO, an IO makes lot of sense to me and it's very convenient for end
> > users. They can do something like:
> >
> > PCollection output =
> > pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));
> >
> > The pipeline will execute myscript and the output pipeline will contain
> > command execution std out/err.
> >
> > On the other hand, they can do:
> >
> > pcolle

Re: [DISCUSS] Using Verbs for Transforms

2016-10-26 Thread Ben Chambers
I also like Distinct since it doesn't make it sound like it modifies any
underlying collection. RemoveDuplicates makes it sound like the duplicates
are removed, rather than a new PCollection without duplicates being
returned.

On Wed, Oct 26, 2016, 7:36 AM Jean-Baptiste Onofré  wrote:

> Agree. It was more a transition proposal.
>
> Regards
> JB
>
> ⁣​
>
> On Oct 26, 2016, 08:31, at 08:31, Robert Bradshaw
>  wrote:
> >On Mon, Oct 24, 2016 at 11:02 PM, Jean-Baptiste Onofré
> > wrote:
> >> And what about use RemoveDuplicates and create an alias Distinct ?
> >
> >I'd really like to avoid (long term) aliases--you end up having to
> >document (and maintain) them both, and it adds confusion as to which
> >one to use (especially if they every diverge), and means searching for
> >one or the other yields half the results.
> >
> >> It doesn't break the API and would address both SQL users and more
> >"big data" users.
> >>
> >> My $0.01 ;)
> >>
> >> Regards
> >> JB
> >>
> >> ⁣
> >>
> >> On Oct 24, 2016, 22:23, at 22:23, Dan Halperin
> > wrote:
> >>>I find "MakeDistinct" more confusing. My votes in decreasing
> >>>preference:
> >>>
> >>>1. Keep `RemoveDuplicates` name, ensure that important keywords are
> >in
> >>>the
> >>>Javadoc. This reduces churn on our users and is honestly pretty dang
> >>> descriptive.
> >>>2. Rename to `Distinct`, which is clear if you're a SQL user and
> >likely
> >>>less clear otherwise. This is a backwards-incompatible API change, so
> >>>we
> >>>should do it before we go stable.
> >>>
> >>>I am not super strong that 1 > 2, but I am very strong that
> >"Distinct"
> >>
> >>>"MakeDistinct" or and "RemoveDuplicates" >>> "AvoidDuplicate".
> >>>
> >>>Dan
> >>>
> >>>On Mon, Oct 24, 2016 at 10:12 AM, Kenneth Knowles
> >>>
> >>>wrote:
> >>>
>  The precedent that we use verbs has many exceptions. We have
>  ApproximateQuantiles, Values, Keys, WithTimestamps, and I would
> >even
>  include Sum (at least when I read it).
> 
>  Historical note: the predilection towards verbs is from the Google
> >>>Style
>  Guide for Java method names
> 
>  which states "Method names are typically verbs or verb phrases".
> >But
> >>>even
>  in Google code there are lots of exceptions when it makes sense,
> >like
>  Guava's
>  Iterables.any(), Iterables.all(), Iterables.toArray(), the entire
>  Predicates module, etc. Just an aside; Beam isn't Google code. I
> >>>suggest we
>  use our judgment rather than a policy.
> 
>  I think "Distinct" is one of those exceptions. It is a standard
> >>>widespread
>  name and also reads better as an adjective. I prefer it, but also
> >>>don't
>  care strongly enough to change it or to change it back :-)
> 
>  If we must have a verb, I like it as-is more than MakeDistinct and
>  AvoidDuplicate.
> 
>  On Mon, Oct 24, 2016 at 9:46 AM Jesse Anderson
> >>>
>  wrote:
> 
>  > My original thought for this change was that Crunch uses the
> >class
> >>>name
>  > Distinct. SQL also uses the keyword distinct.
>  >
>  > Maybe the rule should be changed to adjectives or verbs depending
> >>>on the
>  > context.
>  >
>  > Using a verb to describe this class really doesn't connote what
> >the
> >>>class
>  > does as succinctly as the adjective.
>  >
>  > On Mon, Oct 24, 2016 at 9:40 AM Neelesh Salian
> >>>
>  > wrote:
>  >
>  > > Hello,
>  > >
>  > > First of all, thank you to Daniel, Robert and Jesse for their
> >>>review on
>  > > this: https://issues.apache.org/jira/browse/BEAM-239
>  > >
>  > > A point that came up was using verbs explicitly for Transforms.
>  > > Here is the PR:
> >>>https://github.com/apache/incubator-beam/pull/1164
>  > >
>  > > Posting it to help understand if we have a consensus for it and
> >>>if yes,
>  > we
>  > > could perhaps document it for future changes.
>  > >
>  > > Thank you.
>  > >
>  > > --
>  > > Neelesh Srinivas Salian
>  > > Engineer
>  > >
>  >
> 
>


Re: [ANNOUNCEMENT] New committers!

2016-10-21 Thread Ben Chambers
Congrats. +3!

On Fri, Oct 21, 2016 at 3:34 PM Kenneth Knowles 
wrote:

> Huzzah!
>
> I've personally enjoyed working together, and I am glad to extend this
> acknowledgement and welcome this addition to the Beam community.
>
> Kenn
>
> On Fri, Oct 21, 2016 at 3:18 PM Davor Bonaci  wrote:
>
> > Hi everyone,
> > Please join me and the rest of Beam PPMC in welcoming the following
> > contributors as our newest committers. They have significantly
> contributed
> > to the project in different ways, and we look forward to many more
> > contributions in the future.
> >
> > * Thomas Weise
> > Thomas authored the Apache Apex runner for Beam [1]. This is an exciting
> > new runner that opens a new user base. It is a large contribution, which
> > starts the whole new component with a great potential.
> >
> > * Jesse Anderson
> > Jesse has contributed significantly by promoting Beam. He has
> co-developed
> > a Beam tutorial and delivered it at a top big data conference. He
> published
> > several blog posts positioning Beam, Q&A with the Apache Beam team, and a
> > demo video how to run Beam on multiple runners [2]. On the side, he has
> > authored 7 pull requests and reported 6 JIRA issues.
> >
> > * Thomas Groh
> > Since starting incubation, Thomas has contributed the most commits to the
> > project [3], a total of 226 commits, which is more than anybody else. He
> > has contributed broadly to the project, most significantly by developing
> > from scratch the DirectRunner that supports the full model semantics.
> > Additionally, he has contributed a new set of APIs for testing unbounded
> > pipelines. He published a blog highlighting this work.
> >
> > Congratulations to all three! Welcome!
> >
> > Davor
> >
> > [1] https://github.com/apache/incubator-beam/tree/apex-runner
> > [2] http://www.smokinghand.com/
> > [3] https://github.com/apache/incubator-beam/graphs/contributors
> > ?from=2016-02-01&to=2016-10-14&type=c
> >
>


Re: Simplifying User-Defined Metrics in Beam

2016-10-19 Thread Ben Chambers
On Thu, Oct 13, 2016 at 2:27 AM Aljoscha Krettek 
wrote:

I finally found the time to have a look. :-)

The API looks very good! (It's very similar to an API we recently added to
Flink, which is inspired by the same Codahale/Dropwizard metrics).

About the semantics, the "A", "B" and "C" you mention in the doc: doesn't
this mean that we have to keep the metrics in some fault-tolerant way?
Almost in something like the StateInternals, because they should survive
failures and contain the metrics over the successful runs. (Side note: in
Flink the metrics are just "since the last restart from failure" in case of
failures.)


Aggregators previously required behavior like C -- because they were an
output of the step, they would only count values over successful
invocations of each step. The new API clarifies that there are different
ways of implementing this: we can commit the counter values from a step
when the result of that step is committed (which gives us behavior like B
or C), or we can periodically report the current values so that values from
failed bundles are included as well (which gives us behavior like A).

Clarifying that there are different kinds of values is useful (sometimes
you want to compare A vs. C to see how much was "wasted" on retries) and
allows the runner to provide the values it can, and lets the user deal with
the fact that the others aren't available.

But yes, to implement all 3 (A, B and C) some interaction with the runner
is necessary.


About querying the metrics, what we have mostly seen is that people want to
integrate metrics into a Metrics system that they already have in place.
They use Graphite, StatsD or simply JMX for this. In Flink we provide an
API for reporters that users can plug in to export the metrics to their
system of choice. I'm sure some people will like the option of having the
metrics queryable on the PipelineResult, but I would assume that for most
production use cases integration with a metrics system is more important.


This is an interesting point. Querying was included because (1) we didn't
want to regress the functionality available from Aggregators too much and
(2) it is useful for tests (and logic) to be able to run a pipeline and
check on the counts.
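
For the test use case, the rough shape would be something like the
following (a sketch; the filter and query method names follow the current
proposal and PR, and may still change):

PipelineResult result = pipeline.run();
// Query the committed value of a counter once the pipeline has finished.
MetricQueryResults metrics = result.metrics().queryMetrics(
    MetricsFilter.builder()
        .addNameFilter(MetricNameFilter.named("mycode", "odd-elements"))
        .build());
for (MetricResult<Long> counter : metrics.counters()) {
  assertEquals(42L, (long) counter.committed());
}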


Regarding removal of Aggregators I'm for B, to quote Saint Exupéry:

  "It seems that perfection is attained not when there is nothing more to
  add, but when there is nothing more to remove."





> +1 to the new metrics design. I strongly favor B as well.

>

> On Wed, Oct 12, 2016 at 10:54 AM, Kenneth Knowles

>  wrote:

> > Correction: In my eagerness to see the end of aggregators, I mistook the

> > intention. Both A and B leave aggregators in place until there is a

> > replacement. In which case, I am strongly in favor of B. As soon as we

> can

> > remove aggregators, I think we should.

> >

> > On Wed, Oct 12, 2016 at 10:48 AM Kenneth Knowles  wrote:

> >

> >> Huzzah! This is IMO a really great change. I agree that we can get

> >> something in to allow work to continue, and improve the API as we
learn.

> >>

> >> On Wed, Oct 12, 2016 at 10:20 AM Ben Chambers

> 

> >> wrote:

> >>

> >> 3. One open question is what to do with Aggregators. In the doc I

> mentioned

> >>

> >> that long term I'd like to consider whether we can improve Aggregators

> to

> >> be a better fit for the model by supporting windowing and allowing them

> to

> >> serve as input for future steps. In the interim it's not clear what we

> >> should do with them. The two obvious (and extreme) options seem to be:

> >>

> >>

> >>

> >>   Option A: Do nothing, leave aggregators as they are until we revisit.

> >>

> >>

> >>   Option B: Remove aggregators from the SDK until we revisit.

> >>

> >> I'd like to suggest removing Aggregators once the existing runners have

> >> reasonable support for Metrics. Doing so reduces the surface area we

> need

> >> to maintain/support and simplifies other changes being made. It will

> also

> >> allow us to revisit them from a clean slate.

> >>

> >>

> >> +1 to removing aggregators, either of A or B. The new metrics design

> >> addresses aggregator use cases as well or better.

> >>

> >> So A vs B is a choice of whether we have a gap with no aggregator or

> >> metrics-like functionality. I think that is perhaps a bit of a bummer

> for

> >> users, and we will likely port over the runner code for it, so we

> wouldn't

> >> want to actually delete it, right? Can we do it in

Re: Specifying type arguments for generic PTransform builders

2016-10-13 Thread Ben Chambers
This is also a good reason to avoid overly general names like "from",
"create" and "of". Instead, the method should be named ".fromQuery(String
query)", so we can later add ".fromTable(...)".

On Thu, Oct 13, 2016 at 4:55 PM Dan Halperin 
wrote:

> For #3 -- I think we should be VERY careful there. You need to be
> absolutely certain that there will never, ever be another alternative to
> your mandatory argument. For example, you build an option to read from a
> DB, so you supply a .from(String query). Then later, you want to add
> reading just a table directly, so you add fromTable(Table). In this case,
> it's much better to use .read().fromQuery() or .read().fromTable() --
> having ".read()" be the "standard builder a'la #1".
>
> The other major drawback of #3 is the inability to provide generic
> configuration. For example, a utility function that gives you a database
> reader ready-to-go with all the default credentials and options you need --
> but independent of the type you want the result to end up as. You can't do
> that if you must bind the type first.
>
> I think that in general all of these patterns are significantly worse in
> the long run than the existing standards, e.g., for BigtableIO. These
> suggestions are motivated by making things easier on transform writers, but
> IMO we need to be optimizing for transform users.
>
> On Fri, Oct 7, 2016 at 4:48 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > In my original email, all FooBuilder's should be simply Foo. Sorry for
> the
> > confusion.
> >
> > On Thu, Oct 6, 2016 at 3:08 PM Kenneth Knowles 
> > wrote:
> >
> > > Mostly my thoughts are the same as Robert's. Use #3 whenever possible,
> > > fallback to #1 otherwise, but please consider using informative names
> for
> > > your methods in all cases.
> > >
> > > #1 GBK.create(): IMO this pattern is best only for transforms where
> > > withBar is optional or there is no such method, as in GBK. If it is
> > > mandatory, it should just be required in the first method, eliding the
> > > issue, as in ParDo.of(DoFn), MapElements.via(...), etc, like you
> > say
> > > in your concluding remark.
> > >
> > > #2 FooBuilder FooBuilder.create(): this too - if you are going to
> fix
> > > the type, fix it first. If it is optional and Foo is usable as a
> > > transform, then sure. (it would have be something weird like
> Foo > > OutputT, ?> extends PTransform)
> > >
> > > #3 Foo.create(Bar): this is best. Do this whenever possible. From my
> > > perspective, instead of "move the param to create(...)" I would
> describe
> > > this as "delete create() then rename withBar to create". Just skip the
> > > second step and you are in an even better design, withBar being the
> > > starting point. Just like ParDo.of and MapElements.via.
> > >
> > > #4 Dislike this, too, for the same reasons as #2 plus code bloat plus
> > user
> > > confusion.
> > >
> > > Side note since you use this method in all your examples: This kind of
> > use
> > > of "create" is a bad method name. There may be no new object "created".
> > > Sometimes we have no better idea, but create() is a poor default. For
> GBK
> > > both are bad: create() (we really only need one instance so why create
> > > anything?) and create() (what is the unlabeled boolean?). They
> > > would be improved by GBK.standard() and GBK.fewKeys() or some such. I
> > tend
> > > to think that focusing on this fine polish eliminates a lot of cases
> for
> > > the generalized question.
> > >
> > > Kenn
> > >
> > > On Thu, Oct 6, 2016 at 2:10 PM Eugene Kirpichov
> > >  wrote:
> > >
> > > > Quite a few transforms in the SDK are generic (i.e. have type
> > > parameters),
> > > > e.g. ParDo, GroupByKey, Keys / WithKeys, many connectors (TextIO,
> > > KafkaIO,
> > > > JdbcIO, MongoDbGridFSIO etc - both read and write). They use
> different
> > > > styles of binding the type parameters to concrete types in caller
> code.
> > > >
> > > > I would like us to make a decision which of those styles to recommend
> > for
> > > > new transform and connectors writers. This question is coming up
> rather
> > > > frequently, e.g. it came up in JdbcIO and MongoDbGridFSIO.
> > > >
> > > > For the purpose of this discussion, imagine a hypothetical builder
> > class
> > > > that looks like this:
> > > >
> > > > class Foo {
> > > > private Bar bar;
> > > > private int blah;
> > > >
> > > > Foo withBlah(int blah);
> > > > }
> > > >
> > > > So far I've seen several styles of binding the type argument in a
> > > withBar()
> > > > method vs. a creation method:
> > > >
> > > > 1. Binding at the creation method: e.g.:
> > > >
> > > > class Foo {
> > > > ...
> > > > public static  Foo create();
> > > > public FooBuilder withBar(Bar bar);
> > > > }
> > > >
> > > > Foo foo = Foo.create().withBlah(42).withBar(new
> > > > StringBar());
> > > >
> > > > Example: GroupByKey does this. As well as other transforms that don't
> > > have
> > > > a withBar()-like method, but still need a type argument, e.g. Key

Re: Simplifying User-Defined Metrics in Beam

2016-10-12 Thread Ben Chambers
Some general updates:

1. There has been some discussion in the doc and things have quieted down.
No major changes made beyond deciding to stick with function names on
Counters derived from Dropwizard and an open comment on whether to use
Histogram or Distribution as the name for one of the metrics.

2. The initial PR adding metrics to the Java Direct Runner has gone through
multiple rounds of refinement on the implementation details. It seems to be
quieting down. Once it receives LGTM from a committer I'll plan on merging
it unless I've heard any objections. We can (and likely will) continue to
revise the API after it is merged so I don't think we need to hold one for
the other. Once merged, I'll start looking at hooking it up in other
runners.

3. One open question is what to do with Aggregators. In the doc I mentioned
that long term I'd like to consider whether we can improve Aggregators to
be a better fit for the model by supporting windowing and allowing them to
serve as input for future steps. In the interim it's not clear what we
should do with them. The two obvious (and extreme) options seem to be:
  Option A: Do nothing, leave aggregators as they are until we revisit.
  Option B: Remove aggregators from the SDK until we revisit.
I'd like to suggest removing Aggregators once the existing runners have
reasonable support for Metrics. Doing so reduces the surface area we need
to maintain/support and simplifies other changes being made. It will also
allow us to revisit them from a clean slate.

Thoughts?

On Thu, Oct 6, 2016 at 5:41 AM Aljoscha Krettek  wrote:

Hi,

I'm currently in holidays but I'll put some thought into this and give my

comments once I get back.



Aljoscha



On Wed, Oct 5, 2016, 21:51 Ben Chambers 

wrote:



> To provide some more background I threw together a quick doc outlining my

> current thinking for this Metrics API. You can find it at

> http://s.apache.org/beam-metrics-api.

>

> The first PR (https://github.com/apache/incubator-beam/pull/1024)

> introducing these APIs for the direct runner is hopefully nearing

> completion. If there are no objections, I'd like to check it in and start

> working on hooking this up to other runners to flesh out how this will

> interact with them. We can continue to iterate on the API and concepts in

> the doc and create follow-up PRs for any changes we'd like to make.

>

> As always, let me know if there are any questions or comments!

>

> -- Ben

>

> On Wed, Sep 28, 2016 at 5:05 PM Ben Chambers  wrote:

>

> I started looking at BEAM-147: “Rename Aggregator to [P]Metric”. Rather

> than renaming the existing concept I’d like to introduce Metrics as a

> simpler mechanism to provide information during pipeline execution (I have

> updated the issue accordingly).

>

> Here is what I'm thinking would lead to a simpler API focused on reporting

> metrics about pipeline execution:

>

>1.

>

>Rather than support arbitrary Combine functions, Metrics support a set

>of specific aggregations with documented use-cases (eg., Counter,
Meter,

>Distribution, etc.) and an API inspired by the Dropwizard Metrics

> library.

>2.

>

>Rather than requiring declaration during pipeline construction (like

>Aggregators) Metrics allow declaration at any point because it is

> easier to

>use.

>3.

>

>Metrics provide more documented flexibility in how runners support
them,

>by allowing each runner to provide different details about metrics and

>support different kinds of metrics, while clearly documenting what the

>kinds are and what should happen if they aren’t supported. This allows

>users to use metrics in a reliable way even though runners may
implement

>them differently

>

>

> # What does the Metrics API look like?

>

> The API for using metrics would be relatively simple:

>

> // Metrics can be used as fields:

>

> private final Counter cnt = Metrics.counter(“mycode”, “odd-elements”);

>

> @ProcessElement

>

> public void processElement(ProcessContext c) {

>

>  if (c.element() % 2 == 1) {

>

>cnt.inc();

>

>  }

>

>  // Metrics can be created dynamically:

>

>  Metrics.distribution(“mycode”, “elements”).report(c.element());

>

>  ...

>

> }

>

> # What Kinds of Metrics could there be?

>

> There are many kinds of metrics that seem like they could be useful. We

> could eventually support metrics like the following:

>

>-

>

>Counter: Can be incremented/decremented. Will be part of the initial

>implementation.

>-

>

>Distribution: Values can be reported and various statistics are

>reported. The initial implem

Re: Introducing a Redistribute transform

2016-10-11 Thread Ben Chambers
As Kenn points out, I think the nature of the Redistribute operation is to
act as a hint (or requirement) to the runner that a certain distribution of
the elements is desirable. In a perfect world this wouldn't be necessary
because every runner would be able to do exactly the right thing. Looking
at the different use cases may be helpful:

1. Redistribute.arbitrarily being used in IO as a fusion break and
checkpoint. We could express this as a hint saying that we'd like to
persist the PCollection at this point.
2. Redistribute.perKey being used to checkpoint keys in a keyed
PCollection. I think this could be the same as the previous hint or a
variant thereof.
3. Redistribute.perKey to ensure that the elements are distributed across
machines such that all elements with a specific key are on the same
machine. This should only be necessary for per-key processing (such as
state) and can be added by the runner when necessary (becomes easier once
we have a notion of transforms that preserve key-partitioning, etc.)

Of these, 1 and 2 seem to be the most interesting. The hint can be
implemented in various ways -- as a transform that represents the hint
(which the runner can then implement as it sees fit) or via a method that
sets some property on the PCollection, to which the runner could choose to
apply a transform. I lean towards the former (keeping this as a transform)
since it fits more naturally into the codebase and doesn't require
extending PCollection (something we avoid).

What if this was something like ".apply(Hints.checkpoint())" or
".apply(Hints.break())"? That would make it clearer that this is a hint to
the runner and not part of the semantics.
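
As a sketch of the idea (Hints and its methods are hypothetical; the
override is written against the expand() name discussed elsewhere in this
archive, which at the time of this message was still called apply()):

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;

public class Hints {
  // A hypothetical identity transform whose only job is to carry a hint the
  // runner may honor (materialize / break fusion here) or ignore entirely.
  public static <T> PTransform<PCollection<T>, PCollection<T>> checkpoint() {
    return new PTransform<PCollection<T>, PCollection<T>>() {
      @Override
      public PCollection<T> expand(PCollection<T> input) {
        // Semantically a no-op: same elements, same windowing, same triggering.
        return input;
      }
    };
  }
}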

On Tue, Oct 11, 2016 at 10:09 AM Kenneth Knowles 
wrote:

> On Mon, Oct 10, 2016 at 1:38 PM Eugene Kirpichov
>
>  wrote:
>
>
>
> > The transform, the way it's implemented, actually does several things at
>
> > the same time and that's why it's tricky to document it.
>
> >
>
>
>
> This thread has actually made me less sure about my thoughts on this
>
> transform. I do know what the transform is about and I do think we need it.
>
> But I don't know that it can be explained "within the model". Look at our
>
> classic questions about Redistribute.arbitrarily() and
> Redistribute.byKey():
>
>
>
>  - "what" is it computing? The identity on its input.
>
>  - "where" is the event time windowing? Same as its input.
>
>  - "when" is output produced? As fast as reasonable (runner-specific).
>
>  - "how" are refinements related? Same as its input (I think this might
>
> actually be incorrect if accumulating fired panes)
>
>
>
> These points don't describe any of the real goals of Redistribute. Hence
>
> describing it in terms of fusion and checkpointing, which are quite
>
> runner-specific in their (optional) manifestations.
>
>
>
> - Introduces a fusion barrier (in runners that have it), making sure that
>
> > the runner can fully parallelize processing the output PCollection with
>
> > DoFn's
>
> >
>
>
>
> Can a runner introduce other fusion barriers whenever it wants? Yes.
>
> Can a runner ignore a proposed fusion barrier? Yes. (or when can it not?
>
> why not?)
>
>
>
>
>
> > - Introduces a fault-tolerance barrier, effectively "checkpointing" the
>
> > input PCollection (again, in runners where it makes sense) and making
> sure
>
> > that processing elements of the output PCollection with a DoFn, if the
> DoFn
>
> > fails, will redo only that processing, but not need to recompute the
> input
>
> > PCollection.
>
> >
>
>
>
> Can a runner introduce a checkpoint whenever appropriate? Yes.
>
> Can a runner ignore a hint to checkpoint? Yes (if it can still compute the
>
> same result - it may not even conceive of checkpointing in a compatible
>
> way).
>
>
>
> - All of the above and also makes the collection "key-partitioned", giving
>
> > access to per-key state to downstream key-preserving DoFns. However, this
>
> > is also runner-specific, because it's conceivable that a runner might not
>
> > need this "key-partitioned" property (in fact it's best if a runner
>
> > inserted such a "redistribute by key" automatically if it needs it...),
> and
>
> > it currently isn't exposed anyway.
>
> >
>
>
>
> Agreed. The runner should insert the necessary keying wherever needed. One
>
> might say the same for other uses of Redistribute, but in practice hints
>
> are useful.
>
>
>
>
>
> > Still thinking about the best way to describe this in a way that's least
>
> > confusing to users.
>
> >
>
>
>
> I think it isn't just about users. I don't the transform is quite
>
> well-defined at the "what the runner must do" level. Here is a question I
>
> am considering: When is it _incorrect_ for a runner to replace a
>
> Redistribute with an identity transform? I have some thoughts, such as
>
> committing pseudorandomly generated data, but do you have some other ideas?
>
>
>
> Kenn
>
>


Re: Simplifying User-Defined Metrics in Beam

2016-10-05 Thread Ben Chambers
To provide some more background I threw together a quick doc outlining my
current thinking for this Metrics API. You can find it at
http://s.apache.org/beam-metrics-api.

The first PR (https://github.com/apache/incubator-beam/pull/1024)
introducing these APIs for the direct runner is hopefully nearing
completion. If there are no objections, I'd like to check it in and start
working on hooking this up to other runners to flesh out how this will
interact with them. We can continue to iterate on the API and concepts in
the doc and create follow-up PRs for any changes we'd like to make.

As always, let me know if there are any questions or comments!

-- Ben

On Wed, Sep 28, 2016 at 5:05 PM Ben Chambers  wrote:

I started looking at BEAM-147: “Rename Aggregator to [P]Metric”. Rather
than renaming the existing concept I’d like to introduce Metrics as a
simpler mechanism to provide information during pipeline execution (I have
updated the issue accordingly).

Here is what I'm thinking would lead to a simpler API focused on reporting
metrics about pipeline execution:

   1. Rather than support arbitrary Combine functions, Metrics support a set
      of specific aggregations with documented use-cases (e.g., Counter, Meter,
      Distribution, etc.) and an API inspired by the Dropwizard Metrics library.
   2. Rather than requiring declaration during pipeline construction (like
      Aggregators), Metrics allow declaration at any point because it is easier
      to use.
   3. Metrics provide more documented flexibility in how runners support them,
      by allowing each runner to provide different details about metrics and
      support different kinds of metrics, while clearly documenting what the
      kinds are and what should happen if they aren’t supported. This allows
      users to use metrics in a reliable way even though runners may implement
      them differently.


# What does the Metrics API look like?

The API for using metrics would be relatively simple:

// Metrics can be used as fields:
private final Counter cnt = Metrics.counter("mycode", "odd-elements");

@ProcessElement
public void processElement(ProcessContext c) {
  if (c.element() % 2 == 1) {
    cnt.inc();
  }
  // Metrics can be created dynamically:
  Metrics.distribution("mycode", "elements").report(c.element());
  ...
}

# What Kinds of Metrics could there be?

There are many kinds of metrics that seem like they could be useful. We
could eventually support metrics like the following:

   - Counter: Can be incremented/decremented. Will be part of the initial
     implementation.
   - Distribution: Values can be reported and various statistics are
     reported. The initial implementation will support “easy” statistics like
     MIN/MAX/MEAN/SUM/COUNT. We’d like to support quantiles in the future to
     make this more comparable to Dropwizard’s Histogram.
   - (Future) Meter: Method to indicate something happened. Computes the rate
     of occurrences.
   - (Future) Timer: A meter measuring how often something happens plus a
     distribution of how long it took each time.
   - (Future) Frequent Elements: Reports values that occurred more than N% of
     the time.


# What are the next steps?

I’ve started work prototyping the new API by implementing it for the Java
DirectRunner. To see an example pipeline that reports a Counter and a
Distribution, take a look at the first PR
https://github.com/apache/incubator-beam/pull/1024

# Where does that leave Aggregators?
Hopefully, this new Metrics API addresses the goals of monitoring a
pipeline more cleanly than Aggregators. In the long term, it would be good
to make Aggregators a more complete participant in the model, by adding
support for windowing and allowing the results to be used as input to later
steps in the pipeline. Or to make them completely unnecessary by making it
easy to use side-outputs with the new reflective DoFn approach. Once
Metrics are available, we may want to deprecate or remove Aggregators until
we’re ready to figure out what the right API is.


Simplifying User-Defined Metrics in Beam

2016-09-28 Thread Ben Chambers
I started looking at BEAM-147: “Rename Aggregator to [P]Metric”. Rather
than renaming the existing concept I’d like to introduce Metrics as a
simpler mechanism to provide information during pipeline execution (I have
updated the issue accordingly).

Here is what I'm thinking would lead to a simpler API focused on reporting
metrics about pipeline execution:

   1. Rather than support arbitrary Combine functions, Metrics support a set
      of specific aggregations with documented use-cases (e.g., Counter, Meter,
      Distribution, etc.) and an API inspired by the Dropwizard Metrics library.
   2. Rather than requiring declaration during pipeline construction (like
      Aggregators), Metrics allow declaration at any point because it is easier
      to use.
   3. Metrics provide more documented flexibility in how runners support them,
      by allowing each runner to provide different details about metrics and
      support different kinds of metrics, while clearly documenting what the
      kinds are and what should happen if they aren’t supported. This allows
      users to use metrics in a reliable way even though runners may implement
      them differently.


# What does the Metrics API look like?

The API for using metrics would be relatively simple:

// Metrics can be used as fields:
private final Counter cnt = Metrics.counter("mycode", "odd-elements");

@ProcessElement
public void processElement(ProcessContext c) {
  if (c.element() % 2 == 1) {
    cnt.inc();
  }
  // Metrics can be created dynamically:
  Metrics.distribution("mycode", "elements").report(c.element());
  ...
}

# What Kinds of Metrics could there be?

There are many kinds of metrics that seem like they could be useful. We
could eventually support metrics like the following:

   - Counter: Can be incremented/decremented. Will be part of the initial
     implementation.
   - Distribution: Values can be reported and various statistics are
     reported. The initial implementation will support “easy” statistics like
     MIN/MAX/MEAN/SUM/COUNT. We’d like to support quantiles in the future to
     make this more comparable to Dropwizard’s Histogram.
   - (Future) Meter: Method to indicate something happened. Computes the rate
     of occurrences.
   - (Future) Timer: A meter measuring how often something happens plus a
     distribution of how long it took each time.
   - (Future) Frequent Elements: Reports values that occurred more than N% of
     the time.


# What are the next steps?

I’ve started work prototyping the new API by implementing it for the Java
DirectRunner. To see an example pipeline that reports a Counter and a
Distribution, take a look at the first PR
https://github.com/apache/incubator-beam/pull/1024

# Where does that leave Aggregators?
Hopefully, this new Metrics API addresses the goals of monitoring a
pipeline more cleanly than Aggregators. In the long term, it would be good
to make Aggregators a more complete participant in the model, by adding
support for windowing and allowing the results to be used as input to later
steps in the pipeline. Or to make them completely unnecessary by making it
easy to use side-outputs with the new reflective DoFn approach. Once
Metrics are available, we may want to deprecate or remove Aggregators until
we’re ready to figure out what the right API is.


Re: IntervalWindow toString()

2016-09-19 Thread Ben Chambers
I think this is using interval notation
(http://www.mathwords.com/i/interval_notation.htm) to indicate that the
interval includes the start time but not the end time, so it's intentional
rather than a bug.

On Mon, Sep 19, 2016, 8:56 AM Jesse Anderson  wrote:

> The toString() to IntervalWindow starts with a square bracket and ends with
> a parenthesis. Is this a type of notation or a bug? Code:
>
>   @Override
>   public String toString() {
> return "[" + start + ".." + end + ")";
>   }
>
> Thanks,
>
> Jesse
>


Re: Remove legacy import-order?

2016-08-23 Thread Ben Chambers
I think introducing formatting should be a separate discussion.

Regarding the import order: this PR demonstrates the change
https://github.com/apache/incubator-beam/pull/869

I would need to update the second part (applying optimize imports) prior to
actually merging.

On Tue, Aug 23, 2016 at 5:08 PM Eugene Kirpichov
 wrote:

> Two cents: While we're at it, we could consider enforcing formatting as
> well (https://github.com/google/google-java-format). That's a bigger
> change
> though, and I don't think it has checkstyle integration or anything like
> that.
>
> On Tue, Aug 23, 2016 at 4:54 PM Dan Halperin 
> wrote:
>
> > yeah I think that we would be SO MUCH better off if we worked with an
> > out-of-the-box IDE. We don't even distribute an IntelliJ/Eclipse config
> > file right now, and I'd like to not have to.
> >
> > But, ugh, it will mess up ongoing PRs. I guess committers could fix them
> in
> > merge, or we could just make proposers rebase. (Since committers are most
> > proposers, probably little harm in the latter).
> >
> > On Tue, Aug 23, 2016 at 4:11 PM, Jesse Anderson 
> > wrote:
> >
> > > Please. That's the one that always trips me up.
> > >
> > > On Tue, Aug 23, 2016, 4:10 PM Ben Chambers 
> wrote:
> > >
> > > > When Beam was contributed it inherited an import order [1] that was
> > > pretty
> > > > arbitrary. We've added org.apache.beam [2], but continue to use this
> > > > ordering.
> > > >
> > > > Both Eclipse and IntelliJ default to grouping imports into alphabetic
> > > > order. I think it would simplify development if we switched our
> > > checkstyle
> > > > ordering to agree with these IDEs. This also removes special
> treatment
> > > for
> > > > specific packages.
> > > >
> > > > If people agree, I'll send out a PR that changes the checkstyle
> > > > configuration and runs IntelliJ's sort-imports on the existing files.
> > > >
> > > > -- Ben
> > > >
> > > > [1]
> > > > org.apache.beam,com.google,android,com,io,Jama,junit,net,
> > > org,sun,java,javax
> > > > [2] com.google,android,com,io,Jama,junit,net,org,sun,java,javax
> > > >
> > >
> >
>


Remove legacy import-order?

2016-08-23 Thread Ben Chambers
When Beam was contributed it inherited an import order [1] that was pretty
arbitrary. We've added org.apache.beam [2], but continue to use this
ordering.

Both Eclipse and IntelliJ default to grouping imports into alphabetic
order. I think it would simplify development if we switched our checkstyle
ordering to agree with these IDEs. This also removes special treatment for
specific packages.

If people agree, I'll send out a PR that changes the checkstyle
configuration and runs IntelliJ's sort-imports on the existing files.

-- Ben

[1] org.apache.beam,com.google,android,com,io,Jama,junit,net,org,sun,java,javax
[2] com.google,android,com,io,Jama,junit,net,org,sun,java,javax


Re: OldDoFn - CounterSet replacement

2016-08-17 Thread Ben Chambers
Ah, I see. I wasn't aware this was in the context of a new runner.

So -- the new mechanism makes it harder to do the "wrong thing" (not wire
up aggregators). But, since your goal is to get a baseline without proper
support for aggregators, you should be able to create a NoopAggregator
(that ignores the values) and a NoopAggregatorFactory (that creates
NoopAggregators). That should establish your baseline, while also making it
really clear that aggregators are not supported.
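
A rough sketch of that noop approach (assuming the current Aggregator
interface with addValue/getName/getCombineFn -- double-check the exact
signatures against the SDK version you are building on):

class NoopAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT> {
  private final String name;
  private final CombineFn<InputT, ?, OutputT> combineFn;

  NoopAggregator(String name, CombineFn<InputT, ?, OutputT> combineFn) {
    this.name = name;
    this.combineFn = combineFn;
  }

  @Override
  public void addValue(InputT value) {
    // Intentionally a no-op: values are dropped, which makes it obvious that
    // aggregators are not yet wired up in this runner.
  }

  @Override
  public String getName() {
    return name;
  }

  @Override
  public CombineFn<InputT, ?, OutputT> getCombineFn() {
    return combineFn;
  }
}

The NoopAggregatorFactory would then just hand one of these back from its
create method.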

Also note that as part of looking at
https://issues.apache.org/jira/browse/BEAM-147 and
https://issues.apache.org/jira/browse/BEAM-458 aggregators may be getting
simpler to support, so let us know before you spend a lot of time actually
wiring them up.

On Wed, Aug 17, 2016 at 8:43 AM Thomas Weise  wrote:

> Hi Ben,
>
> Thanks for the reply. Here is the PR:
>
> https://github.com/apache/incubator-beam/pull/540
>
> The doFnRunner instantiation in old style is here:
>
>
> https://github.com/apache/incubator-beam/pull/540/files#diff-86746f538c22ebafd06fca17f0d0aa94R116
>
> I should also note that focus of the PR is to establish the Apex runner
> baseline and proper support for aggregators isn't part of it, it's
> something I was planning to take up in subsequent round.
>
> Thomas
>
>
> On Wed, Aug 17, 2016 at 8:14 AM, Ben Chambers 
> wrote:
>
> > Hi Thomas!
> >
> > On Tue, Aug 16, 2016 at 9:40 PM Thomas Weise 
> > wrote:
> >
> > > I'm trying to rebase a PR and adjust for the DoFn changes.
> > >
> >
> > Can you elaborate on what you're trying to do (or send a link to the PR)?
> >
> >
> > > CounterSet is gone and there is now AggregatorFactory and I'm looking
> to
> > > fix an existing usage of org.apache.beam.sdk.util.
> > DoFnRunners.simpleRunner.
> > >
> >
> > In practice, these should act the same. CounterSet was an implementation
> > detail used to create implementation-specific Counters. The DoFnRunner
> was
> > supposed to get the CounterSet that was wired up correctly. Now, the
> > AggregatorFactory serves the role of creating wired-up Aggregators. As
> > before, the DoFnRunner should be instantiated with an AggregatorFactory
> > wired up appropriately.
> >
> >
> > > Given the instance of OldDoFn, what is the recommended way to obtain
> the
> > > aggregator factory when creating the fn runner?
> > >
> >
> > This should come from the runner. When the runner wants to instantiate a
> > DoFnRunner to execute a user DoFn, it provides an AggregatorFactory that
> > will wire up aggregators appropriately.
> >
> >
> > > Thanks!
> > >
> > >
> > > java.lang.NullPointerException
> > > at
> > >
> > > org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.
> > createAggregatorInternal(DoFnRunnerBase.java:348)
> > > at
> > >
> > > org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregator(
> > OldDoFn.java:224)
> > > at
> > >
> > >
> org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregators(
> > OldDoFn.java:215)
> > > at
> > >
> > > org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.<
> > init>(DoFnRunnerBase.java:214)
> > > at org.apache.beam.sdk.util.DoFnRunnerBase.(
> > DoFnRunnerBase.java:87)
> > > at
> > > org.apache.beam.sdk.util.SimpleDoFnRunner.(
> > SimpleDoFnRunner.java:42)
> > > at org.apache.beam.sdk.util.DoFnRunners.simpleRunner(
> > DoFnRunners.java:60)
> > >
> >
>


Re: OldDoFn - CounterSet replacement

2016-08-17 Thread Ben Chambers
Hi Thomas!

On Tue, Aug 16, 2016 at 9:40 PM Thomas Weise  wrote:

> I'm trying to rebase a PR and adjust for the DoFn changes.
>

Can you elaborate on what you're trying to do (or send a link to the PR)?


> CounterSet is gone and there is now AggregatorFactory and I'm looking to
> fix an existing usage of org.apache.beam.sdk.util.DoFnRunners.simpleRunner.
>

In practice, these should act the same. CounterSet was an implementation
detail used to create implementation-specific Counters. The DoFnRunner was
supposed to get the CounterSet that was wired up correctly. Now, the
AggregatorFactory serves the role of creating wired-up Aggregators. As
before, the DoFnRunner should be instantiated with an AggregatorFactory
wired up appropriately.


> Given the instance of OldDoFn, what is the recommended way to obtain the
> aggregator factory when creating the fn runner?
>

This should come from the runner. When the runner wants to instantiate a
DoFnRunner to execute a user DoFn, it provides an AggregatorFactory that
will wire up aggregators appropriately.


> Thanks!
>
>
> java.lang.NullPointerException
> at
>
> org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.createAggregatorInternal(DoFnRunnerBase.java:348)
> at
>
> org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregator(OldDoFn.java:224)
> at
>
> org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregators(OldDoFn.java:215)
> at
>
> org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.(DoFnRunnerBase.java:214)
> at org.apache.beam.sdk.util.DoFnRunnerBase.(DoFnRunnerBase.java:87)
> at
> org.apache.beam.sdk.util.SimpleDoFnRunner.(SimpleDoFnRunner.java:42)
> at org.apache.beam.sdk.util.DoFnRunners.simpleRunner(DoFnRunners.java:60)
>


Re: [PROPOSAL] Website page or Jira to host all current proposal discussion and docs

2016-08-07 Thread Ben Chambers
Would we use the same Jira to track the series of PRs implementing the
proposal (if accepted) or would it be discussion only (possibly linked to
the implementation tasks)?

On Sun, Aug 7, 2016, 9:48 PM Frances Perry  wrote:

> I'm a huge fan of keeping all the details related to a topic in a relevant
> jira issue.
>
> On Sun, Aug 7, 2016 at 9:31 PM, Jean-Baptiste Onofré 
> wrote:
>
> > Hi guys,
> >
> > we have now several technical discussions, sent on the mailing list with
> > link to document for details.
> >
> > I think it's not easy for people to follow the different discussions, and
> > to look for the e-mail containing the document links.
> >
> > Of course, it's required to have the discussion on the mailing list (per
> > Apache rules). However, maybe it could be helpful to have a place to find
> > open discussions, with the link to the mailing list discussion thread,
> and
> > to the detailed document.
> > It could be on the website (but maybe not easy to maintain and publish),
> > or on Jira (one Jira per discussion), or a wiki.
> >
> > WDYT ?
> >
> > Regards
> > JB
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>


Re: [Proposal] Add waitToFinish(), cancel(), waitToRunning() to PipelineResult.

2016-07-21 Thread Ben Chambers
This health check seems redundant with just waiting a while and then
checking on the status, other than returning earlier in the case of
reaching a terminal state. What about adding:

/**
 * Returns the state after waiting the specified duration. Will return
 * earlier if the pipeline reaches a terminal state.
 */
State getStateAfter(Duration duration);

This seems to be a useful building block, both for the user's pipeline (in
case they wanted to build something like wait and then check health) and
also for the SDK (to implement waitUntilFinished, etc.)
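
For example (hypothetical code, since getStateAfter is only proposed here),
both of those could be layered on top of it:

// SDK-side: wait for a terminal state by waiting in fixed chunks.
State waitUntilFinish(PipelineResult result) {
  State state;
  do {
    state = result.getStateAfter(Duration.standardMinutes(1));
  } while (!state.isTerminal());
  return state;
}

// User-side: a simple "is it still healthy after this long?" check.
boolean healthyAfter(PipelineResult result, Duration duration) {
  return result.getStateAfter(duration) == State.RUNNING;
}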

On Thu, Jul 21, 2016 at 4:11 PM Pei He  wrote:

> I am not in favor of supporting wait for every states or
> waitUntilState(...).
> One reason is PipelineResult.State is not well defined and is not
> agreed upon across runners.
> Another reason is users might not want to wait for a particular state.
> For example,
> waitUntilFinish() is to wait for a terminal state.
> So, even if runners have different states, we still can define shared
> properties, such as finished/terminal.
>
> I think when users call waitUntilRunning(), they want to make sure the
> pipeline is up and running and is healthy. Maybe we want to wait until at
> least one element has gone through the pipeline.
>
> What about changing the waitUntilRunning() to the following?
>
> /**
> * Check if the pipeline is healthy for the duration.
> *
> * Return true if the pipeline is healthy at the end of duration.
> * Return false if the pipeline is not healthy at the end of duration.
> * It may return early if the pipeline is in an unrecoverable failure
> state.
> */
> boolean PipelineResult.healthCheck(Duration duration)
>
> (I think this also addressed Robert's comment about waitToRunning())
>
> On Thu, Jul 21, 2016 at 1:08 PM, Kenneth Knowles 
> wrote:
> > Some more comments:
> >
> >  - What are the allowed/expected state transitions prior to RUNNING?
> Today,
> > I presume it is any nonterminal state, so it can be UNKNOWN or STOPPED
> > (which really means "not yet started") prior to RUNNING. Is this what we
> > want?
> >
> >  - If a job can be paused, a transition from RUNNING to STOPPED, then
> > waitUntilPaused(Duration) makes sense.
> >
> >  - Assuming there is some polling under the hood, are runners required to
> > send back a full history of transitions? Or can transitions be missed,
> with
> > only the latest state retrieved?
> >
> >  - If the latter, then does waitUntilRunning() only wait until RUNNING or
> > does it also return when it sees STOPPED, which could certainly indicate
> > that the job transitioned to RUNNING then STOPPED in between polls. In
> that
> > case it is, today, the same as waitUntilStateIsKnown().
> >
> >  - The obvious limit of this discussion is waitUntilState(Duration,
> > Set), which is the same amount of work to implement. Am I correct
> > that everyone in this thread thinks this generality is just not the right
> > thing for a user API?
> >
> >  - This enum could probably use revision. I'd chose some combination of
> > tightening the enum, making it extensible, and make some aspect of it
> > free-form. Not sure where the best balance lies.
> >
> >
> >
> > On Thu, Jul 21, 2016 at 12:47 PM, Ben Chambers
>  >> wrote:
> >
> >> (Minor Issue: I'd propose waitUntilDone and waitUntilRunning rather than
> >> waitToRunning which reads oddly)
> >>
> >> The only reason to separate submission from waitUntilRunning would be if
> >> you wanted to kick off several pipelines in quick succession, then wait
> for
> >> them all to be running. For instance:
> >>
> >> PipelineResult p1Future = p1.run();
> >> PipelineResult p2Future = p2.run();
> >> ...
> >>
> >> p1Future.waitUntilRunning();
> >> p2Future.waitUntilRunning();
> >> ...
> >>
> >> In this setup, you can more quickly start several pipelines, but your
> main
> >> program would wait and report any errors before exiting.
> >>
> >> On Thu, Jul 21, 2016 at 12:41 PM Robert Bradshaw
> >>  wrote:
> >>
> >> > I'm in favor of the proposal. My only question is whether we need
> >> > PipelineResult.waitToRunning(), instead I'd propose that run() block
> >> > until the pipeline's running/successfully submitted (or failed). This
> >> > would simplify the API--we'd only have one kind of wait that makes
> >> > sense in all cases.
> >> >
> >> > What kinds of interactions would one want to have with the
> >> > PipelineResults before it's running?

Re: [Proposal] Add waitToFinish(), cancel(), waitToRunning() to PipelineResult.

2016-07-21 Thread Ben Chambers
(Minor Issue: I'd propose waitUntilDone and waitUntilRunning rather than
waitToRunning which reads oddly)

The only reason to separate submission from waitUntilRunning would be if
you wanted to kick off several pipelines in quick succession, then wait for
them all to be running. For instance:

PipelineResult p1Future = p1.run();
PipelineResult p2Future = p2.run();
...

p1Future.waitUntilRunning();
p2Future.waitUntilRunning();
...

In this setup, you can more quickly start several pipelines, but your main
program would wait and report any errors before exiting.

On Thu, Jul 21, 2016 at 12:41 PM Robert Bradshaw
 wrote:

> I'm in favor of the proposal. My only question is whether we need
> PipelineResult.waitToRunning(), instead I'd propose that run() block
> until the pipeline's running/successfully submitted (or failed). This
> would simplify the API--we'd only have one kind of wait that makes
> sense in all cases.
>
> What kinds of interactions would one want to have with the
> PipelineResults before it's running?
>
> On Thu, Jul 21, 2016 at 12:24 PM, Thomas Groh 
> wrote:
> > TestPipeline is probably the one runner that can be expected to block, as
> > certainly JUnit tests and likely other tests will run the Pipeline, and
> > succeed, even if the PipelineRunner throws an exception. Luckily, this
> can
> > be added to TestPipeline.run(), which already has additional behavior
> > associated with it (currently regarding the unwrapping of
> AssertionErrors)
> >
> > On Thu, Jul 21, 2016 at 11:40 AM, Kenneth Knowles  >
> > wrote:
> >
> >> I like this proposal. It makes pipeline.run() seem like a pretty normal
> >> async request, and easy to program with. It removes the implicit
> assumption
> >> in the prior design that main() is pretty much just "build and run a
> >> pipeline".
> >>
> >> The part of this that I care about most is being able to write a program
> >> (not the pipeline, but the program that launches one or more pipelines)
> >> that has reasonable cross-runner behavior.
> >>
> >> One comment:
> >>
> >> On Wed, Jul 20, 2016 at 3:39 PM, Pei He 
> wrote:
> >> >
> >> > 4. PipelineRunner.run() should (but not required) do non-blocking runs
> >> >
> >>
> >> I think we can elaborate on this a little bit. Obviously there might be
> >> "blocking" in terms of, say, an HTTP round-trip to submit the job, but
> >> run() should never be non-terminating.
> >>
> >> For a test runner that finishes the pipeline quickly, I would be fine
> with
> >> run() just executing the pipeline, but the PipelineResult should still
> >> emulate the usual - just always returning a terminal status. It would be
> >> annoying to add waitToFinish() to the end of all our tests, but leaving
> a
> >> run() makes the tests only work with special blocking runner wrappers
> (and
> >> make them poor examples). A JUnit @Rule for test pipeline would hide all
> >> that, perhaps.
> >>
> >>
> >> Kenn
> >>
>


Re: [DISCUSS] PTransform.named vs. named apply

2016-06-23 Thread Ben Chambers
FYI: Created https://issues.apache.org/jira/browse/BEAM-370 to track
changes made in this direction.

On Wed, Jun 22, 2016 at 11:13 PM Aljoscha Krettek 
wrote:

> ±1 for the named apply
>
> On Thu, Jun 23, 2016, 07:07 Robert Bradshaw 
> wrote:
>
> > +1, I think it makes more sense to name the application of a transform
> > rather than the transform itself. (Still mulling on how best to do
> > this with Python...)
> >
> > On Wed, Jun 22, 2016 at 9:27 PM, Jean-Baptiste Onofré 
> > wrote:
> > > +1
> > >
> > > Regards
> > > JB
> > >
> > >
> > > On 06/23/2016 12:17 AM, Ben Chambers wrote:
> > >>
> > >> Based on a recent PR (
> https://github.com/apache/incubator-beam/pull/468)
> > I
> > >> was reminded of the confusion around the use of
> > >> .apply(transform.named(someName)) and .apply(someName, transform).
> This
> > is
> > >> one of things I’ve wanted to cleanup for a while. I’d like to propose
> a
> > >> path towards removing this redundancy.
> > >>
> > >> First, some background -- why are there two ways to name things? When
> we
> > >> added support for updating existing pipelines, we needed all
> > applications
> > >> to have unique user-provided names to allow diff’ing the pipelines. We
> > >> found a few problems with the first approach -- using .named() to
> > create a
> > >> new transform -- which led to the introduction of the named apply:
> > >>
> > >> 1. When receiving an error about an application not having a name, it
> is
> > >> not obvious that a name should be given to the *transform*
> > >> 2. When using .named() to construct a new transform either the type
> > >> information is lost or the composite transform has to override
> .named()
> > >>
> > >> We now generally suggest the use of .apply(someName, transform). It is
> > >> easier to use and doesn’t lead to as much confusion around PTransform
> > >> names
> > >> and PTransform application names.
> > >>
> > >> To that end, I'd like to propose the following changes to the code and
> > >> documentation:
> > >> 1. Replace the usage of .named(name) in all examples and composites
> with
> > >> the named-apply syntax.
> > >> 2. Replace .named(name) with a protected PTransform constructor which
> > >> takes
> > >> a default name. If not provided, the default name will be derived from
> > the
> > >> class of the PTransform.
> > >> 3. Use the protected constructor in composites (where appropriate) to
> > >> ensure that the default application has a reasonable name.
> > >>
> > >> Users will benefit from having a single way of naming applications
> while
> > >> building a pipeline. Any breakages due to the removal of .named should
> > be
> > >> easily fixed by either using the named application or by passing the
> > name
> > >> to the constructor of a composite.
> > >>
> > >> I’d like to hear any comments or opinions on this topic from the wider
> > >> community. Please let me know what you think!
> > >>
> > >> -- Ben
> > >>
> > >
> > > --
> > > Jean-Baptiste Onofré
> > > jbono...@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> >
>


[DISCUSS] PTransform.named vs. named apply

2016-06-22 Thread Ben Chambers
Based on a recent PR (https://github.com/apache/incubator-beam/pull/468) I
was reminded of the confusion around the use of
.apply(transform.named(someName)) and .apply(someName, transform). This is
one of the things I’ve wanted to clean up for a while. I’d like to propose a
path towards removing this redundancy.

First, some background -- why are there two ways to name things? When we
added support for updating existing pipelines, we needed all applications
to have unique user-provided names to allow diff’ing the pipelines. We
found a few problems with the first approach -- using .named() to create a
new transform -- which led to the introduction of the named apply:

1. When receiving an error about an application not having a name, it is
not obvious that a name should be given to the *transform*
2. When using .named() to construct a new transform either the type
information is lost or the composite transform has to override .named()

We now generally suggest the use of .apply(someName, transform). It is
easier to use and doesn’t lead to as much confusion around PTransform names
and PTransform application names.

To that end, I'd like to propose the following changes to the code and
documentation:
1. Replace the usage of .named(name) in all examples and composites with
the named-apply syntax.
2. Replace .named(name) with a protected PTransform constructor which takes
a default name. If not provided, the default name will be derived from the
class of the PTransform.
3. Use the protected constructor in composites (where appropriate) to
ensure that the default application has a reasonable name.

Users will benefit from having a single way of naming applications while
building a pipeline. Any breakages due to the removal of .named should be
easily fixed by either using the named application or by passing the name
to the constructor of a composite.
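
For illustration, a composite under this proposal would look roughly like the
following (sketch only -- the transform and element types are made up, and the
overridden method is whatever the current SDK uses):

public class ComputeTopScores
    extends PTransform<PCollection<Integer>, PCollection<List<Integer>>> {

  public ComputeTopScores() {
    // Change 2: the default application name comes from the protected
    // PTransform constructor instead of from .named().
    super("ComputeTopScores");
  }

  @Override
  public PCollection<List<Integer>> apply(PCollection<Integer> input) {
    return input.apply("Top10", Top.largest(10));
  }
}

// Change 1: name the application, not the transform
// (given some PCollection<Integer> scores).
PCollection<List<Integer>> top =
    scores.apply("DailyTop", new ComputeTopScores());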

I’d like to hear any comments or opinions on this topic from the wider
community. Please let me know what you think!

-- Ben


Re: [VOTE] Release version 0.1.0-incubating

2016-06-09 Thread Ben Chambers
+1 (binding)

Excited to see the first release underway!

On Thu, Jun 9, 2016 at 8:48 AM Kenneth Knowles 
wrote:

> +1 (binding)
>
> Confirmed that we can run pipelines on Dataflow.
>
> Looks good. Very exciting!
>
>
> On Thu, Jun 9, 2016 at 8:16 AM, Jean-Baptiste Onofré 
> wrote:
>
> > Team work ! Special thanks to Davor and Dan ! And thanks to the entire
> > team: it's a major step forward (the first release is always the hardest
> > one ;)). Let's see how the release will be taken by the IPMC :)
> >
> > Regards
> > JB
> >
> >
> > On 06/09/2016 04:32 PM, Scott Wegner wrote:
> >
> >> +1
> >>
> >> Thanks JB and Davor for all your hard work putting together this
> release!
> >>
> >> On Wed, Jun 8, 2016, 11:02 PM Jean-Baptiste Onofré 
> >> wrote:
> >>
> >> By the way, I forgot to mention that we will create a 0.1.0-incubating
> >>> tag (kind of alias to RC3) when the vote passed.
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 06/09/2016 01:20 AM, Davor Bonaci wrote:
> >>>
>  Hi everyone,
>  Here's the first vote for the first release of Apache Beam -- version
>  0.1.0-incubating!
> 
>  As a reminder, we aren't looking for any specific new functionality,
> but
>  would like to release the existing code, get something to our users'
> 
> >>> hands,
> >>>
>  and test the processes. Previous discussions and iterations on this
> 
> >>> release
> >>>
>  have been archived on the dev@ mailing list.
> 
>  The complete staging area is available for your review, which
> includes:
>  * the official Apache source release to be deployed to
> dist.apache.org
> 
> >>> [1],
> >>>
>  and
>  * all artifacts to be deployed to the Maven Central Repository [2].
> 
>  This corresponds to the tag "v0.1.0-incubating-RC3" in source control,
> 
> >>> [3].
> >>>
> 
>  Please vote as follows:
>  [ ] +1, Approve the release
>  [ ] -1, Do not approve the release (please provide specific comments)
> 
>  For those of us enjoying our first voting experience -- the release
>  checklist is here [4]. This is a "package release"-type of the Apache
>  voting process [5]. As customary, the vote will be open for 72 hours.
> It
> 
> >>> is
> >>>
>  adopted by majority approval with at least 3 PPMC affirmative votes.
> If
>  approved, the proposal will be presented to the Apache Incubator for
> 
> >>> their
> >>>
>  review.
> 
>  Thanks,
>  Davor
> 
>  [1]
> 
> 
> >>>
> https://repository.apache.org/content/repositories/orgapachebeam-1002/org/apache/beam/beam-parent/0.1.0-incubating/beam-parent-0.1.0-incubating-source-release.zip
> >>>
>  [2]
> 
> >>> https://repository.apache.org/content/repositories/orgapachebeam-1002/
> >>>
>  [3]
> https://github.com/apache/incubator-beam/tree/v0.1.0-incubating-RC3
>  [4]
>  http://incubator.apache.org/guides/releasemanagement.html#check-list
>  [5] http://www.apache.org/foundation/voting.html
> 
> 
> >>> --
> >>> Jean-Baptiste Onofré
> >>> jbono...@apache.org
> >>> http://blog.nanthrax.net
> >>> Talend - http://www.talend.com
> >>>
> >>>
> >>
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>


Re: DoFn Reuse

2016-06-08 Thread Ben Chambers
On Wed, Jun 8, 2016 at 10:29 AM Raghu Angadi 
wrote:

> On Wed, Jun 8, 2016 at 10:13 AM, Ben Chambers  >
> wrote:
>
> > - If failure occurs after finishBundle() but before the consumption is
> > committed, then the bundle may be reprocessed, which leads to duplicated
> > calls to processElement() and finishBundle().
> >
>
>
>
> > - If failure occurs after consumption is committed but before
> > finishBundle(), then those elements which may have buffered state in the
> > DoFn but not had their side-effects fully processed (since the
> > finishBundle() was responsible for that) are lost.
> >

I am trying to understand this better. Does this mean during
> recovery/replay after a failure, the particular instance of DoFn that
> existed before the worker failure would not be discarded, but might still
> receive elements?  If a DoFn is caching some internal state, it should
> always assume the worker it's on might abruptly fail anytime and the state
> would be lost, right?
>

To clarify -- this case is actually not allowed by the beam model. The
guarantee is that either a bundle is successfully completed (startBundle,
processElement*, finishBundle, commit) or not. If it isn't, then the bundle
is reprocessed. So, if a `DoFn` instance builds up any state while
processing a bundle and a failure happens at any point prior to the commit,
it will be retried. Even though the actual state in the first `DoFn` was
lost, the second attempt will build up the same state.
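
To make that concrete, here's a sketch using the current DoFn methods (flush()
is a made-up helper standing in for whatever side effect finishBundle
performs):

class BufferingDoFn extends DoFn<String, Void> {
  private transient List<String> buffer;

  @Override
  public void startBundle(Context c) {
    // Fresh state for every bundle, including a retried bundle.
    buffer = new ArrayList<>();
  }

  @Override
  public void processElement(ProcessContext c) {
    buffer.add(c.element());
  }

  @Override
  public void finishBundle(Context c) throws Exception {
    // Side effects happen here, and only "count" once the bundle commits.
    // If the worker fails before the commit, the whole bundle is re-run and
    // the buffer is rebuilt from scratch.
    flush(buffer);
  }

  private void flush(List<String> batch) throws Exception {
    // Hypothetical external write.
  }
}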


Re: DoFn Reuse

2016-06-08 Thread Ben Chambers
I think there is a difference:

- If failure occurs after finishBundle() but before the consumption is
committed, then the bundle may be reprocessed, which leads to duplicated
calls to processElement() and finishBundle().
- If failure occurs after consumption is committed but before
finishBundle(), then those elements which may have buffered state in the
DoFn but not had their side-effects fully processed (since the
finishBundle() was responsible for that) are lost.



On Wed, Jun 8, 2016 at 10:09 AM Raghu Angadi 
wrote:

> On Wed, Jun 8, 2016 at 10:05 AM, Raghu Angadi  wrote:
> >
> > I thought finishBundle() exists simply as best-effort indication from the
> > runner to user some chunk of records have been processed..
>
> also to help with DoFn's own clean up if there is any.
>


Re: Where's my PCollection.map()?

2016-05-31 Thread Ben Chambers
Nice post Robert! Thanks for writing up the rationale.

On Fri, May 27, 2016 at 12:38 PM Robert Bradshaw
 wrote:

> Hi all!
>
> One of the questions that often gets asked is why Beam has PTransforms
> for everything instead of having methods on PCollection. This morning
> I published a blog post explaining some of the design considerations
> and history that went into designing the Beam SDK.
>
>
> http://beam.incubator.apache.org/blog/2016/05/27/where-is-my-pcollection-dot-map.html
>
> Happy reading,
> Robert
>


Re: [PROPOSAL] Writing More Expressive Beam Tests

2016-03-31 Thread Ben Chambers
On Mon, Mar 28, 2016 at 4:29 PM Robert Bradshaw 
wrote:

> On Fri, Mar 25, 2016 at 4:28 PM, Ben Chambers
>  wrote:
> > My only concern is that in the example, you first need to declare all the
> > inputs, then the pipeline to be tested, then all the outputs. This can
> lead
> > to tests that are hard to follow, since what you're really testing is an
> > interleaving more like "When these inputs arrive, I get this output. Then
> > when this happens, I get that output. Etc.".
>
> +1 to pursuing this direction.
>
> > What if instead of returning a PTransform> we
> had
> > a "TestSource".
>
> I think TestSource is a PTransform>.
>

Maybe? If we want it to easily support multiple inputs, maybe you do
`testSource.getInput(tag)` to get the `PTransform>`
associated with a given tag? But yes, I intended the `TestSource` to be
usable within the pipeline to actually produce the data.

>
> > so we did something like:
> >
> > TestPipeline p = TestPipeline.create();
> > TestSource source = p.testSource();
> >
> > // Set up pipeline reading from source.
> > PCollection sum = ...;
>
> I'm really curious what the "..." looks like. How are we using the source?
>

Either `p.apply(source)` or `p.apply(source.forTag(tag))`. Not sure about
naming, of course.

>
> > BeamAssert sumAssert = BeamAssert.sum();
>
> Did you mean BeamAssert.that(sum)?
>

Almost certainly. Or maybe `BeamAssert.on(sum)`. But something like that.

> // Test for the Speculative Pane
> > source.addElements(...);
> > source.advanceWatermark(...);
> > sumAssert.thatWindowPane(...);
> >
> > // Test for the On Time Pane
> > source.addElements(...)
> > source.advanceWatermark(...);
> > sumAssert.thatWindowPane(...);
> >
> > etc.
>
> Is there a p.run() at the end?
>

Almost certainly.


> > We could also allow TestSource to work with multiple input pipelines like
> > this:
> >
> > TestSource intSource = p.testSource(new
> TypeDescriptor());
> > TestSource longSource = p.testSource(new TypeDescriptor());
> > ...
> > intSource.addElements(...);
> > longSource.addElements(...);
> > etc.
>
> Would we get at total ordering on the addition of elements/advancement
> of watermarks across sources by the temporal ordering of these
> operations in the users program (e.g. by incrementing some global
> counter)?
>

Ideally? I was focusing on the interleaving of inputs/assertions, but we
can talk more about this.


> > On Fri, Mar 25, 2016 at 4:08 PM Thomas Groh 
> > wrote:
> >
> >> Hey everyone;
> >>
> >> I'd still be happy to get feedback. I'm going to start working on this
> >> early next week
> >>
> >> Thanks,
> >>
> >> Thomas
> >>
> >> On Mon, Mar 21, 2016 at 5:38 PM, Thomas Groh  wrote:
> >>
> >> > Hey everyone,
> >> >
> >> > I've been working on a proposal to expand the capabilities of our
> testing
> >> > API, mostly around writing deterministic tests for pipelines that have
> >> > interesting triggering behavior, especially speculative and late
> >> triggers.
> >> >
> >> > I've shared a doc here
> >> > <
> >>
> https://docs.google.com/document/d/1fZUUbG2LxBtqCVabQshldXIhkMcXepsbv2vuuny8Ix4/edit?usp=sharing
> >
> >> containing
> >> > the proposal and some examples, with world comment access + explicit
> >> > committer edit access. I'd welcome any feedback you all have.
> >> >
> >> > Thanks,
> >> >
> >> > Thomas
> >> >
> >>
>


Re: [PROPOSAL] Writing More Expressive Beam Tests

2016-03-25 Thread Ben Chambers
My only concern is that in the example, you first need to declare all the
inputs, then the pipeline to be tested, then all the outputs. This can lead
to tests that are hard to follow, since what you're really testing is an
interleaving more like "When these inputs arrive, I get this output. Then
when this happens, I get that output. Etc.".

What if instead of returning a PTransform> we had
a "TestSource". so we did something like:

TestPipeline p = TestPipeline.create();
TestSource source = p.testSource();

// Set up pipeline reading from source.
PCollection sum = ...;
BeamAssert sumAssert = BeamAssert.sum();

// Test for the Speculative Pane
source.addElements(...);
source.advanceWatermark(...);
sumAssert.thatWindowPane(...);

// Test for the On Time Pane
source.addElements(...)
source.advanceWatermark(...);
sumAssert.thatWindowPane(...);

etc.

We could also allow TestSource to work with multiple input pipelines like
this:

TestSource intSource = p.testSource(new TypeDescriptor());
TestSource longSource = p.testSource(new TypeDescriptor());
...
intSource.addElements(...);
longSource.addElements(...);
etc.




On Fri, Mar 25, 2016 at 4:08 PM Thomas Groh 
wrote:

> Hey everyone;
>
> I'd still be happy to get feedback. I'm going to start working on this
> early next week
>
> Thanks,
>
> Thomas
>
> On Mon, Mar 21, 2016 at 5:38 PM, Thomas Groh  wrote:
>
> > Hey everyone,
> >
> > I've been working on a proposal to expand the capabilities of our testing
> > API, mostly around writing deterministic tests for pipelines that have
> > interesting triggering behavior, especially speculative and late
> triggers.
> >
> > I've shared a doc here
> > <
> https://docs.google.com/document/d/1fZUUbG2LxBtqCVabQshldXIhkMcXepsbv2vuuny8Ix4/edit?usp=sharing>
> containing
> > the proposal and some examples, with world comment access + explicit
> > committer edit access. I'd welcome any feedback you all have.
> >
> > Thanks,
> >
> > Thomas
> >
>


Re: Draft Contribution Guide

2016-03-23 Thread Ben Chambers
My concern with that is we aren't making clear what constitutes "whenever
possible". Could we more concretely define that (eg., "for example, when
Github is down")? Were there specific cases that you had in mind?
Otherwise, I worry about the ambiguity introduced and the possibility for
different people to interpret that very differently.

On Wed, Mar 23, 2016 at 11:23 AM Maximilian Michels  wrote:

> I didn't see this paragraph before:
>
> "Committers should never commit anything without going through a pull
> request, since that would bypass test coverage and potentially cause
> the build to fail due to checkstyle, etc. Always go through the pull
> request, even if you won’t wait for the code review."
>
> How about:
>
> "Whenever possible, commits should be reviewed in a pull request. Pull
> requests ensure that changes can be communicated properly with the
> community and potential flaws or improvements can be spotted. In
> addition, pull requests ensure proper test coverage and verification
> of the build. Whenever possible, go through the pull request, even if
> you won’t wait for the code review."
>
> - Max
>
>
> On Wed, Mar 23, 2016 at 5:33 PM, Jean-Baptiste Onofré 
> wrote:
> > +1
> >
> > Regards
> > JB
> >
> >
> > On 03/23/2016 05:30 PM, Davor Bonaci wrote:
> >>
> >> Thanks everyone for commenting!
> >>
> >> There were no new comments in the last several days, so we'll start
> moving
> >> the doc over to the Beam website.
> >>
> >> Of course, there's nothing here set in stone -- please reopen the
> >> discussion about any particular point at any time in the future.
> >>
> >> On Fri, Mar 18, 2016 at 4:44 AM, Maximilian Michels 
> >> wrote:
> >>
> >>> Hi Frances,
> >>>
> >>> Very nice comprehensive guide. I'll leave some comments in the doc.
> >>>
> >>> Cheers,
> >>> Max
> >>>
> >>> On Fri, Mar 18, 2016 at 11:51 AM, Sandeep Deshmukh
> >>>  wrote:
> 
>  The document captures the process very well and has right amount of
> >>>
> >>> details
> 
>  for newbies too.
> 
>  Great work!!!
> 
>  Regards,
>  Sandeep
> 
>  On Fri, Mar 18, 2016 at 10:46 AM, Siva Kalagarla <
> >>>
> >>> siva.kalaga...@gmail.com>
> 
>  wrote:
> 
> > Thanks Frances,  This document is helpful for newbies like myself.
> > Will
> > follow these steps over this weekend.
> >
> > On Thu, Mar 17, 2016 at 2:19 PM, Frances Perry
> 
> > wrote:
> >
> >> Hi Beamers!
> >>
> >> We've started a draft
> >> <
> >>
> >
> >>>
> >>>
> https://docs.google.com/document/d/1syFyfqIsGOYDE_Hn3ZkRd8a6ylcc64Kud9YtrGHgU0E/comment
> >>>
> >>>
> >> for the Beam contribution guide. Please take a look and provide
> >>>
> >>> feedback.
> >>
> >> Once things settle, we'll get this moved over on to the Beam
> website.
> >>
> >> Frances
> >>
> >
> >
> >
> > --
> >
> >
> > Regards,
> > Siva Kalagarla
> > @SivaKalagarla 
> >
> >>>
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
>


Re: Renaming process: first step Maven coordonates

2016-03-21 Thread Ben Chambers
I don't think Maven will recognize 0.1.0-incubating-SNAPSHOT as a snapshot.
It will recognize it as 0.1.0 with the "incubating-SNAPSHOT" qualifier.

For instance, looking at the code for parsing qualifiers, it only handles
the string "SNAPSHOT" specially, not "incubating-SNAPSHOT".
http://maven.apache.org/ref/3.0.4/maven-artifact/xref/org/apache/maven/artifact/versioning/ComparableVersion.html#52
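
A quick way to sanity-check that ordering locally (illustrative;
ComparableVersion is the maven-artifact class linked above):

ComparableVersion snapshot = new ComparableVersion("0.1.0-SNAPSHOT");
ComparableVersion release = new ComparableVersion("0.1.0");
// Prints true: the SNAPSHOT build sorts before the unqualified release.
System.out.println(snapshot.compareTo(release) < 0);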

Looking at this Stack Overflow answer (
http://stackoverflow.com/a/31482463/4539304) it looks like support was
improved in Maven 3.2.4 to allow multiple qualifiers (it's still unclear
whether incubating would be considered by the code as a qualifier).

Either way, we shouldn't expect users to upgrade to Maven 3.2.4 or newer
just to get reasonable version number treatment. It seems like sticking
with the standard "-SNAPSHOT" and "" for releases is preferable.

If the goal is to get incubating into the file names, I think we can
configure the Maven build process to do so. For instance, finalName
defaults to
"${project.artifactId}-${project.version}". If we
changed that to
"${project.artifactId}-incubating-${project.version}"
it seems like we'd get "incubating" in the file names without needing to
complicate the release numbering.

On Mon, Mar 21, 2016 at 10:24 AM Jean-Baptiste Onofré 
wrote:

> Hi Ben,
>
> 1. True for Python, but it can go in a folder in sdk (sdk/python)
> anyway. I think the DSLs (Java based) and other languages that we might
> introduce (Scala, ...) can be the same.
>
> 2. The incubating has to be in the released filenames. So it can be in
> the version or name. Anyway, my proposal was 0.1.0-incubating-SNAPSHOT
> for a SNAPSHOT and 0.1.0-incubating for a release (it's what I did in
> the PR). Like this, the Maven standards are still valid.
>
> Regards
> JB
>
> On 03/21/2016 06:20 PM, Ben Chambers wrote:
> > 1. Regarding "java" as a module -- are we sure that other languages will
> be
> > packaged using Maven as well? For instance, Python has its own ecosystem
> > which likely doesn't play well with Maven.
> >
> > 2. Using the literal "SNAPSHOT" as the qualifier has special meaning
> Maven
> > -- it is newer than all other qualified releases, but older than any
> > unqualified release. It feels like we should take advantage of this,
> which
> > makes our versioning more consistent with Maven standards. Specifically,
> > snapshots should be 0.1.0-SNAPSHOT and releases should be 0.1.0.
> >  0.1.0-SNAPSHOT because that uses the standard definition of SNAPSHOT
> >  0.1.0 because if we had any qualifier then the 0.1.0-SNAPSHOT would
> be
> > considered newer
> >
> > Davor's suggestion of putting the "incubating" in the name or description
> > of the artifacts seems like a preferable option.
> >
> > On Mon, Mar 21, 2016 at 7:33 AM Jean-Baptiste Onofré 
> > wrote:
> >
> >> Hi beamers,
> >>
> >> I updated the PR according to your comments.
> >>
> >> I have couple of points I want to discuss:
> >>
> >> 1. All modules use the same groupId (org.apache.beam). In order to have
> >> a cleaner structure on the Maven repo, I wonder if it's not better to
> >> have different groupId depending of the artifacts. For instance,
> >> org.apache.beam.sdk, containing a module with java as artifactId (it
> >> will contain new artifacts with id python, scala, ...),
> >> org.apache.beam.runners containing modules with flink and spark as
> >> artifactId, etc. Thoughts ?
> >> 2. The version has been set to 0.1.0-incubating-SNAPSHOT for all
> >> artifacts, including the runners. It doesn't mean that the runners will
> >> have to use the same version as parent (they can have their own release
> >> cycle). However, as we "bootstrap" the project, I used the same version
> >> in all modules.
> >>
> >> Now, I'm starting two new commits:
> >> - renaming of the packages
> >> - folders re-organization
> >>
> >> Thanks !
> >> Regards
> >> JB
> >>
> >> On 03/21/2016 01:56 PM, Jean-Baptiste Onofré wrote:
> >>> Hi Davor,
> >>>
> >>> thank you so much for your comments. I'm updating the PR according to
> >>> your PR (and will provide explanation to some changes).
> >>>
> >>> Thanks dude !
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 03/21/2016 06:29 AM, Davor Bonaci wrote:
> >>>> I left a few comments on PR #

Re: Renaming process: first step Maven coordonates

2016-03-21 Thread Ben Chambers
1. Regarding "java" as a module -- are we sure that other languages will be
packaged using Maven as well? For instance, Python has its own ecosystem
which likely doesn't play well with Maven.

2. Using the literal "SNAPSHOT" as the qualifier has special meaning Maven
-- it is newer than all other qualified releases, but older than any
unqualified release. It feels like we should take advantage of this, which
makes our versioning more consistent with Maven standards. Specifically,
snapshots should be 0.1.0-SNAPSHOT and releases should be 0.1.0.
0.1.0-SNAPSHOT because that uses the standard definition of SNAPSHOT
0.1.0 because if we had any qualifier then the 0.1.0-SNAPSHOT would be
considered newer

Davor's suggestion of putting the "incubating" in the name or description
of the artifacts seems like a preferable option.

On Mon, Mar 21, 2016 at 7:33 AM Jean-Baptiste Onofré 
wrote:

> Hi beamers,
>
> I updated the PR according to your comments.
>
> I have couple of points I want to discuss:
>
> 1. All modules use the same groupId (org.apache.beam). In order to have
> a cleaner structure on the Maven repo, I wonder if it's not better to
> have different groupId depending of the artifacts. For instance,
> org.apache.beam.sdk, containing a module with java as artifactId (it
> will contain new artifacts with id python, scala, ...),
> org.apache.beam.runners containing modules with flink and spark as
> artifactId, etc. Thoughts ?
> 2. The version has been set to 0.1.0-incubating-SNAPSHOT for all
> artifacts, including the runners. It doesn't mean that the runners will
> have to use the same version as parent (they can have their own release
> cycle). However, as we "bootstrap" the project, I used the same version
> in all modules.
>
> Now, I'm starting two new commits:
> - renaming of the packages
> - folders re-organization
>
> Thanks !
> Regards
> JB
>
> On 03/21/2016 01:56 PM, Jean-Baptiste Onofré wrote:
> > Hi Davor,
> >
> > thank you so much for your comments. I'm updating the PR according to
> > your PR (and will provide explanation to some changes).
> >
> > Thanks dude !
> >
> > Regards
> > JB
> >
> > On 03/21/2016 06:29 AM, Davor Bonaci wrote:
> >> I left a few comments on PR #46.
> >>
> >> Thanks JB for doing this; a clear improvement.
> >>
> >> On Mon, Mar 14, 2016 at 6:04 PM, Jean-Baptiste Onofré 
> >> wrote:
> >>
> >>> Hi all,
> >>>
> >>> I started the renaming process from Dataflow to Beam.
> >>>
> >>> I submitted a first PR about the Maven coordinates:
> >>>
> >>> https://github.com/apache/incubator-beam/pull/46
> >>>
> >>> I will start the packages renaming (updating the same PR). For the
> >>> directories structure, I would like to talk with Frances, Dan, Tyler,
> >>> and
> >>> Davor first.
> >>>
> >>> Regards
> >>> JB
> >>> --
> >>> Jean-Baptiste Onofré
> >>> jbono...@apache.org
> >>> http://blog.nanthrax.net
> >>> Talend - http://www.talend.com
> >>>
> >>
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Design document for Static Display Data

2016-03-15 Thread Ben Chambers
Hi JB.

I attempted to clarify in the proposal as well, but this is focused on
static information -- details that are known during pipeline construction
but would also be useful for display.

For example, if you used the Top.perKey(n) transform all the execution of
the pipeline needs to know is that it is running some serialized CombineFn,
and the code does the rest. But it would be really useful if we could
display that the code being executed is Top, and that it was configured
with "n = 10".
The SDK already features aggregators (~ counters) for monitoring tasks.
While there are potential improvements to the aggregator API and
functionality, they are outside the scope of this proposal.

Let me know if that doesn't clarify things. And thanks for taking a look!
Ben

On Mon, Mar 14, 2016, 5:59 PM Jean-Baptiste Onofré  wrote:

> Hi Ben,
>
> thanks for the update.
>
> Correct me if I'm wrong: you are proposing kind of monitoring of the
> pipelines, and be able to trace what's going on during the execution of
> the pipeline.
> It's a very important feature, especially for stream (and data
> integration).
>
> The document greatly describes the data display. Do you have any plan to
> implement kind of "checkpoint"/alerting depending of some predicates on
> the data (it's something that I had in mind for the Beam data
> integration DSL) ? It's maybe the TRIGGER type ?
>
> Thanks again, great document and idea.
>
> Regards
> JB
>
> On 03/15/2016 01:25 AM, Ben Chambers wrote:
> > Hi!
> >
> > The following document describes work that we're planning on doing to
> allow
> > every step in a pipeline to include more information about what is
> > actually going on. The goal is to allow UIs and diagnostic tools to
> display
> > details about what is happening inside each step by including details
> what
> > is otherwise just serialized blobs of code.
> >
> > Everyone should be able to comment on the following link:
> >
> https://docs.google.com/document/d/11enEB9JwVp6vO0uOYYTMYTGkr3TdNfELwWqoiUg5ZxM/edit?usp=sharing
> >
> > We will be creating a Jira issue to track the implementation of the
> > associated SDK changes.
> >
> > Thanks,
> > Ben
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Design document for Static Display Data

2016-03-14 Thread Ben Chambers
Hi!

The following document describes work that we're planning on doing to allow
every step in a pipeline to include more information about what is
actually going on. The goal is to allow UIs and diagnostic tools to display
details about what is happening inside each step by including details about
what is otherwise just serialized blobs of code.

Everyone should be able to comment on the following link:
https://docs.google.com/document/d/11enEB9JwVp6vO0uOYYTMYTGkr3TdNfELwWqoiUg5ZxM/edit?usp=sharing

We will be creating a Jira issue to track the implementation of the
associated SDK changes.

Thanks,
Ben


Re: Confusing about the bouded naming of PubsubIO

2016-02-18 Thread Ben Chambers
Classes named "Bound" are used throughout the sdk to describe builders that
are specified enough to be applied. It indicates that the required
parameters have been bound. It is not related to whether the output
PCollection is bounded or unbounded.
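
For example (illustrative; method names per the PubsubIO builder in this SDK
version):

// topic() binds the required parameter and returns a PubsubIO.Read.Bound,
// i.e. a builder that is specified enough to be applied.
PCollection<String> messages =
    p.apply(PubsubIO.Read.topic("projects/my-project/topics/my-topic"));

// Whether the resulting PCollection is bounded or unbounded is a separate
// question, controlled by maxNumRecords()/maxReadTime() as noted below.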

On Thu, Feb 18, 2016, 7:42 AM bakey pan  wrote:

> Hi, all:
> I notice that in the PubSubIO class, there only exists one Bound
> static class inheriting from PTransform.
> But actually, in the apply method of Bound, whether it returns a bounded or
> unbounded PCollection depends on the variables maxNumRecords and
> maxReadTime. So why not name this class "MixBound" or something else?
>  I think it is a little bit confusing to name it "Bound" when it can
> actually be an unbounded data stream.
>
> --
>  Best Regards
>BakeyPan
>