Re: [VOTE] Release 0.4.0-incubating, release candidate #1

2016-12-15 Thread Eugene Kirpichov
There is one more data-loss-type error, a fix for which should go into the
release.
https://github.com/apache/incubator-beam/pull/1620

On Thu, Dec 15, 2016 at 10:42 AM Davor Bonaci  wrote:

> I think we should build another RC.
>
> Two issues:
> * Metrics issue that JB pointed out earlier. It seems to cause a somewhat
> poor user experience for every pipeline executed on the Direct runner.
> (Thanks JB for finding this out!)
> * Failure of testSideInputsWithMultipleWindows in Jenkins [1].
>
> Both issues seem easy, trivial, non-risky fixes that are already committed
> to master. I'd suggest just taking them.
>
> Davor
>
> [1]
>
> https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_RunnableOnService_Dataflow/1819/
>
> On Thu, Dec 15, 2016 at 8:45 AM, Ismaël Mejía  wrote:
>
> > +1 (non-binding)
> >
> > - verified signatures + checksums
> > - ran mvn clean verify -Prelease; all artifacts + tests ran smoothly
> >
> > The release artifacts are signed with the key with fingerprint 8F0D334F
> > https://dist.apache.org/repos/dist/release/incubator/beam/KEYS
> >
> > I just created a JIRA to add the signer/KEYS information to the release
> > template; I will do a PR for this later on.
> >
> > Ismaël
> >
> > On Thu, Dec 15, 2016 at 2:26 PM, Jean-Baptiste Onofré 
> > wrote:
> >
> > > Hi Amit,
> > >
> > > thanks for the update.
> > >
> > > As you changed the Jira, the Release Notes are now up to date.
> > >
> > > Regards
> > > JB
> > >
> > >
> > > On 12/15/2016 02:20 PM, Amit Sela wrote:
> > >
> > >> I see three problems in the release notes (related to Spark runner):
> > >>
> > >> Improvement:
> > >> 
> > >> [BEAM-757] - The SparkRunner should utilize the SDK's DoFnRunner
> instead
> > >> of
> > >> writing its own.
> > >> 
> > >> [BEAM-807] - [SparkRunner] Replace OldDoFn with DoFn
> > >> 
> > >> [BEAM-855] - Remove the need for --streaming option in the spark
> runner
> > >>
> > >> BEAM-855 is a duplicate and probably shouldn't have had a Fix Version.
> > >>
> > >> The other two are not a part of this release - I was probably too
> eager
> > to
> > >> mark them fixed after merge and I accidentally put 0.4.0 as the Fix
> > >> Version.
> > >>
> > >> I made the changes in JIRA now.
> > >>
> > >> Thanks,
> > >> Amit
> > >>
> > >> On Thu, Dec 15, 2016 at 3:09 PM Jean-Baptiste Onofré  >
> > >> wrote:
> > >>
> > >> Reviewing and testing the release, I see:
> > >>>
> > >>> 16/12/15 14:04:47 ERROR MetricsContainer: Unable to update metrics on
> > >>> the current thread. Most likely caused by using metrics outside the
> > >>> managed work-execution thread.
> > >>>
> > >>> It doesn't block the execution of the pipeline, but basically, it
> means
> > >>> that metrics don't work anymore.
> > >>>
> > >>> I'm investigating.
> > >>>
> > >>> Regards
> > >>> JB
> > >>>
> > >>> On 12/15/2016 01:46 PM, Jean-Baptiste Onofré wrote:
> > >>>
> >  Hi everyone,
> > 
> >  Please review and vote on the release candidate #1 for the version
> >  0.4.0-incubating, as follows:
> >  [ ] +1, Approve the release
> >  [ ] -1, Do not approve the release (please provide specific
> comments)
> > 
> >  The complete staging area is available for your review, which
> > includes:
> >  * JIRA release notes [1],
> >  * the official Apache source release to be deployed to
> > dist.apache.org
> > 
> > >>> [2],
> > >>>
> >  * all artifacts to be deployed to the Maven Central Repository [3],
> >  * source code tag "v0.4.0-incubating-RC1" [4],
> >  * website pull request listing the release and publishing the API
> > 
> > >>> reference
> > >>>
> >  manual [5].
> > 
> >  The vote will be open for at least 72 hours. It is adopted by
> majority
> >  approval, with at least 3 PPMC affirmative votes.
> > 
> >  Thanks,
> >  Regards
> >  JB
> > 
> >  [1]
> > 
> >  https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12338590
> > >>>
> > 
> >  [2]
> > 
> > >>> https://dist.apache.org/repos/dist/dev/incubator/beam/0.4.0-incubating/
> > >>>
> >  [3]
> > 
> > >>>
> https://repository.apache.org/content/repositories/orgapachebeam-1006/
> > >>>
> >  [4]
> > 
> >  https://git-wip-us.apache.org/repos/asf?p=incubator-beam.git;a=tag;h=85d1c8a2f85bbc667c90f55ff0eb27de5c2446a6
> > >>>
> > 
> >  [5] https://github.com/apache/incubator-beam-site/pull/109
> > 
> > >>>
> > >>> --
> > >>> Jean-Baptiste Onofré
> > >>> jbono...@apache.org
> > >>> http://blog.nanthrax.net
> > >>> Talend - http://www.talend.com
> > >>>
> > >>>
> > >>
> > > --
> > > Jean-Baptiste Onofré
> > > jbono...@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> > >
> >
>


Re: New DoFn and WindowedValue/WindowingInternals

2016-12-11 Thread Eugene Kirpichov
Hi Amit,

Yes, this is correct. Part of the motivation for this is that the DoFn API is
user-facing, and the compressed representation of windowed elements (e.g.
access to all windows of an element), as well as the ability to emit
directly into a specified window, is an implementation detail of the runner
that is dangerous to expose to SDK users (even I got burnt by it while
working on SplittableParDo). So we would like to move WindowedValue into
runners-core and keep the semantically clean API in the SDK: access to the
current window, and assigning windows via Window.into().

On Sun, Dec 11, 2016 at 11:59 AM Kenneth Knowles <k...@google.com.invalid>
wrote:

> You've got it right. My recommendation is to just directly implement it
> for the Spark runner. It will often actually clean things up a bit. Here's
> the analogous change for the Flink runner:
> https://github.com/apache/incubator-beam/pull/1435/files.
>
> With GABW, I tried going through the process of keeping some utility
> expansion in runners-core, making StateInternalsFactory, refactoring
> GroupAlsoByWindowsDoFn, then GroupByKeyViaGroupByKeyOnly,
> GroupAlsoByWindow. But it ended up simpler for each runner to just not use
> most of that and do it directly. (they all still share GABW but none of the
> surrounding bits, IIRC)
>
> On Sun, Dec 11, 2016 at 10:33 AM, Amit Sela <amitsel...@gmail.com> wrote:
>
> > So basically using new DoFn with *SplittableParDo*, *Window.Bound*, and
> > *GroupAlsoByWindow* requires a custom implementation per runner, as they
> > are not handled by DoFn anymore, right?
> >
> > On Sun, Dec 11, 2016 at 3:42 PM Eugene Kirpichov
> > <kirpic...@google.com.invalid> wrote:
> >
> > > Hi Amit, I'll comment in more detail later, but meanwhile please take a
> > > look at https://github.com/apache/incubator-beam/pull/1565
> > > There is a small amount of relevant changes to the Spark runner.
> > > Take a look at the implementation of SplittableParDo (already committed), in
> > > particular ProcessFn and its usage in the direct runner - this is exactly what
> > > you're looking for: a new DoFn that, with per-runner support, is able to emit
> > > multi-windowed values.
> > > On Sun, Dec 11, 2016 at 4:28 AM Amit Sela <amitsel...@gmail.com>
> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I've been working on migrating the Spark runner to new DoFn and I've
> > > > stumbled upon a couple of cases where OldDoFn is used in a way that
> > > > accesses WindowingInternals (outputWindowedValue), such as AssignWindowsDoFn.
> > > >
> > > > Since changing windows is no longer the responsibility of DoFn I was
> > > > wondering who does this now, and how.
> > > >
> > > > Thanks,
> > > > Amit
> > > >
> > >
> >
>


Re: New DoFn and WindowedValue/WindowingInternals

2016-12-11 Thread Eugene Kirpichov
Hi Amit, I'll comment in more detail later, but meanwhile please take a
look at https://github.com/apache/incubator-beam/pull/1565
There is a small amount of relevant changes to the Spark runner.
Take a look at the implementation of SplittableParDo (already committed), in
particular ProcessFn and its usage in the direct runner - this is exactly what
you're looking for: a new DoFn that, with per-runner support, is able to emit
multi-windowed values.
On Sun, Dec 11, 2016 at 4:28 AM Amit Sela  wrote:

> Hi all,
>
> I've been working on migrating the Spark runner to new DoFn and I've
> stumbled upon a couple of cases where OldDoFn is used in a way that
> accesses WindowingInternals (outputWindowedValue), such as AssignWindowsDoFn.
>
> Since changing windows is no longer the responsibility of DoFn I was
> wondering who does this now, and how.
>
> Thanks,
> Amit
>


Re: Naming and API for executing shell commands

2016-12-07 Thread Eugene Kirpichov
I think it makes sense as a separate module, since I'd hesitate to call it
an "IO" because importing and exporting data is not the main thing about
executing shell commands.

Let's continue discussion of the API per se: how can we design an API to
encompass all these use cases? WDYT?
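
To ground that question, here is one rough shape such an API could take. Everything
below (ShellCommands, CommandResult, and the method names) is hypothetical, sketched
only to illustrate the modes of interaction listed further down this thread, not a
finished proposal:

// Fixed command specified at construction time; we capture its output and exit code.
PCollection<CommandResult> results =
    pipeline.apply(ShellCommands.execute().withCommand("/path/to/myscript.sh"));

// Fully dynamic: each element of the input PCollection is itself a command line.
PCollection<CommandResult> perElementResults =
    commands.apply(ShellCommands.executeEach());

// Fixed command with arguments taken from the input, batched xargs-style up to
// the command-line length limit.
PCollection<CommandResult> batchedResults =
    fileNames.apply(ShellCommands.execute()
        .withCommand("gzip")
        .withArgumentsFromInput()
        .batchingUpToCommandLineLimit());

// A CommandResult would carry stdout, stderr and the exit code, leaving it to the
// caller to decide what "reading from" or "writing to" the command means.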

On Wed, Dec 7, 2016 at 10:01 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi Eugene,
>
> I like your ShellCommands.execute().withCommand("foo") !
>
> And you listed valid points and usages, especially around the
> input/output of the command.
>
> My question is where do we put such a ShellCommands extension? As a
> module under IO? As a new extensions module?
>
> Regards
> JB
>
> On 12/07/2016 06:24 PM, Eugene Kirpichov wrote:
> > Branched off into a separate thread.
> >
> > How about ShellCommands.execute().withCommand("foo")? This is what it is
> -
> > it executes shell commands :)
> >
> > Say, if I want to just execute a command for the sake of its side effect,
> > but I'm not interested in its output - it would feel odd to describe that
> > as either "reading" from the command or "writing" to it. Likewise, when I
> > execute commands in bash, I'm not thinking of it as reading or writing to
> > them.
> >
> > Though, there are various modes of interaction with shell commands; some
> of
> > them could be called "reading" or "writing" I guess - or both!
> > - The command itself can be specified at pipeline construction time, or
> > fully dynamic (elements of a PCollection are themselves commands), or be
> > constructed from a fixed command and a variable set of arguments coming
> > from the PCollection (one-by-one or xargs-style, cramming as many
> arguments
> > as fit into the command line limit).
> > - We may also be writing elements of the PCollection to standard input of
> > the command - one-by-one, or in arbitrarily sized batches.
> > - We may be reading the command's stdout, its stderr, and its error code.
> >
> > I think these options call for a more flexible naming and set of APIs
> than
> > read and write. And more flexible than a single DoFn, too (which is
> > something I hadn't thought of before - this connector definitely has room
> > for doing some interesting things).
> >
> > On Wed, Dec 7, 2016 at 1:32 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
> >
> >> By the way, just to elaborate a bit on why I provided it as an IO:
> >>
> >> 1. From a user experience perspective, I think we have to provide a
> >> convenient way to write pipelines. Any syntax simplifying this is valuable.
> >> I think it's easier to write:
> >>
> >> pipeline.apply(ExecIO.read().withCommand("foo"))
> >>
> >> than:
> >>
> >> pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn()));
> >>
> >> 2. For me (maybe I'm wrong ;)), an IO is an extension dedicated to a
> >> "connector": reading/writing from/to a data source. So, even without the
> >> IO "wrapping" (by wrapping, I mean the Read and Write), I think the Exec
> >> extension should be in IO as it's a source/sink of data.
> >>
> >> Regards
> >> JB
> >>
> >> On 12/07/2016 08:37 AM, Robert Bradshaw wrote:
> >>> I don't mean to derail the tricky environment questions, but I'm not
> >>> seeing why this is bundled as an IO rather than a plain DoFn (which
> >>> can be applied to a PCollection of one or more commands, yielding
> >>> their outputs). Especially for the case of a Read, which in this case
> >>> is not splittable (initially or dynamically) and always produces a
> >>> single element--feels much more like a Map to me.
> >>>
> >>> On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov
> >>> <kirpic...@google.com.invalid> wrote:
> >>>> Ben - the issues of "things aren't hung, there is a shell command
> >> running",
> >>>> aren't they general to all DoFn's? i.e. I don't see why the runner
> would
> >>>> need to know that a shell command is running, but not that, say, a
> heavy
> >>>> monolithic computation is running. What's the benefit to the runner in
> >>>> knowing that the DoFn contains a shell command?
> >>>>
> >>>> By saying "making sure that all shell commands finish", I suppose
> you're
> >>>> referring to the possibility of leaks if the user initiates a shell
> &

Re: [DISCUSS] ExecIO

2016-12-07 Thread Eugene Kirpichov
(discussion continues on a thread called "Naming and API for executing
shell commands")

On Wed, Dec 7, 2016 at 1:32 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> By the way, just to elaborate a bit on why I provided it as an IO:
>
> 1. From a user experience perspective, I think we have to provide a
> convenient way to write pipelines. Any syntax simplifying this is valuable.
> I think it's easier to write:
>
> pipeline.apply(ExecIO.read().withCommand("foo"))
>
> than:
>
> pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn()));
>
> 2. For me (maybe I'm wrong ;)), an IO is an extension dedicated to a
> "connector": reading/writing from/to a data source. So, even without the
> IO "wrapping" (by wrapping, I mean the Read and Write), I think the Exec
> extension should be in IO as it's a source/sink of data.
>
> Regards
> JB
>
> On 12/07/2016 08:37 AM, Robert Bradshaw wrote:
> > I don't mean to derail the tricky environment questions, but I'm not
> > seeing why this is bundled as an IO rather than a plain DoFn (which
> > can be applied to a PCollection of one or more commands, yielding
> > their outputs). Especially for the case of a Read, which in this case
> > is not splittable (initially or dynamically) and always produces a
> > single element--feels much more like a Map to me.
> >
> > On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov
> > <kirpic...@google.com.invalid> wrote:
> >> Ben - the issues of "things aren't hung, there is a shell command
> running",
> >> aren't they general to all DoFn's? i.e. I don't see why the runner would
> >> need to know that a shell command is running, but not that, say, a heavy
> >> monolithic computation is running. What's the benefit to the runner in
> >> knowing that the DoFn contains a shell command?
> >>
> >> By saying "making sure that all shell commands finish", I suppose you're
> >> referring to the possibility of leaks if the user initiates a shell
> command
> >> and forgets to wait for it? I think that should be solvable again
> without
> >> Beam intervention, by making a utility class for running shell commands
> >> which implements AutoCloseable, and document that you have to use it
> that
> >> way.
> >>
> >> Ken - I think the question here is: are we ok with a situation where the
> >> runner doesn't check or care whether the shell command can run, and the
> >> user accepts this risk and studies what commands will be available on
> the
> >> worker environment provided by the runner they use in production, before
> >> productionizing a pipeline with those commands.
> >>
> >> Upon some thought I think it's ok. Of course, this carries an obligation
> >> for runners to document their worker environment and its changes across
> >> versions. Though for many runners such documentation may be trivial:
> >> "whatever your YARN cluster has, the runner doesn't change it in any
> way"
> >> and it may be good enough for users. And for other runners, like
> Dataflow,
> >> such documentation may also be trivial: "no guarantees whatsoever, only
> >> what you stage in --filesToStage is available".
> >>
> >> I can also see Beam develop to a point where we'd want all runners to be
> >> able to run your DoFn in a user-specified Docker container, and manage
> >> those intelligently - but I think that's quite a while away and it
> doesn't
> >> have to block work on a utility for executing shell commands. Though
> it'd
> >> be nice if the utility was forward-compatible with that future world.
> >>
> >> On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
> >>
> >>> Hi Eugene,
> >>>
> >>> thanks for the extended questions.
> >>>
> >>> I think we have two levels of expectations here:
> >>> - end-user responsibility
> >>> - worker/runner responsibility
> >>>
> >>> 1/ From an end-user perspective, the end-user has to know that using a
> >>> system command (via ExecIO), and more generally speaking anything which
> >>> relies on worker resources (for instance a local filesystem directory
> >>> available only on a worker) can fail if the expected resource is not
> >>> present on all workers. So, basically, all workers should have the same
> >>> topology. It's what I'm assuming for the PR.
> >>> For example, I have my Spark clus

Re: [DISCUSS] ExecIO

2016-12-06 Thread Eugene Kirpichov
Ben - the issues of "things aren't hung, there is a shell command running",
aren't they general to all DoFn's? i.e. I don't see why the runner would
need to know that a shell command is running, but not that, say, a heavy
monolithic computation is running. What's the benefit to the runner in
knowing that the DoFn contains a shell command?

By saying "making sure that all shell commands finish", I suppose you're
referring to the possibility of leaks if the user initiates a shell command
and forgets to wait for it? I think that should be solvable again without
Beam intervention, by making a utility class for running shell commands
which implements AutoCloseable, and document that you have to use it that
way.
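
A minimal sketch of the kind of utility class I have in mind, using only the JDK;
the class itself and its method names are illustrative, not an existing Beam API:

import java.io.IOException;
import java.io.InputStream;

/** Runs a shell command and guarantees cleanup via AutoCloseable. */
class ShellCommand implements AutoCloseable {
  private final Process process;

  ShellCommand(String... command) throws IOException {
    // Merge stderr into stdout so callers have a single stream to drain.
    this.process = new ProcessBuilder(command).redirectErrorStream(true).start();
  }

  /** Blocks until the command finishes and returns its exit code. */
  int waitFor() throws InterruptedException {
    return process.waitFor();
  }

  /** Combined stdout/stderr of the command. */
  InputStream output() {
    return process.getInputStream();
  }

  /** Kills the process if the caller forgot to wait for it, so nothing leaks. */
  @Override
  public void close() {
    process.destroy();
  }
}

// Usage inside a DoFn's @ProcessElement, for example:
//   try (ShellCommand cmd = new ShellCommand("/bin/sh", "-c", command)) {
//     int exitCode = cmd.waitFor();
//     ...
//   }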

Ken - I think the question here is: are we ok with a situation where the
runner doesn't check or care whether the shell command can run, and the
user accepts this risk and studies what commands will be available on the
worker environment provided by the runner they use in production, before
productionizing a pipeline with those commands.

Upon some thought I think it's ok. Of course, this carries an obligation
for runners to document their worker environment and its changes across
versions. Though for many runners such documentation may be trivial:
"whatever your YARN cluster has, the runner doesn't change it in any way"
and it may be good enough for users. And for other runners, like Dataflow,
such documentation may also be trivial: "no guarantees whatsoever, only
what you stage in --filesToStage is available".

I can also see Beam develop to a point where we'd want all runners to be
able to run your DoFn in a user-specified Docker container, and manage
those intelligently - but I think that's quite a while away and it doesn't
have to block work on a utility for executing shell commands. Though it'd
be nice if the utility was forward-compatible with that future world.

On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> Hi Eugene,
>
> thanks for the extended questions.
>
> I think we have two levels of expectations here:
> - end-user responsibility
> - worker/runner responsibility
>
> 1/ From an end-user perspective, the end-user has to know that using a
> system command (via ExecIO), and more generally speaking anything which
> relies on worker resources (for instance a local filesystem directory
> available only on a worker) can fail if the expected resource is not
> present on all workers. So, basically, all workers should have the same
> topology. It's what I'm assuming for the PR.
> For example, if I have my Spark cluster using the same Mesos/Docker setup,
> then the user knows that all nodes in the cluster will have the same
> setup and resources (which could be provided by DevOps, for instance).
> On the other hand, running on Dataflow is different because I don't
> "control" the nodes (bootstrapping or resources), but in that case, the
> user knows it (he knows the runner he's using).
>
> 2/ As you said, we can expect that the runner can deal with some
> requirements (expressed depending on the pipeline and the runner), and
> that the runner can know which workers provide capabilities matching
> those requirements.
> Then, the end user is no longer responsible: the runner will try to
> determine whether the pipeline can be executed, and where a DoFn has to run
> (on which worker).
>
> For me, it's two different levels where 2 is smarter but 1 can also make
> sense.
>
> WDYT ?
>
> Regards
> JB
>
> On 12/05/2016 08:51 PM, Eugene Kirpichov wrote:
> > Hi JB,
> >
> > Thanks for bringing this to the mailing list. I also think that this is
> > useful in general (and that use cases for Beam are more than just classic
> > bigdata), and that there are interesting questions here at different
> levels
> > about how to do it right.
> >
> > I suggest to start with the highest-level question [and discuss the
> > particular API only after agreeing on this, possibly in a separate
> thread]:
> > how to deal with the fact that Beam gives no guarantees about the
> > environment on workers, e.g. which commands are available, which shell or
> > even OS is being used, etc. Particularly:
> >
> > - Obviously different runners will have a different environment, e.g.
> > Dataflow workers are not going to have Hadoop commands available because
> > they are not running on a Hadoop cluster. So, pipelines and transforms
> > developed using this connector will be necessarily non-portable between
> > different runners. Maybe this is ok? But we need to give users a clear
> > expectation about this. How do we phrase this expectation and where do we
> > put it in the docs?
> >
> > - I'm concerned that this puts

Re: [DISCUSS] ExecIO

2016-12-05 Thread Eugene Kirpichov
Hi JB,

Thanks for bringing this to the mailing list. I also think that this is
useful in general (and that use cases for Beam are more than just classic
bigdata), and that there are interesting questions here at different levels
about how to do it right.

I suggest to start with the highest-level question [and discuss the
particular API only after agreeing on this, possibly in a separate thread]:
how to deal with the fact that Beam gives no guarantees about the
environment on workers, e.g. which commands are available, which shell or
even OS is being used, etc. Particularly:

- Obviously different runners will have a different environment, e.g.
Dataflow workers are not going to have Hadoop commands available because
they are not running on a Hadoop cluster. So, pipelines and transforms
developed using this connector will be necessarily non-portable between
different runners. Maybe this is ok? But we need to give users a clear
expectation about this. How do we phrase this expectation and where do we
put it in the docs?

- I'm concerned that this puts additional compatibility requirements on
runners - it becomes necessary for a runner to document the environment of
its workers (OS, shell, privileges, guaranteed-installed packages, access
to other things on the host machine e.g. whether or not the worker runs in
its own container, etc.) and to keep it stable - otherwise transforms and
pipelines with this connector will be non-portable between runner versions
either.

Another way to deal with this is to give up and say "the environment on the
workers is outside the scope of Beam; consult your runner's documentation
or use your best judgment as to what the environment will be, and use this
at your own risk".

What do others think?

On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré  wrote:

Hi beamers,

Today, Beam is mainly focused on data processing.
Since the beginning of the project, we have been discussing extending
the use-case coverage via DSLs and extensions (like for machine
learning), or via IOs.

Especially for the IOs, we can see Beam used for data integration and data
ingestion.

In this area, I'm proposing a first IO: ExecIO:

https://issues.apache.org/jira/browse/BEAM-1059
https://github.com/apache/incubator-beam/pull/1451

Actually, this IO is mainly an ExecFn that executes system commands
(again, keep in mind we are discussing data integration/ingestion
and not data processing).

For convenience, this ExecFn is wrapped in Read and Write (as a regular IO).

Clearly, this IO/Fn depends on the worker where it runs. But it's the
user's responsibility.

During the review, Eugene and I discussed:
- is it an IO or just a Fn?
- is it OK to have a worker-specific IO?

IMHO, an IO makes a lot of sense and it's very convenient for end
users. They can do something like:

PCollection<String> output =
pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));

The pipeline will execute myscript, and the output PCollection will contain
the command execution's stdout/stderr.

On the other hand, they can do:

pcollection.apply(ExecIO.write());

where the PCollection contains the commands to execute.

Generally speaking, end users can call ExecFn wherever they want in the
pipeline steps:

PCollection<String> output = pipeline.apply(ParDo.of(new ExecIO.ExecFn()));

The input collection contains the commands to execute, and the output
collection contains the commands' execution results (stdout/stderr).

Generally speaking, I'm preparing several IOs that are more in the data
integration/ingestion area than in "pure" classic big data processing. I
think it would give a new "dimension" to Beam.

Thoughts ?

Regards
JB
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Questions about coders

2016-11-30 Thread Eugene Kirpichov
Thanks Kenn! This is very helpful.

On Wed, Nov 30, 2016 at 4:13 PM Kenneth Knowles <k...@google.com.invalid>
wrote:

> On Wed, Nov 30, 2016 at 3:52 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > Hello,
> >
> > Do we have anywhere a set of recommendations for developing new coders?
> I'm
> > confused by a couple of things:
> >
> > - Why are coders serialized by JSON serialization instead of by regular
> > Java serialization, unlike other elements of the pipeline such as DoFn's
> > and transforms?
> >
>
> Big picture: a Coder is actually an agreed-upon binary format that should
> be cross-language when possible. It (the coder itself) needs to be able to
> be deserialized/implemented by any SDK. Alluded to a bit in
> https://s.apache.org/beam-runner-api but not developed at length. There are
> also some fragmentary bits moving in this direction (the encodingId, which
> should be a URN in Beam).
>
> It is still very convenient to use Java serialization which is why we
> provided CustomCoder, but this presents a portability barrier. We have
> options here: We could remove it, or we could have language-specific coders
> that cannot be used for PCollections consumed by diverse languages. Maybe
> there are other good options.
>
>
>
> > - Which should one inherit from: CustomCoder, AtomicCoder,
> StandardCoder? I
> > looked at their direct subclasses and didn't see a clear distinction.
> Seems
> > like, when the encoded type is not parameterized then it's CustomCoder,
> and
> > when it's parameterized then it's StandardCoder? [but then again
> > CustomCoder isolates you from JSON weirdness, and this seems useful for
> > non-atomic coders too]
> >
>
> Within the SDK, one should not inherit from CustomCoder. It is bulky and
> non-portable. If a coder has component coders then it should inherit from
> StandardCoder. If not, then AtomicCoder adds convenience.
>
> These APIs are not perfect. For coder inference, the component coders are
> really expected to correspond to type variables. For example, List<T> has a
> component coder that is a Coder<T>. But for the purposes of construction
> and update-compatibility, component coders should include *all* coders that
> can be passed in that would modify the overall binary format, whether or
> not they correspond to a type variable, for example
> FullWindowedValueCoder<T> accepts not just a Coder<T> but also a
> Coder<? extends BoundedWindow> for the windows.
>
> There is work to be done here to support both uses of "component" coders,
> probably by separating them entirely.
>
>
> - Which methods are necessary to implement? E.g. should I implement
> > verifyDeterministic? Should I implement the "byte size observer" methods?
> >
>
> You should implement verifyDeterministic if your coder is deterministic so
> it can be used for the keys in GroupByKey.
>
>
> I'm actually even more confused by the hierarchy between Coder =>
> > StandardCoder => DeterministicStandardCoder => AtomicCoder =>
> CustomCoder.
> > DeterministicStandardCoder implements verifyDeterministic(), but it has
> > subclasses that override this method...
> >
>
> This is broken. It is for backwards compatibility reasons, I believe.
> Certainly DeterministicStandardCoder is a bit questionable, as it ignores
> components, and also overriding it so that subclasses cannot safely be used
> as DeterministicStandardCoder is wrong.
>
> Kenn
>
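
To make the recommendations above concrete, here is a minimal sketch of a coder for a
simple atomic type. EventId is a made-up value type, and the encode/decode signatures
follow the simplified form of later Beam releases (the API at the time of this thread
also carried a Context argument), so treat this purely as an illustration:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

/** Hypothetical value type used only for this example. */
class EventId {
  private final String id;
  EventId(String id) { this.id = id; }
  String getId() { return id; }
}

/** Coder for EventId: delegates the byte format to an existing deterministic coder. */
class EventIdCoder extends AtomicCoder<EventId> {
  private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();

  @Override
  public void encode(EventId value, OutputStream outStream) throws IOException {
    STRING_CODER.encode(value.getId(), outStream);
  }

  @Override
  public EventId decode(InputStream inStream) throws IOException {
    return new EventId(STRING_CODER.decode(inStream));
  }

  @Override
  public void verifyDeterministic() {
    // No-op: the encoding is deterministic, so the coder may be used for keys in
    // GroupByKey. An AtomicCoder has no component coders to verify.
  }
}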


Re: PCollection to PCollection Conversion

2016-11-29 Thread Eugene Kirpichov
Hi JB,
Depending on the scope of what you want to ultimately accomplish with this
extension, I think it may make sense to write a proposal document and
discuss it.
If it's just a collection of utility DoFn's for various well-defined
source/target format pairs, then that's probably not needed, but if it's
anything more, then I think it is.
That will help avoid a lot of churn if people propose reasonable
significant changes.

On Tue, Nov 29, 2016 at 11:15 AM Jean-Baptiste Onofré 
wrote:

> By the way Jesse, I'm gonna push my DATAFORMAT branch on my GitHub and I
> will post on the dev mailing list when done.
>
> Regards
> JB
>
> On 11/29/2016 07:01 PM, Jesse Anderson wrote:
> > I want to bring this thread back up since we've had time to think about
> it
> > more and make a plan.
> >
> > I think a format-specific converter will be a more time-consuming task than
> > we originally thought. It'd have to be a writer that takes another writer
> > as a parameter.
> >
> > I think a string converter can be done as a simple transform.
> >
> > I think we should start with a simple string converter and plan for a
> > format-specific writer.
> >
> > What are your thoughts?
> >
> > Thanks,
> >
> > Jesse
> >
> > On Thu, Nov 10, 2016 at 10:33 AM Jesse Anderson 
> > wrote:
> >
> > I was thinking about what the outputs would look like last night. I
> > realized that more complex formats like JSON and XML may or may not
> output
> > the data in a valid format.
> >
> > Doing a direct conversion on unbounded collections would work just fine.
> > They're self-contained. For writing out bounded collections, that's where
> > we'll hit the issues. This changes the uber conversion transform into a
> > transform that needs to be a writer.
> >
> > If a transform executes a JSON conversion on a per element basis, we'd
> get
> > this:
> > {
> > "key": "value"
> > }, {
> > "key": "value"
> > },
> >
> > That isn't valid JSON.
> >
> > The conversion transform would need to do several things when
> writing
> > out a file. It would need to add brackets for an array. Now we have:
> > [
> > {
> > "key": "value"
> > }, {
> > "key": "value"
> > },
> > ]
> >
> > We still don't have valid JSON. We have to remove the last comma or have
> > the uber transform start putting in the commas, except for the last
> element.
> >
> > [
> > {
> > "key": "value"
> > }, {
> > "key": "value"
> > }
> > ]
> >
> > Only by doing this do we have valid JSON.
> >
> > I'd argue we'd have a similar issue with XML. Some parsers require a root
> > element for everything. The uber transform would have to put the root
> > element tags at the beginning and end of the file.
> >
> > On Wed, Nov 9, 2016 at 11:36 PM Manu Zhang 
> wrote:
> >
> > I would love to see a lean core and abundant Transforms at the same time.
> >
> > Maybe we can look at what Confluent 
> does
> > for kafka-connect. They have official extensions support for JDBC, HDFS
> and
> > ElasticSearch under https://github.com/confluentinc. They put them along
> > with other community extensions on
> > https://www.confluent.io/product/connectors/ for visibility.
> >
> > Although not a commercial company, can we have a GitHub user like
> > beam-community to host projects we build around beam but not suitable for
> > https://github.com/apache/incubator-beam. In the future, we may have
> > beam-algebra like http://github.com/twitter/algebird for algebra
> operations
> > and beam-ml / beam-dl for machine learning / deep learning. Also, there
> > will be Beam-related projects elsewhere maintained by other
> > communities. We can put all of them on the beam-website or like spark
> > packages as mentioned by Amit.
> >
> > My $0.02
> > Manu
> >
> >
> >
> > On Thu, Nov 10, 2016 at 2:59 AM Kenneth Knowles 
> > wrote:
> >
> >> On this point from Amit and Ismaël, I agree: we could benefit from a
> place
> >> for miscellaneous non-core helper transformations.
> >>
> >> We have sdks/java/extensions but it is organized as separate artifacts.
> I
> >> think that is fine, considering the nature of Join and SortValues. But
> for
> >> simpler transforms, importing one artifact per tiny transform is too
> much
> >> overhead. It also seems unlikely that we will have enough commonality
> > among
> >> the transforms to call the artifact anything other than [some synonym
> for]
> >> "miscellaneous".
> >>
> >> I wouldn't want to take this too far - even though the SDK has many
> > transforms*
> >> that are not required for the model [1], I like that the SDK artifact
> has
> >> everything a user might need in their "getting started" phase of use.
> This
> >> user-friendliness (the user doesn't care that ParDo is core and Sum is
> > not)
> >> plus the difficulty of judging which transforms go where, are probably
> why
> >> we have them mostly all in one place.
> >>
> >> Models to look at, off the top of my head, 

Re: Flink runner. Wrapper for DoFn

2016-11-18 Thread Eugene Kirpichov
Hi Alexey,

In general, things like establishing connections and initializing caches
are better done in @Setup and @TearDown methods, rather than @StartBundle
and @FinishBundle, because DoFn's can be reused between bundles and this
way you get more benefit from reuse.

Bundles can be pretty small, especially in streaming pipelines. That said,
they normally shouldn't be 1-element-small. Hopefully someone working on
the Flink runner can comment.
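
A minimal sketch of the pattern described above; MyDbClient and its methods are
placeholders for whatever expensive resource you need, not a real library:

import org.apache.beam.sdk.transforms.DoFn;

/** Opens an expensive resource once per DoFn instance rather than once per bundle. */
class FilteringFn extends DoFn<String, String> {
  private transient MyDbClient client;  // hypothetical client class

  @Setup
  public void setup() {
    // Runs once per DoFn instance; the instance may be reused across many bundles.
    client = MyDbClient.connect();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    if (client.matchesFilters(c.element())) {
      c.output(c.element());
    }
  }

  @Teardown
  public void teardown() {
    client.close();
  }
}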

On Fri, Nov 18, 2016 at 10:47 AM amir bahmanyari
 wrote:

> Hmmm... Thanks... This could very well be my bottleneck since I see tons of
> threads get into WAIT state after some time & stay like that relatively
> forever. I have 100 G worth of elements to process... Is there a
> way to bypass this "startBundle" & get a fairly optimized
> behavior? Anyone? Thanks + regards, Amir
>
>   From: Demin Alexey 
>  To: dev@beam.incubator.apache.org; amir bahmanyari 
>  Sent: Friday, November 18, 2016 10:40 AM
>  Subject: Re: Flink runner. Wrapper for DoFn
>
> Very simple example:
>
> My DoFn, in startBundle, loads filters from a remote DB and builds an optimized
> index; in processElement, it applies the filters to every element to decide whether
> to push the element to the next operation or drop it.
>
> In the current implementation it's like matching a regexp on a string; you have 2
> ways:
> 1) compile the regexp every time, for every element
> 2) compile the regexp one time and apply it to all elements
>
> Right now Flink works the first way, and this way is not optimal.
>
>
> 2016-11-18 22:26 GMT+04:00 amir bahmanyari :
>
> > Hi Alexey, "startBundle can be expensive"... Could you elaborate on
> > "expensive" per each element, please?
> > Thanks
> >
> >  From: Demin Alexey 
> >  To: dev@beam.incubator.apache.org
> >  Sent: Friday, November 18, 2016 7:40 AM
> >  Subject: Flink runner. Wrapper for DoFn
> >
> > Hi
> >
> > In flink runner we have this code:
> >
> > https://github.com/apache/incubator-beam/blob/master/
> > runners/flink/runner/src/main/java/org/apache/beam/runners/
> > flink/translation/wrappers/streaming/DoFnOperator.java#L262
> >
> > but in most cases the startBundle method can be expensive to run for
> > every element (for example a DB connection, building a cache, etc.)
> >
> > Why is it so important to invoke startBundle/finishBundle on every
> > incoming StreamRecord?
> >
> > Thanks
> > Alexey Diomin
> >
> >
> >
> >
>
>
>


Re: Placement of temporary files by FileBasedSink

2016-10-27 Thread Eugene Kirpichov
I don't think your assessment of the behavior of glob patterns is correct, per
https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames#directory-by-directory-vs-recursive-wildcards.
I believe (and hope) that the behavior of IOChannelFactory.match() matches the
behavior of gsutil.

On Thu, Oct 27, 2016 at 1:48 PM Chamikara Jayalath <chamik...@apache.org>
wrote:

> BTW I'm in favor of using a sub-directory and possibly asking users to
> update their glob pattern while also allowing users to optionally specify a
> temporary path in the future, as you propose.
>
> Thanks,
> Cham
>
> On Thu, Oct 27, 2016 at 1:45 PM Chamikara Jayalath <chamik...@apache.org>
> wrote:
>
> > On Thu, Oct 27, 2016 at 1:27 PM Eugene Kirpichov
> > <kirpic...@google.com.invalid> wrote:
> >
> > Getting back to this. I noticed that the original user's job mentioned in
> >
> >
> http://stackoverflow.com/questions/39822859/temp-files-remain-in-gcs-after-a-dataflow-job-succeeded
> > is
> > configured to write to /path/to/$date/foo-x-of-y and another job
> > then reads from /path/to/$date/*, so sibling files won't work - it's
> > necessary to put temp files either into a subdirectory, or in a location
> > completely outside /path/to/$date/.
> >
> >
> > I think, at least for GCS, glob pattern '/path/to/$date/*' will include
> > files that are within any immediate sub-directory '/path/to/$date/uuid/'.
> > So unless users use the pattern '/path/to/$date/foo*' they could run into
> > the same issue.
> >
> > Thanks,
> > Cham
> >
> >
> >
> > By the way, if we ever support recursive globs (e.g. /path/to/foo/**/*),
> > then a subdirectory won't help; and if a user has another job that reads
> > from, say, /path/to/**/* (without the "foo" component - e.g. if foo is a
> > date, and they have a job that reads all data for all dates), then a
> > sibling directory won't help either.
> >
> > I think these two cases are good motivation for allowing the user to
> > provide a specific temp directory, as a last resort.
> >
> > To sum up:
> > - in order to solve the user's problem, we need to use a directory
> > - in the future we'll need to allow users to configure the temp directory
> > on FileBasedSink.
> >
> > The current PR takes the "directory sibling to the write path" approach,
> > and I don't see a better option that would address the needs of most
> users
> > automatically.
> >
> > Dan - you mentioned on the PR that you would prefer a subdirectory to a
> > sibling directory, but this *is* a subdirectory (specified write path is
> > /path/to/$date/foo-x-of-y and the suggested temp path is
> > /path/to/$date/temp-beam-foo-$uid/ which is a subdirectory of the
> directory
> > to which the sink is writing).
> >
> > Any alternatives / objections to proceeding with the approach in the PR
> > as-is?
> >
> > On Thu, Oct 20, 2016 at 6:26 PM Kenneth Knowles <k...@google.com.invalid>
> > wrote:
> >
> > > @Eugene, we can make breaking changes. But if we really don't want to,
> we
> > > can add it under a new name easily. That particular inheritance
> hierarchy
> > > is not precious IMO.
> > >
> > > On Thu, Oct 20, 2016, 14:03 Eugene Kirpichov
> > <kirpic...@google.com.invalid
> > > >
> > > wrote:
> > >
> > > > @Cham - this addresses temporary files that were written by
> successful
> > > > bundles, but not by failed bundles (and not the case when the entire
> > > > pipeline fails), so it's not sufficient.
> > > >
> > > > @Dan - yes, there are situations when it's impossible to create a
> > > sibling.
> > > > In that case, we'd need a fallback - either something the user needs
> to
> > > > explicitly specify ("your path is such that we don't know where to
> > place
> > > > temporary files, please specify withTempLocation or something"), or I
> > > like
> > > > Robert's option of using sibling but differently-named files in this
> > > case.
> > > >
> > > > @Kenn - yeah, a directory-based format would be great
> > > > (/path/to/foo/x-of-y), but this would be a breaking change to
> > the
> > > > expected behavior.
> > > >
> > > > I actually really like the option of sibling-but-differently-named
> > files
> > > > (/path/to/temp-beam-foo-$uid) which would be a very non-invasive
> change
> > > 

Re: [DISCUSS] Using Verbs for Transforms

2016-10-24 Thread Eugene Kirpichov
$0.02: Deduplicate? (lends to extensions like Deduplicate.by(some key
extractor function))
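
Purely as an illustration of that shape (Deduplicate, Event, and getUserId are all
hypothetical here, not existing SDK classes):

PCollection<Event> distinctByUser =
    events.apply(Deduplicate.by((Event e) -> e.getUserId()));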

On Mon, Oct 24, 2016 at 1:22 PM Dan Halperin 
wrote:

> I find "MakeDistinct" more confusing. My votes in decreasing preference:
>
> 1. Keep `RemoveDuplicates` name, ensure that important keywords are in the
> Javadoc. This reduces churn on our users and is honestly pretty dang
>  descriptive.
> 2. Rename to `Distinct`, which is clear if you're a SQL user and likely
> less clear otherwise. This is a backwards-incompatible API change, so we
> should do it before we go stable.
>
> I am not super strong that 1 > 2, but I am very strong that "Distinct" >>>
> "MakeDistinct" or and "RemoveDuplicates" >>> "AvoidDuplicate".
>
> Dan
>
> On Mon, Oct 24, 2016 at 10:12 AM, Kenneth Knowles 
> wrote:
>
> > The precedent that we use verbs has many exceptions. We have
> > ApproximateQuantiles, Values, Keys, WithTimestamps, and I would even
> > include Sum (at least when I read it).
> >
> > Historical note: the predilection towards verbs is from the Google Style
> > Guide for Java method names
> >  >,
> > which states "Method names are typically verbs or verb phrases". But even
> > in Google code there are lots of exceptions when it makes sense, like
> > Guava's
> > Iterables.any(), Iterables.all(), Iterables.toArray(), the entire
> > Predicates module, etc. Just an aside; Beam isn't Google code. I suggest
> we
> > use our judgment rather than a policy.
> >
> > I think "Distinct" is one of those exceptions. It is a standard
> widespread
> > name and also reads better as an adjective. I prefer it, but also don't
> > care strongly enough to change it or to change it back :-)
> >
> > If we must have a verb, I like it as-is more than MakeDistinct and
> > AvoidDuplicate.
> >
> > On Mon, Oct 24, 2016 at 9:46 AM Jesse Anderson 
> > wrote:
> >
> > > My original thought for this change was that Crunch uses the class name
> > > Distinct. SQL also uses the keyword distinct.
> > >
> > > Maybe the rule should be changed to adjectives or verbs depending on
> the
> > > context.
> > >
> > > Using a verb to describe this class really doesn't connote what the
> class
> > > does as succinctly as the adjective.
> > >
> > > On Mon, Oct 24, 2016 at 9:40 AM Neelesh Salian 
> > > wrote:
> > >
> > > > Hello,
> > > >
> > > > First of all, thank you to Daniel, Robert and Jesse for their review
> on
> > > > this: https://issues.apache.org/jira/browse/BEAM-239
> > > >
> > > > A point that came up was using verbs explicitly for Transforms.
> > > > Here is the PR: https://github.com/apache/incubator-beam/pull/1164
> > > >
> > > > Posting it to help understand if we have a consensus for it and if
> yes,
> > > we
> > > > could perhaps document it for future changes.
> > > >
> > > > Thank you.
> > > >
> > > > --
> > > > Neelesh Srinivas Salian
> > > > Engineer
> > > >
> > >
> >
>


Re: Placement of temporary files by FileBasedSink

2016-10-20 Thread Eugene Kirpichov
@Cham - this addresses temporary files that were written by successful
bundles, but not by failed bundles (and not the case when the entire
pipeline fails), so it's not sufficient.

@Dan - yes, there are situations when it's impossible to create a sibling.
In that case, we'd need a fallback - either something the user needs to
explicitly specify ("your path is such that we don't know where to place
temporary files, please specify withTempLocation or something"), or I like
Robert's option of using sibling but differently-named files in this case.

@Kenn - yeah, a directory-based format would be great
(/path/to/foo/x-of-y), but this would be a breaking change to the
expected behavior.

I actually really like the option of sibling-but-differently-named files
(/path/to/temp-beam-foo-$uid) which would be a very non-invasive change to
the current (/path/to/foo-temp-$uid) and indeed would not involve creating
new directories or needing new IOChannelFactory APIs. It will still match a
glob like /path/to/* though (which a user could conceivably specify in a
situation like gs://my-logs-bucket/*), but it might be good enough.


On Thu, Oct 20, 2016 at 10:14 AM Robert Bradshaw
<rober...@google.com.invalid> wrote:

> On Thu, Oct 20, 2016 at 9:58 AM, Kenneth Knowles <k...@google.com.invalid>
> wrote:
> > I like the spirit of proposal #1 for addressing the critical duplication
> > problem, though as Dan points out the logic to choose a related but
> > collision-free name might be slightly more complex.
> >
> > It is a nice bonus that it addresses the less critical issues and
> improves
> > usability for manual inspections and interventions.
> >
> > The term "sibling" is being slightly misused here. I'd say #1 as proposed
> > is a "sibling of the parent" while today's behavior is "sibling". I'd
> say a
> > root cause of multiple problems is that our sharded file format is "a
> bunch
> > of files next to each other" and the sibling is "other files in the same
> > directory" so it takes some care, and explicit file name tracking instead
> > of globbing, to work with it correctly.
> >
> >  AFAIK (corrections welcome) there is nothing special about
> > Write.to("s3://bucket/file") meaning write to
> > "s3://bucket/file-$shardnum-of-$totalshards". An alternative that seems
> > superior is to write to "s3://bucket/file/$shardnum-of-$totalshards" with
> > the convention that this prefix is fully owned by this file. Now the
> prefix
> > "s3://bucket/file/" _is_ the sharded file. It is conceptually simpler and
> > more glob and UI friendly. (any non "-" character would work for GCS and
> > S3, but the "/" convention is better, considering the broader world)
> >
> > And bringing it back to this thread, the "sibling" is no longer "more
> files
> > in the same directory" now "s3://bucket/file-temp-$uid" which is on the
> > same filesystem with the same ACLs. It is also more UI friendly, easier
> to
> > clean up, and does more to explicitly indicate that this is really one
> > sharded file. Perhaps there's a pitfall I am overlooking?
>
> Using directories rather than prefixes is a big change, and introduces
> complications like dealing with hidden dot files (some placed
> implicitly by the system or applications, and worrying about
> executable bits rather than just the rw ones and possibly more
> complicated permission inheritance).
>
> > Also since you mentioned local file support, FWIW the cleanup glob
> "file-*"
> > today breaks on Windows due to Java library vagaries, while "file/*"
> would
> > succeed.
> > On Thu, Oct 20, 2016, 09:14 Dan Halperin <dhalp...@google.com.invalid>
> > wrote:
> >
> > This thread is conflating many issues.
> >
> > * Putting temp files where they will not match the glob for the desired
> > output files
> > * Dealing with eventually-consistent filesystems (s3, GCS, ...)
> > * Properly cleaning up all temp files
> >
> > They all need to get solved, but for now I think we only need to solve
> the
> > first one.
> >
> > Siblings fundamentally will not work. Consider the following
> > perfectly-valid output path: s3://bucket/file-SSS-NNN.txt . A sibling
> would
> > be a new bucket, so not guaranteed to exist.
> >
> > On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath <
> chamik...@apache.org>
> > wrote:
> >
> >> Can this be prevented by moving temporary files (copy + delete
> >> individually) at finalization instead of

Placement of temporary files by FileBasedSink

2016-10-19 Thread Eugene Kirpichov
Hello,

This is a continuation of the discussion on PR
https://github.com/apache/incubator-beam/pull/1050 which turned out more
complex than expected.

Short summary:
Currently FileBasedSink, when writing to /path/to/foo (in practice,
/path/to/foo-x-of-y where y is the total number of output
files), puts temporary files into /path/to/foo-temp-$uid, and when
finalizing the sink, it removes the temporary files by matching the pattern
/path/to/foo-temp-* and removing everything that matches.

There are a couple of issues with this:
- FileBasedSink uses IOChannelFactory, which currently supports local
filesystems and Google Cloud Storage (GCS). GCS's match() operation is
currently eventually consistent. So, it may fail to return some of the
files, so we won't remove them.
- If the Beam job is cancelled or fails midway, then the temp files won't
be deleted at all (that's subject to a separate discussion on cleanup API -
AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about this
and was going to file one).
- If a follow-up data processing job is reading /path/to/foo, then the way
temp files are named, they will likely match the same glob pattern (e.g.
"/path/to/foo*") as the one intending to match the final output in
/path/to/foo, so if some temp files are leftover, the follow-up job will
effectively read duplicate records (some from /path/to/foo, some from
/path/to/foo-temp-$blah).

I think, in the absence of a way to guarantee that all temp files will be
deleted (I think it'd be very difficult or impossible to provide a hard
guarantee of this, considering various possible failure conditions such as
zombie workers), the cleanest way to solve this is put temp files in a
location that's unlikely to match the same glob pattern as one that matches
the final output.

Some options for what that could be:
1. A subdirectory that is a sibling of the final path, sufficiently unique,
and unlikely to match the same glob -
/path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
currently takes)
2. A subdirectory under PipelineOptions.tempLocation - this might be flawed
because PipelineOptions.tempLocation might be on a different filesystem, or
have different ACLs, than the output of the FileBasedSink.
3. A subdirectory that the user *must* explicitly provide on their
FileBasedSink. This is a reduction in usability, but there may be cases
when this is necessary - e.g. if the final location of the FileBasedSink is
such that we can't create siblings to it (e.g. the root path in a GCS
bucket - gs://some-bucket/)
4. A subdirectory generated by a new IOChannelFactory call ("give me a temp
directory for the given final path") which would do one of the above -
reasonable, and simplifies FileBasedSink, but we still need to choose which
of #1-#3 this call should do.

There might be other things I missed. There might be radical restructurings
of FileBasedSink that work around this problem entirely, though I couldn't
think of any.

In general, the requirements on the solution are:
- It should be very unlikely that somebody reads the temp files in the same
glob pattern as the final output by mistake.
- It should continue to make sense as IOChannelFactory is extended with
support for other filesystems.
- It should ideally use the same filesystem as the final output, or perhaps
even a location logically "close" to the final output, so that it could
potentially take advantage of that filesystem's efficient bulk-copy or
bulk-rename operations if available.
- It should be easy to manually clean up the temp files if something went
wrong and they weren't cleaned up by the Beam job.

I'm personally in favor of #1 with fallback to #2 or #3, because I think a
sibling directory achieves all of these requirements unless a sibling
directory can't be created.

Thoughts?
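
For concreteness, a rough sketch of the path construction that option #1 implies; the
helper and the exact naming scheme are illustrative only, not necessarily what the PR
does:

import java.util.UUID;

class TempPaths {
  /**
   * Given a final output prefix like "/path/to/foo", derive a sibling temporary
   * directory like "/path/to/temp-beam-foo-$uid/" that is unlikely to match a
   * glob such as "/path/to/foo*" aimed at the final output.
   */
  static String tempDirectoryFor(String outputPrefix) {
    int lastSlash = outputPrefix.lastIndexOf('/');
    String parent = outputPrefix.substring(0, lastSlash + 1);  // "/path/to/"
    String name = outputPrefix.substring(lastSlash + 1);       // "foo"
    return parent + "temp-beam-" + name + "-" + UUID.randomUUID() + "/";
  }
}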


Re: Specifying type arguments for generic PTransform builders

2016-10-13 Thread Eugene Kirpichov
I think the choice between #1 or #3 is a red herring - the cases where #3
is a better choice than #1 are few and far between, and probably not at all
controversial (e.g. ParDo). So I suggest we drop this part of the
discussion.

Looks like the main contenders for the complex case are #1 (Foo.blah())
vs. #4 (Foo.Unbound and Foo.Bound).

Dan, can you clarify again what you mean by this:
"a utility function that gives you a database reader ready-to-go ... but
independent of the type you want the result to end up as. You can't do
that if you must bind the type first."

I think this is quite doable with #1:

class CompanyDefaults {
  public static <T> DatabaseIO.Read<T> defaultDatabaseIO() {
    return DatabaseIO.<T>create().withSettings(blah).withCredentials(blah);
  }
}

DatabaseIO.Read source =
CompanyDefaults.defaultDatabaseIO().withQuery(blah);

All in all, it seems to me that #1 (combined with potentially encapsulating
parts of the configuration into separate objects, such as
JdbcConnectionParameters in JdbcIO) is the only option that can do
everything fairly well (its only downside is having to specify the type),
and it is very easy to implement when you're implementing your own
transform - which, I agree with Kenn, matters a lot too.

I think that coming up with an easy-to-implement, universally applicable
pattern matters a lot also because the Beam ecosystem is open, and the set
of connectors/transforms available to users will not always be as carefully
curated and reviewed as it is currently - the argument "the implementation
complexity doesn't matter because the user doesn't see it" will not apply.
So, ultimately, "are there a lot of good-quality connectors available to
Beam users" will be equivalent to "is it easy to develop a good-quality
connector". And the consistency between APIs provided by different
connectors will matter for the user experience, too.
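
For reference, a sketch of why #1 stays cheap for the transform author as well;
DatabaseIO is the same hypothetical connector as above, trimmed down to the builder
shape (the real thing would be a PTransform):

/** Hypothetical connector illustrating pattern #1: the caller binds the element type up front. */
class DatabaseIO {
  public static <T> Read<T> create() {
    return new Read<>(null, null);
  }

  public static class Read<T> {
    private final String settings;
    private final String query;

    private Read(String settings, String query) {
      this.settings = settings;
      this.query = query;
    }

    // Every builder method copies the fields into a new Read<T>; no Unbound/Bound pair is needed.
    public Read<T> withSettings(String settings) { return new Read<>(settings, query); }
    public Read<T> withQuery(String query) { return new Read<>(settings, query); }
  }
}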

On Thu, Oct 13, 2016 at 7:09 PM Kenneth Knowles <k...@google.com.invalid>
wrote:

> On Thu, Oct 13, 2016 at 4:55 PM Dan Halperin <dhalp...@google.com.invalid>
> wrote:
> > These
> > suggestions are motivated by making things easier on transform writers,
> but
> > IMO we need to be optimizing for transform users.
>
> To be fair to Eugene, he was actually analyzing real code patterns that
> exist in Beam today, not suggesting new ones.
>
> Along those lines, your addition of the BigTableIO pattern is well-taken
> and my own analysis of that code is #5: "when you don't have a type
> variable to bind, leave every field blank and validate later. Also, have an
> XYZOptions object". I believe in the presence of type parameters this
> reduces to #4 Bound/Unbound classes but it is more palatable in the
> no-type-variable case. It is also a good choice when varying subsets of
> parameters might be optional - the Window transform matches this pattern
> for good reason.
>
> The other major drawback of #3 is the inability to provide generic
> > configuration. For example, a utility function that gives you a database
> > reader ready-to-go with all the default credentials and options you need
> --
> > but independent of the type you want the result to end up as. You can't
> do
> > that if you must bind the type first.
> >
>
> This is a compelling use case. It is valuable for configuration to be a
> first-class object that can be passed around. BigTableOptions is a good
> example. It isn't in contradiction with #3, but actually fits very nicely.
>
> By definition for this default configuration to be first-class it has to be
> more than an invalid intermediate state of a PTransform's builder methods.
> Concretely, it would be BigTableIO.defaultOptions(), which would make
> manifest the inaccessible default options that could be implied by
> BigTableIO.read(). There can sometimes be a pretty fine line between a
> builder and an options object, to be sure. You might distinguish it by
> whether you would conceivably use the object elsewhere - and for
> BigTableOptions the answer is certainly "yes" since it actually is an
> external class. In the extreme, every method takes one giant POJO and that
> sucks.
>
>
> > I think that in general all of these patterns are significantly worse in
> > the long run than the existing standards, e.g., for BigtableIO.
>
>
> Note that BigTableIO.read() is actually not "ready-to-go" but has nulls and
> empty strings that cause crashes if they are not overridden. It is just a
> builder without the concluding "build()" method (for the record: I find
> concluding "build()" methods pointless, too :-)
>
> One of the better examples of the pattern of "ready-to-go" builders -
> though not a transform - is WindowingStrategy (props to Ben), where t

Re: [PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn

2016-10-12 Thread Eugene Kirpichov
Hey all,

An update: https://github.com/apache/incubator-beam/pull/896 has been
merged, laying groundwork and adding support for splittable DoFn to the
in-memory runner.

What this PR does:
- It defines an API, in full accordance with the proposal discussed on this
thread.
- It adds a mostly runner-agnostic expansion of the ParDo transform for a
splittable DoFn, with one runner-specific primitive transform that needs to
be overridden by every runner.
- It overrides said transform in the in-memory runner, so this works
end-to-end in the in-memory runner.
- All this code is covered by tests (unit and integration
@RunnableOnService) and appears to work properly in combination with the
rest of the Beam model: e.g., inputs to a splittable DoFn can be windowed,
and their windows and timestamps are transparently propagated.

Caveats:
- The API is marked @Experimental, but this is an understatement: it is
assumed to be in flux and is not intended to be used yet. Overwhelmingly
likely, it *will* change in incompatible ways. DO NOT write pipelines with
this transform yet.
- It only works in the in-memory runner: the vast majority of code is
runner-agnostic, but a central runner-specific primitive transform is only
overridden by the in-memory runner.

My immediate next plan is to make this work in the Cloud Dataflow streaming
runner (since this is the runner I'm most familiar with), in order to get
experience with what kind of runner hooks are needed and to put the API in
shape for adding hooks for other runners - and then work either myself or
with the community on making it work in other runners too. Once all runners
sufficiently support a particular subset of features, we can start
transitioning some connectors or writing new ones using that subset (I
expect that streaming connectors will come first).

Additionally, the Python SDK is considering using Splittable DoFn as the
*only* API for streaming sources (right now it doesn't have any API for
that, so there's no compatibility concerns). No implementation work has
happened yet, but it seems like a good idea.

On Tue, Aug 30, 2016 at 1:45 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Thanks for the explanation Eugene and JB.
>
> By the way, I'm not trying to find holes in this, I really like the
> feature. I just sometimes wonder how a specific thing might be implemented
> with this.
>
> On Mon, 29 Aug 2016 at 18:00 Eugene Kirpichov <kirpic...@google.com.invalid
> >
> wrote:
>
> > Hi Aljoscha,
> >
> > The watermark reporting is done via
> > ProcessContinuation.futureOutputWatermark, at the granularity of
> returning
> > from individual processElement() calls - you return from the call and
> give
> > a watermark on your future output. We assume that updating watermark is
> > sufficient at a per-bundle level (or, if not, then that you can make
> > bundles small enough) because that's the same level at which state changes,
> > timers etc. are committed.
> > It can be implemented by setting a per-key watermark hold and updating it
> > when each call for this element returns. That's the way it is implemented
> > in my current prototype
> https://github.com/apache/incubator-beam/pull/896
> > (see
> > SplittableParDo.ProcessFn)
> >
> > On Mon, Aug 29, 2016 at 2:55 AM Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Hi,
> > > I have another question about this: currently, unbounded sources have
> > > special logic for determining the watermark and the system periodically
> > > asks the sources for the current watermark. As I understood it,
> > watermarks
> > > are only "generated" at the sources. How will this work when sources
> are
> > > implemented as a combination of DoFns and SplittableDoFns? Will
> > > SplittableDoFns be asked for a watermark, does this mean that
> watermarks
> > > can then be "generated" at any operation?
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov
> > <kirpic...@google.com.invalid
> > > >
> > > wrote:
> > >
> > > > Hi JB,
> > > >
> > > > Yes, I'm assuming you're referring to the "magic" part on the
> transform
> > > > expansion diagram. This is indeed runner-specific, and timers+state
> are
> > > > likely the simplest way to do this for an SDF that does unbounded
> > amount
> > > of
> > > > work.
> > > >
> > > > On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré <
> j...@nanthrax.net
> > >
> > > > wrote:
> > > >
> > > > > Anyway, from a runner perspective

Re: Introducing a Redistribute transform

2016-10-12 Thread Eugene Kirpichov
...suppose
that BarFn is much slower to compute than FooFn, but the results of FooFn
need to be sent to pubsub as quickly as possible. In that case we don't
want to wait for both FooFn and BarFn on a particular element to complete
before sending the result of FooFn to pubsub.

It seems like a Redistribute transform should be the answer to this as well:

PCollection foos =
    ints.apply(Redistribute.create()).apply(ParDo.of(new FooFn()));
PCollection bars =
    ints.apply(Redistribute.create()).apply(ParDo.of(new BarFn()));
PCollectionList.of(foos).and(bars).apply(PubsubIO.Write.to(topic));

Here, it would cause FooFn and BarFn to be applied to logically independent
collections, whose contents are equivalent to "ints".

***

The current Reshuffle transform happens to do all 3 at the same time, and
does it in a potentially non-portable way: in particular it relies on the
fact that GroupByKey provides all 3 semantics, but the Beam model does not
require this (and in fact, in Spark it probably wouldn't provide #1). It is
also potentially non-optimal: there can exist better implementations of
each of these 3 semantics not involving a global group-by-key.

It's not clear yet what to do about all this. Thoughts welcome.

On Tue, Oct 11, 2016 at 11:35 AM Kenneth Knowles <k...@google.com.invalid>
wrote:

> On Tue, Oct 11, 2016 at 10:56 AM Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
>
> > Yeah, I'm starting to lean towards removing Redistribute.byKey() from the
> > public API - because it only makes sense for getting access to per-key
> > state, and 1) we don't have it yet and 2) runner should insert it
> > automatically - so there's no use case for it.
>
>
> +1 to removing Redistribute.byKey() from the public API.
>
>
> > The "checkpointing keys" use
> > case should be done via Redistribute.arbitrarily(), I believe.
> >
>
> Actually I think it really does have to be a GroupByKey followed by writing
> the groups without breaking them up:
>
>  - With GBK each element must appear in exactly one output group, so you
> have to checkpoint or be able to retract groupings (nice easy explanation
> from Thomas Groh; any faults in my paraphrase are my own).
>
>  - But GBK followed by "output the elements one by one" actually removes
> this property. Now you can replace the whole thing with a no-op and fuse
> with downstream and still get exactly once processing according to the
> model but not as observed via side effects to external systems. So sinks
> should really be doing that, and I'll retract this use case for
> Redistribute.
>
> As for Redistribute.arbitrarily():
> > In a batch runner, we could describe it as "identity transform, but
> runner
> > is required to process the resulting PCollection with downstream
> transforms
> > as well as if it had been created from elements via Create.of(), in terms
> > of ability to parallelize processing and minimize amount of re-executed
> > work in case of failures" (which is a fancy way of saying "materialize"
> > without really saying "materialize" :) ).
> >
>
> How can you observe if the runner ignored you?
>


Re: Introducing a Redistribute transform

2016-10-10 Thread Eugene Kirpichov
Hi Amit,

The transform, the way it's implemented, actually does several things at
the same time and that's why it's tricky to document it.

Redistribute.arbitrarily():
- Introduces a fusion barrier (in runners that have it), making sure that
the runner can fully parallelize processing the output PCollection with
DoFn's
- Introduces a fault-tolerance barrier, effectively "checkpointing" the
input PCollection (again, in runners where it makes sense) and making sure
that processing elements of the output PCollection with a DoFn, if the DoFn
fails, will redo only that processing, but not need to recompute the input
PCollection.

Redistribute.byKey():
- All of the above and also makes the collection "key-partitioned", giving
access to per-key state to downstream key-preserving DoFns. However, this
is also runner-specific, because it's conceivable that a runner might not
need this "key-partitioned" property (in fact it's best if a runner
inserted such a "redistribute by key" automatically if it needs it...), and
it currently isn't exposed anyway.

Still thinking about the best way to describe this in a way that's least
confusing to users.

Regarding redistributing into N shards: this is problematic because it
doesn't seem to make sense in the unified model (in streaming in particular
- having N keys doesn't mean you have N bundles), and breaks down if you
add dynamic work rebalancing, backups and other magic. So I decided not to
bother with this in that PR.

Agreed with Robert that limiting the parallelism, or throttling, are very
useful features, but Redistribute is not the right place to introduce them.
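
To make the "arbitrarily" variant concrete, here is a rough sketch of the
assign-random-keys / group / drop-keys expansion described above (illustrative
only - the real Reshuffle/Redistribute code is runner-aware and, as Amit
describes below, also has to neutralize windowing and triggering around the
GroupByKey; the newer expand() method name is used):

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

// Rough sketch only; not the actual Reshuffle/Redistribute implementation.
public class RedistributeArbitrarilySketch<T>
    extends PTransform<PCollection<T>, PCollection<T>> {
  @Override
  public PCollection<T> expand(PCollection<T> input) {
    return input
        // 1. Assign a pseudorandom key to each element so that the data can be
        //    spread across as many workers as the runner has available.
        .apply("AssignRandomKey",
            WithKeys.of(new SerializableFunction<T, Integer>() {
              @Override
              public Integer apply(T ignored) {
                return ThreadLocalRandom.current().nextInt();
              }
            }).withKeyType(TypeDescriptor.of(Integer.class)))
        // 2. Group by those keys: this is where the fusion break and the
        //    effective "checkpoint" of the input come from. For unbounded input
        //    the real implementation re-windows and sets a trigger here, then
        //    restores the original windowing afterwards.
        .apply(GroupByKey.<Integer, T>create())
        // 3. Drop the keys and flatten the groups back into single elements.
        .apply(Values.<Iterable<T>>create())
        .apply(Flatten.<T>iterables());
  }
}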

On Mon, Oct 10, 2016 at 12:58 PM Amit Sela <amitsel...@gmail.com> wrote:

> On Mon, Oct 10, 2016 at 9:21 PM Robert Bradshaw
> <rober...@google.com.invalid>
> wrote:
>
> > On Sat, Oct 8, 2016 at 7:31 AM, Amit Sela <amitsel...@gmail.com> wrote:
> >
> > > Hi Eugene,
> >
> > >
> >
> > > This is very interesting.
> >
> > > Let me see if I get this right, the "Redistribute"  transformation
> > assigns
> >
> > > a "running id" key (per-bundle) , calls "Redistribute.byKey", and
> > extracts
> >
> > > back the values, correct ?
> >
> >
> >
> > The keys are (pseudorandomly) unique per element.
> >
> >
> >
> > > As for "Redistribute.byKey" - it's made of a GroupByKey transformation
> > that
> >
> > > follows a Window transformation that neutralises the "resolution" of
> >
> > > triggers and panes that usually occurs in GroupByKey, correct ?
> >
> > >
> >
> > > So this is basically a "FanOut" transformation which will depend on the
> >
> > > available resources of the runner (and the uniqueness of the assigned
> > keys)
> >
> > > ?
> >
> > >
> >
> > > Would we want to Redistribute into a user-defined number of bundles (>
> >
> > > current) ?
> >
> >
> >
> > I don't think there's any advantage to letting the user specify a
> >
> > number here; the data is spread out among as many machines as are
> >
> > handling the shuffling (for N elements, there are ~N unique keys,
> >
> > which gets partitioned by the system to the M workers).
> >
> >
> >
> > > How about "FanIn" ?
> >
> >
> >
> > Could you clarify what you would hope to use this for?
> >
> Well, what if for some reason I would want to limit parallelism for a step
> in the Pipeline ? like calling an external service without "DDoS"ing it ?
>
> >
> >
> >
> > > On Fri, Oct 7, 2016 at 10:49 PM Eugene Kirpichov
> >
> > > <kirpic...@google.com.invalid> wrote:
> >
> > >
> >
> > >> Hello,
> >
> > >>
> >
> > >> Heads up that https://github.com/apache/incubator-beam/pull/1036 will
> >
> > >> introduce a transform called "Redistribute", encapsulating a
> relatively
> >
> > >> common pattern - a "fusion break" [see
> >
> > >>
> >
> > >>
> >
> https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
> >
> > >> previously
> >
> > >> providing advice on that] - useful e.g. when you write an IO as a
> > sequence
> >
> > >> of ParDo's: split a query into parts, read each part, and you want to
> >
> > >> prevent fusing these ParDo's because that would make the whole thing
> >
>

Re: [REMINDER] Technical discussion on the mailing list

2016-10-06 Thread Eugene Kirpichov
Hi Daniel,

Thanks for raising this. I think I was a major contributor to your
frustration with the process by suggesting big changes to your IO PR.

As others have said, ideally that process should have gone differently: 1)
we should have had documentation on the best practices in developing IOs,
and 2) I should not have requested you to do the AutoValue conversion in
your PR because that (using AutoValue for PTransform builders) was a new
idea at the time and it was unreasonable to put the burden of implementing
it on you.

I'm very much in favor of improving that process, e.g. by looking at
recently submitted IOs and putting the essence of discussion on those PRs
together into a best-practices document (this document, when it appears,
should *absolutely* be discussed on the mailing list!); and I should have
at least mentioned the idea "consider using it for PTransform builders too"
on the mailing list, because even though we already use AutoValue in many
places, we didn't use it for PTransforms, so that is a new technical idea,
even if not a new concept. This was an example of a technical discussion
that happened off list, and thanks to JB for reflecting it back onto the
list. Indeed, it takes some habit-building time to learn to do this
consistently.

***

Now, for SplittableDoFn in particular - I just looked through all the 171
comments on the PR [https://github.com/apache/incubator-beam/pull/896] and
I honestly don't think there's anything there that merited a broad
discussion on the list - the API is nearly exactly the same as was proposed
on the mailing list, and the comments are about style nitpicks or
implementation details relevant only to people actively working on the
involved classes. Weekly updates, if I provided them, would have been of
the form "Still addressing comments / refactoring $x / extracting part $y
into a separate PR / adding tests". The large number of comments is because
this is a large PR and there's a lot of gritty details there to get right,
but none of them are high-level - the high-level design has been finalized
already.

The PR is taking so long also because I've spun off several smaller PRs
making focused changes to individual subsystems (e.g. DoFnReflector), which
I think also do not merit being discussed on the mailing list, because they
don't change the technical direction of the project, these are deep
implementation-detail classes, there's only 2-3 people involved in
developing or using them (including myself), and all these people are
listed as reviewers.

Please let me know if you feel differently - as the person driving SDF, I
really would like to be sure that the community is satisfied with the
communication around it.

On Thu, Oct 6, 2016 at 3:05 AM Daniel Kulp  wrote:

> At the end of the day, it comes down to two questions:
>
> 1) Are there technical and project direction discussions happening off
> list and not reflected back to the list?
>
> 2) If yes, are the concrete decisions being made as a result of the off
> list discussions?
>
>
> The answer to #1 is a definite yes.   From a quick perusal, several of the
> pull requests that have more than 10-15 comments are discussions that
> should be back here.  Look at the Metrics PR, the splittable DoFn pr, etc…
> There are discussions there that NEED to be coming back to this list one
> way or another.   For things like the Splittable DoFn that is a long
> running discussion, it would most likely need periodic summaries/updates.
> Trying to follow everything going on in that PR is impossible with the
> comments on the outdated commits and such.
>
> Because of the scale of the #1 problem, I’m unsure of the answer to #2,
> but my gut feeling is yes.  If the off list discussions are resulting in
> technical changes that people don’t know about and cannot object to or
> comment on, then there is a problem.
>
> From an Apache standpoint, we have to get the answer to BOTH questions to
> a “no” state.   That’s a requirement.   The question is how do we get there?
>
> Dan
>
>
>
> > On Oct 6, 2016, at 8:29 AM, Davor Bonaci 
> wrote:
> >
> > Daniel, so glad you are starting to contribute to Beam! It was great
> > talking with you in person back in May. Welcome!
> >
> > --
> >
> > There are lots of different things mentioned here; I'll try to address
> them
> > separately.
> >
> > The first use of AutoValue should have been discussed on the dev@
> mailing
> > list. I think the main reason for the discussion is a bit different --
> > AutoValue has a non-trivial tradeoff -- compile complexity vs.
> boilerplate
> > code. For example, AutoValue may degrade IDE experience for some
> > contributors. If we'd go in depth on this, I'm sure we'd find opposing
> > opinions on the use of AutoValue. This tradeoff should have been
> discussed
> > on the dev@ list, followed by a community decision.
> >
> > Note that this has happened *before* the JdbcIO work. Since AutoValue has
> > been 

Re: Beam IO: suggestions and new features

2016-09-20 Thread Eugene Kirpichov
On Mon, Sep 19, 2016 at 7:17 PM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi Eugene,
>
> thanks for your feedbacks.
>
> See my comments:
>
> 1/ I now have a better understanding of the logic/steps to write an IO,
> and as an IO developer, it's not really a problem. My concern is more when
> you have to build an application on top of Beam (like, for instance, a DSL).
> Then, we can identify a Read PTransform (as it starts with PBegin) and a
> Write PTransform (as it ends with PDone). But, it's not so easy to
> identify and create wrappers based on that.
> I'm working on couple of Beam DSL right now, and it's the issue I have.
>
Hmm, I don't quite understand this paragraph. Why do we need to identify
transforms as "read"/"write", and what wrappers are you talking about?

Also please keep in mind the possibility of transforms that can reasonably be
called "read" but don't start with a PBegin - e.g. if you develop a
transform that reads a PCollection of filenames. Or likewise, "write" but
don't end with a PDone: say, a transform that writes data to a temporary
directory and returns the name of the directory as a single-element
PCollection.
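
As a purely hypothetical illustration of such a transform (names invented; the
newer annotated-DoFn/expand() style is used, and it assumes the files are
readable from the workers):

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// A "read"-style transform that consumes a PCollection of filenames instead of
// starting from PBegin. Sketch only.
public class ReadLinesFromFiles
    extends PTransform<PCollection<String>, PCollection<String>> {
  @Override
  public PCollection<String> expand(PCollection<String> filenames) {
    return filenames.apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        // Emit one output element per line of the file named by the input element.
        for (String line :
            Files.readAllLines(Paths.get(c.element()), StandardCharsets.UTF_8)) {
          c.output(line);
        }
      }
    }));
  }
}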


>
> 2/ You are right, the proposal is more generic than just IO and having
> kind of "canonical" standard Beam format will simplify a lot the
> pipelines (including the transform steps).
> We could image having a Avro IndexRow/IndexedRecord (schema) as standard
> format in the PCollection. We can represent basically anything with it.
> The IO will have to create PCollection element of this type.
>
I think I would prefer if the Beam programming model stayed lightweight and
continued to do just one job - processing the data - while allowing data to
be represented in whichever way is reasonable depending on the context.
Trying to make Beam *also* be a universal data representation format
appropriate for all cases seems unnecessary.

E.g. BigtableIO.Read should return the bigtable type Row, BigtableIO.Write
should take Mutation objects, and Mongodb.Read/Write should use
org.bson.Document (btw, right now they use String - I know I approved that
in review, but perhaps this should be changed).

However, I acknowledge that there are patterns in data processing that
could benefit from having a structured representation of data (e.g. better
optimization opportunities for the runner), so perhaps Beam could *also*
provide such a representation - e.g. see how Spark does data frames and
maps them onto Java objects in a lightweight way
http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection
. But this should be probably provided as a collection of utilities, rather
than as a core feature of the programming model.


>
> Regards
> JB
>
> On 09/20/2016 03:59 AM, Eugene Kirpichov wrote:
> > Hi!
> >
> > Thank you for raising these issues. Comments inline.
> >
> > On Fri, Sep 16, 2016 at 7:19 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> > wrote:
> >
> >> Hi all,
> >>
> >> as you may know I'm working on different new IOs for Beam.
> >>
> >> I have some suggestions that I would like to discuss with you all.
> >>
> >> 1/ Sink
> >> The SDK provides a Sink abstract class. It represents a resource that
> >> can be written using a Write transform:
> >>
> >>p.apply(Write.to(new MySink()));
> >>
> >> The Sink creates a Writer. A Writer is an abstract class where we
> >> override the open(), write(), close() methods.
> >> Today, no IOs use Sink: they directly use a DoFn.
> >> I fully agree that it's very convenient to implement a Sink, but it may
> >> appear inconsistent and can "perturb" the users/developers.
> >>
> >> This brings me to the second point.
> >>
> >> 2/ Source
> >> Today, an IO Read apply() method uses a source via Read.from(source).
> >>
> >> However, if the source API is not required (for instance in the case of
> >> JDBC where we can't really implement getEstimatedSizeBytes() and
> >> splitIntoBundles()), it's possible to directly use a DoFn instead of a
> >> Source.
> >>
> >> So again, it could appear inconsistent.
> >>
> >> Maybe it would make more sense to "force" the usage of Source even if we
> >> don't leverage all Source features (for instance, in the case of JDBC
> >> IO, getEstimatedSizeBytes() will return 0L and splitIntoBundles() will
> >> return a list with a single source).
> >> The same for Sink: even if a Sink can be implemented with DoFn, it would
> >> be more consistent to implement it with 

Re: Should UnboundedSource provide a split identifier ?

2016-09-06 Thread Eugene Kirpichov
Oh! Okay, looks like this is a part of the code I was unfamiliar with. I'd
like to know the answer too, in this case.
+Daniel Mills <mil...@google.com> can you comment ?

On Tue, Sep 6, 2016 at 3:32 PM Amit Sela <amitsel...@gmail.com> wrote:

> That is correct, as long as none of the Kafka topics "grow" another
> partition (which it could).
> In that case, some bundle will have to start reading from this partition as
> well, and since all other bundles already have a "previous checkpoint" it
> matters which checkpoint to relate to. I don't know how it's implemented in
> Dataflow, but in Spark I'm testing using mapWithState to store the
> checkpoints per split.
> It also seems that order does matter to the KafkaIO:
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L636
>
> On Wed, Sep 7, 2016 at 1:24 AM Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
>
> > Hi Amit,
> > Could you explain more about why you're saying the order of splits
> matters?
> > AFAIK the semantics of Read.Unbounded is "read from all of the splits in
> > parallel, checkpointing each of them independently", so their order
> > shouldn't matter.
> >
> > On Tue, Sep 6, 2016 at 3:17 PM Amit Sela <amitsel...@gmail.com> wrote:
> >
> > > UnboundedSources generate initial splits, which are basically splits of
> > > themselves - for example, if an UnboundedKafkaSource is set to read
> from
> > > topic T1 which is made of 2 partitions P1 and P2, it will (optimally)
> > split
> > > into two UnboundedKafkaSource, one per partition.
> > > During the lifecycle of the "reader" bundles, CheckpointMarks are used
> to
> > > checkpoint "last-read" and so readers may restart/resume. I'm assuming
> > this
> > > is how newly created partitions will automatically be added to readers.
> > >
> > > The problem is that it's all fine while there is only one topic (Kafka
> > > topic-partition pairs are ordered), but if reading from more than one
> > topic
> > > this may break:
> > > T1,P1
> > > T1,P2
> > > T1,P3
> > > T2,P1
> > > The order is not maintained and T2,P1 is 4th now.
> > >
> > > If splits (UnboundedSources) had an identifier, this could be avoided,
> > and
> > > checkpoints could be persisted accordingly.
> > > For example, an UnboundedKafkaSource could use the hash code of its
> > > assigned topic-partition pairs.
> > >
> > > I don't know how relevant this is to other Sources, but I guess it is
> as
> > > long as they may grow their partitions dynamically (though I might be
> > > completely wrong...) and I don't see much of a downside.
> > >
> > > Thoughts ?
> > >
> > > This is something that troubles me now while working on Read.Unbounded,
> > and
> > > from a quick look I saw that the FlinkRunner expects "stable" splitting
> > as
> > > well.. How does Dataflow handle this ?
> > >
> > > Thanks,
> > > Amit
> > >
> >
>
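
A minimal sketch of the kind of stable split identifier Amit describes - derived
from the split's assigned topic-partition pairs rather than from its position in
the list returned by generateInitialSplits() (illustrative only; not code from
KafkaIO or any runner):

import com.google.common.base.Joiner;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.TopicPartition;

class SplitIds {
  // Returns an identifier that stays the same for a split regardless of where
  // that split happens to appear in the list of initial splits.
  static String splitId(List<TopicPartition> assignedPartitions) {
    List<String> parts = new ArrayList<>();
    for (TopicPartition tp : assignedPartitions) {
      parts.add(tp.topic() + "-" + tp.partition());
    }
    Collections.sort(parts); // make the id independent of assignment order
    return Hashing.murmur3_128()
        .hashString(Joiner.on(',').join(parts), StandardCharsets.UTF_8)
        .toString();
  }
}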


Re: Should UnboundedSource provide a split identifier ?

2016-09-06 Thread Eugene Kirpichov
Hi Amit,
Could you explain more about why you're saying the order of splits matters?
AFAIK the semantics of Read.Unbounded is "read from all of the splits in
parallel, checkpointing each of them independently", so their order
shouldn't matter.

On Tue, Sep 6, 2016 at 3:17 PM Amit Sela  wrote:

> UnboundedSources generate initial splits, which are basically splits of
> themselves - for example, if an UnboundedKafkaSource is set to read from
> topic T1 which is made of 2 partitions P1 and P2, it will (optimally) split
> into two UnboundedKafkaSource, one per partition.
> During the lifecycle of the "reader" bundles, CheckpointMarks are used to
> checkpoint "last-read" and so readers may restart/resume. I'm assuming this
> is how newly created partitions will automatically be added to readers.
>
> The problem is that it's all fine while there is only one topic (Kafka
> topic-partition pairs are ordered), but if reading from more than one topic
> this may break:
> T1,P1
> T1,P2
> T1,P3
> T2,P1
> The order is not maintained and T2,P1 is 4th now.
>
> If splits (UnboundedSources) had an identifier, this could be avoided, and
> checkpoints could be persisted accordingly.
> For example, an UnboundedKafkaSource could use the hash code of its
> assigned topic-partition pairs.
>
> I don't know how relevant this is to other Sources, but I guess it is as
> long as they may grow their partitions dynamically (though I might be
> completely wrong...) and I don't see much of a downside.
>
> Thoughts ?
>
> This is something that troubles me now while working on Read.Unbounded, and
> from a quick look I saw that the FlinkRunner expects "stable" splitting as
> well.. How does Dataflow handle this ?
>
> Thanks,
> Amit
>


Re: [PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn

2016-08-29 Thread Eugene Kirpichov
Hi Aljoscha,

The watermark reporting is done via
ProcessContinuation.futureOutputWatermark, at the granularity of returning
from individual processElement() calls - you return from the call and give
a watermark on your future output. We assume that updating watermark is
sufficient at a per-bundle level (or, if not, then that you can make
bundles small enough) because that's the same level at which state changes,
timers etc. are committed.
It can be implemented by setting a per-key watermark hold and updating it
when each call for this element returns. That's the way it is implemented
in my current prototype https://github.com/apache/incubator-beam/pull/896 (see
SplittableParDo.ProcessFn)
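
To illustrate what that looks like at the API surface (names taken from the
proposal and the prototype above; Record and pollSomeRecords() are invented
placeholders, and the real signatures may well have changed since):

// Sketch only: each return from processElement() reports a watermark for the
// element's *future* output, which the runner can back with a per-key hold.
ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
  for (Record r : pollSomeRecords(c.element())) {  // pollSomeRecords() is hypothetical
    if (!tracker.tryClaim(r.offset())) {
      // Stop here for now; promise that output produced after resumption will
      // have timestamps at or above this watermark.
      return resume().withFutureOutputWatermark(r.timestamp());
    }
    c.output(r);
  }
  return done(); // nothing more for this element; the hold can be released
}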

On Mon, Aug 29, 2016 at 2:55 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> I have another question about this: currently, unbounded sources have
> special logic for determining the watermark and the system periodically
> asks the sources for the current watermark. As I understood it, watermarks
> are only "generated" at the sources. How will this work when sources are
> implemented as a combination of DoFns and SplittableDoFns? Will
> SplittableDoFns be asked for a watermark, does this mean that watermarks
> can then be "generated" at any operation?
>
> Cheers,
> Aljoscha
>
> On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov <kirpic...@google.com.invalid
> >
> wrote:
>
> > Hi JB,
> >
> > Yes, I'm assuming you're referring to the "magic" part on the transform
> > expansion diagram. This is indeed runner-specific, and timers+state are
> > likely the simplest way to do this for an SDF that does unbounded amount
> of
> > work.
> >
> > On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré <j...@nanthrax.net>
> > wrote:
> >
> > > Anyway, from a runner perspective, we will have kind of API (part of
> the
> > > Runner API) to "orchestrate" the SDF as we discussed during the call,
> > > right ?
> > >
> > > Regards
> > > JB
> > >
> > > On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
> > > > Hi Aljoscha,
> > > > This is an excellent question! And the answer is, we don't need any
> new
> > > > concepts like "SDF executor" and can rely on the per-key state and
> > timers
> > > > machinery that already exists in all runners because it's necessary
> to
> > > > implement windowing/triggering properly.
> > > >
> > > > Note that this is already somewhat addressed in the previously posted
> > > State
> > > > and Timers proposal https://s.apache.org/beam-state , under "per-key
> > > > workflows".
> > > >
> > > > Think of it this way, using the Kafka example: we'll expand it into a
> > > > transform:
> > > >
> > > > (1) ParDo { topic -> (unique key, topic, partition, [0, inf))) for
> > > > partition in topic.listPartitions() }
> > > > (2) GroupByKey
> > > > (3) ParDo { key, topic, partition, R -> Kafka reader code in the
> > > > proposal/slides }
> > > >   - R is the OffsetRange restriction which in this case will be
> always
> > of
> > > > the form [startOffset, inf).
> > > >   - there'll be just 1 value per key, but we use GBK to just get
> access
> > > to
> > > > the per-key state/timers machinery. This may be runner-specific;
> maybe
> > > some
> > > > runners don't need a GBK to do that.
> > > >
> > > > Now suppose the topic has two partitions, P1 and P2, and they get
> > > assigned
> > > > unique keys K1, K2.
> > > > Then the input to (3) will be a collection of: (K1, topic, P1, [0,
> > inf)),
> > > > (K2, topic, P2, [0, inf)).
> > > > Suppose we have just 1 worker with just 1 thread. Now, how will this
> > > thread
> > > > be able to produce elements from both P1 and P2? here's how.
> > > >
> > > > The thread will process (K1, topic, P1, [0, inf)), checkpoint after a
> > > > certain time or after a certain number of elements are output (just
> > like
> > > > with the current UnboundedSource reading code) producing a residual
> > > > restriction R1' (basically a new start timestamp), put R1' into the
> > > per-key
> > > > state and set a timer T1 to resume.
> > > > Then it will process (K2, topic, P2, [0, inf)), do the same
> producing a
> > > > residual restriction R2' and setting a timer T2 to resume.
> > > 

Re: Remove legacy import-order?

2016-08-23 Thread Eugene Kirpichov
Two cents: While we're at it, we could consider enforcing formatting as
well (https://github.com/google/google-java-format). That's a bigger change
though, and I don't think it has checkstyle integration or anything like
that.

On Tue, Aug 23, 2016 at 4:54 PM Dan Halperin 
wrote:

> yeah I think that we would be SO MUCH better off if we worked with an
> out-of-the-box IDE. We don't even distribute an IntelliJ/Eclipse config
> file right now, and I'd like to not have to.
>
> But, ugh, it will mess up ongoing PRs. I guess committers could fix them in
> merge, or we could just make proposers rebase. (Since committers are most
> proposers, probably little harm in the latter).
>
> On Tue, Aug 23, 2016 at 4:11 PM, Jesse Anderson 
> wrote:
>
> > Please. That's the one that always trips me up.
> >
> > On Tue, Aug 23, 2016, 4:10 PM Ben Chambers  wrote:
> >
> > > When Beam was contributed it inherited an import order [1] that was
> > pretty
> > > arbitrary. We've added org.apache.beam [2], but continue to use this
> > > ordering.
> > >
> > > Both Eclipse and IntelliJ default to grouping imports into alphabetic
> > > order. I think it would simplify development if we switched our
> > checkstyle
> > > ordering to agree with these IDEs. This also removes special treatment
> > for
> > > specific packages.
> > >
> > > If people agree, I'll send out a PR that changes the checkstyle
> > > configuration and runs IntelliJ's sort-imports on the existing files.
> > >
> > > -- Ben
> > >
> > > [1]
> > > org.apache.beam,com.google,android,com,io,Jama,junit,net,
> > org,sun,java,javax
> > > [2] com.google,android,com,io,Jama,junit,net,org,sun,java,javax
> > >
> >
>


Re: [PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn

2016-08-21 Thread Eugene Kirpichov
Hi JB,

Yes, I'm assuming you're referring to the "magic" part on the transform
expansion diagram. This is indeed runner-specific, and timers+state are
likely the simplest way to do this for an SDF that does unbounded amount of
work.

On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Anyway, from a runner perspective, we will have kind of API (part of the
> Runner API) to "orchestrate" the SDF as we discussed during the call,
> right ?
>
> Regards
> JB
>
> On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
> > Hi Aljoscha,
> > This is an excellent question! And the answer is, we don't need any new
> > concepts like "SDF executor" and can rely on the per-key state and timers
> > machinery that already exists in all runners because it's necessary to
> > implement windowing/triggering properly.
> >
> > Note that this is already somewhat addressed in the previously posted
> State
> > and Timers proposal https://s.apache.org/beam-state , under "per-key
> > workflows".
> >
> > Think of it this way, using the Kafka example: we'll expand it into a
> > transform:
> >
> > (1) ParDo { topic -> (unique key, topic, partition, [0, inf))) for
> > partition in topic.listPartitions() }
> > (2) GroupByKey
> > (3) ParDo { key, topic, partition, R -> Kafka reader code in the
> > proposal/slides }
> >   - R is the OffsetRange restriction which in this case will be always of
> > the form [startOffset, inf).
> >   - there'll be just 1 value per key, but we use GBK to just get access
> to
> > the per-key state/timers machinery. This may be runner-specific; maybe
> some
> > runners don't need a GBK to do that.
> >
> > Now suppose the topic has two partitions, P1 and P2, and they get
> assigned
> > unique keys K1, K2.
> > Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)),
> > (K2, topic, P2, [0, inf)).
> > Suppose we have just 1 worker with just 1 thread. Now, how will this
> thread
> > be able to produce elements from both P1 and P2? here's how.
> >
> > The thread will process (K1, topic, P1, [0, inf)), checkpoint after a
> > certain time or after a certain number of elements are output (just like
> > with the current UnboundedSource reading code) producing a residual
> > restriction R1' (basically a new start timestamp), put R1' into the
> per-key
> > state and set a timer T1 to resume.
> > Then it will process (K2, topic, P2, [0, inf)), do the same producing a
> > residual restriction R2' and setting a timer T2 to resume.
> > Then timer T1 will fire in the context of the key K1. The thread will
> call
> > processElement again, this time supplying R1' as the restriction; the
> > process repeats and after a while it checkpoints and stores R1'' into
> state
> > of K1.
> > Then timer T2 will fire in the context of K2, run processElement for a
> > while, set a new timer and store R2'' into the state of K2.
> > Etc.
> > If partition 1 goes away, the processElement call will return "do not
> > resume", so a timer will not be set and instead the state associated with
> > K1 will be GC'd.
> >
> > So basically it's almost like cooperative thread scheduling: things run
> for
> > a while, until the runner tells them to checkpoint, then they set a timer
> > to resume themselves, and the runner fires the timers, and the process
> > repeats. And, again, this only requires things that runners can already
> do
> > - state and timers, but no new concept of SDF executor (and consequently
> no
> > necessity to choose/tune how many you need).
> >
> > Makes sense?
> >
> > On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> >> Hi,
> >> I have another question that I think wasn't addressed in the meeting. At
> >> least it wasn't mentioned in the notes.
> >>
> >> In the context of replacing sources by a combination of to SDFs, how do
> you
> >> determine how many "SDF executor" instances you need downstream? For the
> >> sake of argument assume that both SDFs are executed with parallelism 1
> (or
> >> one per worker). Now, if you have a file source that reads from a static
> >> set of files the first SDF would emit the filenames while the second SDF
> >> would receive the filenames and emit their contents. This works well and
> >> the downstream SDF can process one filename after the other. Now, think
> of
> >> something like a Kafka source. The first SD

Re: [PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn

2016-08-18 Thread Eugene Kirpichov
Hello everybody,

Just a reminder:

The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST, to join
the call go to
https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
I intend to go over the proposed design and then have a free-form
discussion.

Please have a skim through the proposal doc: https://s.apache.org/splittable-do-fn
I also made some slides that are basically a trimmed-down version of the
doc to use as a guide when conducting the meeting,
https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing
.

I will post notes from the meeting on this thread afterwards.

Thanks, looking forward.

On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <dhalp...@google.com.invalid>
wrote:

> This is pretty cool! I'll be there too. (unless the hangout gets too full
> -- if so, I'll drop out in favor of others who aren't lucky enough to get
> to talk to Eugene all the time.)
>
> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <psaltis.and...@gmail.com>
> wrote:
>
> > +1 I'll join
> >
> > On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <
> apban...@cisco.com
> > >
> > wrote:
> >
> > > + 1, me2
> > >
> > >
> > >
> > >
> > > >On 8/12/16, 9:27 AM, "Amit Sela" <amitsel...@gmail.com> wrote:
> > > wrote:
> > >
> > > >+1 as in I'll join ;-)
> > > >
> > > >On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov
> > <kirpic...@google.com.invalid
> > > >
> > > >wrote:
> > > >
> > > >> Sounds good, thanks!
> > > >> Then Friday Aug 19th it is, 8am-9am PST,
> > > >> https://staging.talkgadget.google.com/hangouts/_/google.
> > > com/splittabledofn
> > > >>
> > > >> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <
> > j...@nanthrax.net
> > > <javascript:;>>
> > > >> wrote:
> > > >>
> > > >> > Hi
> > > >> >
> > > >> > Unfortunately I will be in Ireland on August 15th. What about
> Friday
> > > >> 19th ?
> > > >> >
> > > >> > Regards
> > > >> > JB
> > > >> >
> > > >> >
> > > >> >
> > > >> > On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
> > > >> > <kirpic...@google.com.INVALID> wrote:
> > > >> > >Hi JB,
> > > >> > >
> > > >> > >Sounds great, does the suggested time over videoconference work
> for
> > > >> > >you?
> > > >> > >
> > > >> > >On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <
> > > j...@nanthrax.net>
> > > >> > >wrote:
> > > >> > >
> > > >> > >> Hi Eugene
> > > >> > >>
> > > >> > >> May we talk together next week ? I like the proposal. I would
> > just
> > > >> > >need
> > > >> > >> some details for my understanding.
> > > >> > >>
> > > >> > >> Thanks
> > > >> > >> Regards
> > > >> > >> JB
> > > >> > >>
> > > >> > >>
> > > >> > >>
> > > >> > >> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
> > > >> > >> <kirpic...@google.com.INVALID> wrote:
> > > >> > >> >Hi JB,
> > > >> > >> >
> > > >> > >> >What are your thoughts on this?
> > > >> > >> >
> > > >> > >> >I'm also thinking of having a virtual meeting to explain more
> > > about
> > > >> > >> >this
> > > >> > >> >proposal if necessary, since I understand it is a lot to
> digest.
> > > >> > >> >
> > > >> > >> >How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
> > > >> > >> >(link:
> > > >> > >> >
> > > >> > >>
> > > >> > >
> > > >> >
> > > >> https://staging.talkgadget.google.com/hangouts/_/google.
> > > com/splittabledofn
> > > >> > >> >-
> > > >> > >> >I confirmed that it can be joined 

Re: [PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn

2016-08-12 Thread Eugene Kirpichov
Correction: the video call join link should be:
https://hangouts.google.com/hangouts/_/google.com/splittabledofn

On Fri, Aug 12, 2016 at 10:14 AM Chawla,Sumit <sumitkcha...@gmail.com>
wrote:

> +1
>
> Regards
> Sumit Chawla
>
>
> On Fri, Aug 12, 2016 at 9:29 AM, Aparup Banerjee (apbanerj) <
> apban...@cisco.com> wrote:
>
> > + 1, me2
> >
> >
> >
> >
> > On 8/12/16, 9:27 AM, "Amit Sela" <amitsel...@gmail.com> wrote:
> >
> > >+1 as in I'll join ;-)
> > >
> > >On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov
> <kirpic...@google.com.invalid
> > >
> > >wrote:
> > >
> > >> Sounds good, thanks!
> > >> Then Friday Aug 19th it is, 8am-9am PST,
> > >> https://staging.talkgadget.google.com/hangouts/_/google.
> > com/splittabledofn
> > >>
> > >> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <
> j...@nanthrax.net>
> > >> wrote:
> > >>
> > >> > Hi
> > >> >
> > >> > Unfortunately I will be in Ireland on August 15th. What about Friday
> > >> 19th ?
> > >> >
> > >> > Regards
> > >> > JB
> > >> >
> > >> >
> > >> >
> > >> > On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
> > >> > <kirpic...@google.com.INVALID> wrote:
> > >> > >Hi JB,
> > >> > >
> > >> > >Sounds great, does the suggested time over videoconference work for
> > >> > >you?
> > >> > >
> > >> > >On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <
> > j...@nanthrax.net>
> > >> > >wrote:
> > >> > >
> > >> > >> Hi Eugene
> > >> > >>
> > >> > >> May we talk together next week ? I like the proposal. I would
> just
> > >> > >need
> > >> > >> some details for my understanding.
> > >> > >>
> > >> > >> Thanks
> > >> > >> Regards
> > >> > >> JB
> > >> > >>
> > >> > >>
> > >> > >>
> > >> > >> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
> > >> > >> <kirpic...@google.com.INVALID> wrote:
> > >> > >> >Hi JB,
> > >> > >> >
> > >> > >> >What are your thoughts on this?
> > >> > >> >
> > >> > >> >I'm also thinking of having a virtual meeting to explain more
> > about
> > >> > >> >this
> > >> > >> >proposal if necessary, since I understand it is a lot to digest.
> > >> > >> >
> > >> > >> >How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
> > >> > >> >(link:
> > >> > >> >
> > >> > >>
> > >> > >
> > >> >
> > >> https://staging.talkgadget.google.com/hangouts/_/google.
> > com/splittabledofn
> > >> > >> >-
> > >> > >> >I confirmed that it can be joined without being logged into a
> > Google
> > >> > >> >account)
> > >> > >> >
> > >> > >> >Who'd be interested in attending, and does this time/date work
> for
> > >> > >> >people?
> > >> > >> >
> > >> > >> >On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov
> > >> > ><kirpic...@google.com>
> > >> > >> >wrote:
> > >> > >> >
> > >> > >> >> Hi JB, thanks for reading and for your comments!
> > >> > >> >>
> > >> > >> >> It sounds like you are concerned about continued support for
> > >> > >existing
> > >> > >> >IO's
> > >> > >> >> people have developed, and about backward compatibility?
> > >> > >> >>
> > >> > >> >> We do not need to remove the Source API, and all existing
> > >> > >> >Source-based
> > >> > >> >> connectors will continue to work [though the document proposes
> > at
> > >> > >> >some
> > >> > >> >> point to make 

Re: [PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn

2016-08-11 Thread Eugene Kirpichov
Hi JB,

Sounds great, does the suggested time over videoconference work for you?

On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi Eugene
>
> May we talk together next week ? I like the proposal. I would just need
> some details for my understanding.
>
> Thanks
> Regards
> JB
>
>
>
> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
> <kirpic...@google.com.INVALID> wrote:
> >Hi JB,
> >
> >What are your thoughts on this?
> >
> >I'm also thinking of having a virtual meeting to explain more about
> >this
> >proposal if necessary, since I understand it is a lot to digest.
> >
> >How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
> >(link:
> >
> https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
> >-
> >I confirmed that it can be joined without being logged into a Google
> >account)
> >
> >Who'd be interested in attending, and does this time/date work for
> >people?
> >
> >On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com>
> >wrote:
> >
> >> Hi JB, thanks for reading and for your comments!
> >>
> >> It sounds like you are concerned about continued support for existing
> >IO's
> >> people have developed, and about backward compatibility?
> >>
> >> We do not need to remove the Source API, and all existing
> >Source-based
> >> connectors will continue to work [though the document proposes at
> >some
> >> point to make Read.from(Source) to translate to a wrapper SDF under
> >the
> >> hood, to exercise the feature more and to make sure that it is
> >strictly
> >> more powerful - but this is an optional implementation detail].
> >>
> >> Perhaps the document phrases this too strongly - "replacing the
> >Source
> >> API": a better phrasing would be "introducing a new API so powerful
> >and
> >> easy-to-use that hopefully people will choose it over the Source API
> >all
> >> the time, even though they don't have to" :) And we can discuss
> >whether or
> >> not to actually deprecate/remove the Source API at some point down
> >the
> >> road, once it becomes clear whether this is the case or not.
> >>
> >> To give more context: this proposal came out of discussions within
> >the SDK
> >> team over the past ~1.5 years, before the Beam project existed, on
> >how to
> >> make major improvements to the Source API; perhaps it will clarify
> >things
> >> if I give a history of the ideas discussed:
> >> - The first idea was to introduce a Read.from(PCollection)
> >> transform while keeping the Source API intact - this, given
> >appropriate
> >> implementation, would solve most of the scalability and composability
> >> issues of IO's. Then most connectors would look like : ParDo<A,
> >Source>
> >> + Read.from().
> >> - Then we figured that the Source class is an unnecessary
> >abstraction, as
> >> it simply holds data. What if we only had a Reader<S, B> class where
> >S is
> >> the source type and B the output type? Then connectors would be
> >something
> >> like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
> >> - Then somebody remarked that some of the features of Source are
> >useful to
> >> ParDo's as well: e.g. ability to report progress when processing a
> >very
> >> heavy element, or ability to produce very large output in parallel.
> >> - The two previous bullets were already hinting that the Read.using()
> >> primitive might not be so special: it just takes S and produces B:
> >isn't
> >> that what a ParDo does, plus some source magic, minus the convenience
> >of
> >> c.output() vs. the start/advance() state machine?
> >> - At this point it became clear that we should explore unifying
> >sources
> >> and ParDo's, in particular: can we bring the magic of sources to
> >ParDo's
> >> but without the limitations and coding inconveniences? And this is
> >how
> >> SplittableDoFn was born: bringing source magic to a DoFn by providing
> >a
> >> RangeTracker.
> >> - Once the idea of "splittable DoFn's" was born, it became clear that
> >it
> >> is strictly more general than sources; at least, in the respect that
> >> sources have to produce output, while DoFn's don't: an SDF may very
> >well
> >> produce no out

Re: [PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn

2016-08-11 Thread Eugene Kirpichov
Hi JB,

What are your thoughts on this?

I'm also thinking of having a virtual meeting to explain more about this
proposal if necessary, since I understand it is a lot to digest.

How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
(link:
https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn -
I confirmed that it can be joined without being logged into a Google
account)

Who'd be interested in attending, and does this time/date work for people?

On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com>
wrote:

> Hi JB, thanks for reading and for your comments!
>
> It sounds like you are concerned about continued support for existing IO's
> people have developed, and about backward compatibility?
>
> We do not need to remove the Source API, and all existing Source-based
> connectors will continue to work [though the document proposes at some
> point to make Read.from(Source) to translate to a wrapper SDF under the
> hood, to exercise the feature more and to make sure that it is strictly
> more powerful - but this is an optional implementation detail].
>
> Perhaps the document phrases this too strongly - "replacing the Source
> API": a better phrasing would be "introducing a new API so powerful and
> easy-to-use that hopefully people will choose it over the Source API all
> the time, even though they don't have to" :) And we can discuss whether or
> not to actually deprecate/remove the Source API at some point down the
> road, once it becomes clear whether this is the case or not.
>
> To give more context: this proposal came out of discussions within the SDK
> team over the past ~1.5 years, before the Beam project existed, on how to
> make major improvements to the Source API; perhaps it will clarify things
> if I give a history of the ideas discussed:
> - The first idea was to introduce a Read.from(PCollection)
> transform while keeping the Source API intact - this, given appropriate
> implementation, would solve most of the scalability and composability
> issues of IO's. Then most connectors would look like : ParDo<A, Source>
> + Read.from().
> - Then we figured that the Source class is an unnecessary abstraction, as
> it simply holds data. What if we only had a Reader<S, B> class where S is
> the source type and B the output type? Then connectors would be something
> like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
> - Then somebody remarked that some of the features of Source are useful to
> ParDo's as well: e.g. ability to report progress when processing a very
> heavy element, or ability to produce very large output in parallel.
> - The two previous bullets were already hinting that the Read.using()
> primitive might not be so special: it just takes S and produces B: isn't
> that what a ParDo does, plus some source magic, minus the convenience of
> c.output() vs. the start/advance() state machine?
> - At this point it became clear that we should explore unifying sources
> and ParDo's, in particular: can we bring the magic of sources to ParDo's
> but without the limitations and coding inconveniences? And this is how
> SplittableDoFn was born: bringing source magic to a DoFn by providing a
> RangeTracker.
> - Once the idea of "splittable DoFn's" was born, it became clear that it
> is strictly more general than sources; at least, in the respect that
> sources have to produce output, while DoFn's don't: an SDF may very well
> produce no output at all, and simply perform a side effect in a
> parallel/resumable way.
> - Then there were countless hours of discussions on unifying the
> bounded/unbounded cases, on the particulars of RangeTracker APIs
> reconciling parallelization and checkpointing, what the relation between
> SDF and DF should be, etc. They culminated in the current proposal. The
> proposal comes at a time when a couple of key ingredients are (almost)
> ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers
> proposal to enable unbounded work per element.
>
> To put it shortly:
> - Yes, we will support existing Source connectors, and will support
> writing new ones, possibly forever. There is no interference with current
> users of Source.
> - The new API is an attempt to improve the Source API, taken to its
> logical limit where it turns out that users' goals can be accomplished
> easier and more generically entirely within ParDo's.
>
> Let me know what you think, and thanks again!
>
> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
>> Hi Eugene,
>>
>> Just a question: why is it in DoFn and note an improvement of Source ?
>>
>> If I understand correctly, it means that we will have to 

[PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn

2016-08-04 Thread Eugene Kirpichov
Hello Beam community,

We (myself, Daniel Mills and Robert Bradshaw) would like to propose
"Splittable DoFn" - a major generalization of DoFn, which allows processing
of a single element to be non-monolithic, i.e. checkpointable and
parallelizable, as well as doing an unbounded amount of work per element.

This allows effectively replacing the current Bounded/UnboundedSource APIs
with DoFn's that are much easier to code, more scalable and composable with
the rest of the Beam programming model, and enables many use cases that
were previously difficult or impossible, as well as some non-obvious new
use cases.

This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
meetings, and now the whole thing is written up in a document:

https://s.apache.org/splittable-do-fn

Here are some things that become possible with Splittable DoFn:
- Efficiently read a filepattern matching millions of files
- Read a collection of files that are produced by an earlier step in the
pipeline (e.g. easily implement a connector to a storage system that can
export itself to files)
- Implement a Kafka reader by composing a "list partitions" DoFn with a
DoFn that simply polls a consumer and outputs new records in a while() loop
- Implement a log tailer by composing a DoFn that incrementally returns new
files in a directory and a DoFn that tails a file
- Implement a parallel "count friends in common" algorithm (matrix
squaring) with good work balancing

Here is the meaningful part of a hypothetical Kafka reader written against
this API:

ProcessContinuation processElement(
    ProcessContext context, OffsetRangeTracker tracker) {
  try (KafkaConsumer consumer =
      Kafka.subscribe(context.element().topic,
                      context.element().partition)) {
    consumer.seek(tracker.start());
    while (true) {
      ConsumerRecords records = consumer.poll(100ms);
      if (records == null) return done();
      for (ConsumerRecord record : records) {
        if (!tracker.tryClaim(record.offset())) {
          return resume().withFutureOutputWatermark(record.timestamp());
        }
        context.output(record);
      }
    }
  }
}

The document describes in detail the motivations behind this feature, the
basic idea and API, open questions, and outlines an incremental delivery
plan.

The proposed API builds on the reflection-based new DoFn [new-do-fn] and is
loosely related to "State and Timers for DoFn" [beam-state].

Please take a look and comment!

Thanks.

[BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
[new-do-fn] https://s.apache.org/a-new-do-fn
[beam-state] https://s.apache.org/beam-state


Re: Suggestion for Writing Sink Implementation

2016-07-27 Thread Eugene Kirpichov
Hi Sumit,

All reusable parts of a pipeline, including connectors to storage systems,
should be packaged as PTransform's.

Sink is an advanced API that you can use under the hood to implement the
transform, if this particular connector benefits from this API - but you
don't have to, and many connectors indeed don't need it, and are simpler to
implement just as wrappers around a couple of ParDo's writing the data.

Even if the connector is implemented using a Sink, packaging the connector
as a PTransform is important because it's easier to apply in a pipeline and
because it's more future-proof (the author of the connector may later
change it to use something else rather than Sink under the hood without
breaking existing users).

Sink is, currently, useful in the following case:
- You're writing a bounded amount of data (we do not yet have an unbounded
Sink analogue)
- The location you're writing to is known at pipeline construction time,
and does not depend on the data itself (support for "data-dependent" sinks
is on the radar https://issues.apache.org/jira/browse/BEAM-92)
- The storage system you're writing to has a distinct "initialization" and
"finalization" step, allowing the write operation to appear atomic (either
all data is written or none). This mostly applies to files, where writing
is done by first writing to a temporary directory and then renaming all
files to their final location, but there can be other cases too; a sketch
of this pattern follows.
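
To make the last point concrete, here is a schematic of the "write to a
temporary location, then finalize by renaming" shape in plain java.nio (not
the Sink API itself; finalDir, numShards and the shard contents are
placeholders):

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

// Schematic only: the phases that make a file-based write appear atomic.
// The Sink API formalizes this shape (initialize, write bundles, finalize).
class AtomicWriteSketch {
  static void write(Path finalDir, int numShards) throws Exception {
    // "Initialization": create a scratch location for this write attempt.
    Path tmpDir = Files.createTempDirectory("write-attempt-");

    // Per-bundle writes: each shard writes independently to the scratch area.
    for (int shard = 0; shard < numShards; shard++) {
      Files.write(tmpDir.resolve("shard-" + shard),
          Arrays.asList("placeholder contents of shard " + shard),
          StandardCharsets.UTF_8);
    }

    // "Finalization": move outputs into place only after every shard has been
    // written, so a failed attempt leaves no partial output in finalDir.
    for (int shard = 0; shard < numShards; shard++) {
      Files.move(tmpDir.resolve("shard-" + shard),
          finalDir.resolve("output-" + shard),
          StandardCopyOption.ATOMIC_MOVE);
    }
  }
}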

Here's an example GCP connector using the Sink API under the hood:
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
Most other non-file-based connectors (KafkaIO, DatastoreIO, BigtableIO,
etc.) indeed don't use it.

I'm not familiar with the Flink API; however, I'm a bit confused by your
last paragraph: the Beam programming model is intentionally runner-agnostic,
so that you can run exactly the same code on different runners.

On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit  wrote:

> Hi
>
> Please suggest the best way to write a Sink in Beam. I see that there is a
> Sink abstract class which is in an experimental state. What is the expected
> outcome of this one? Is the API frozen, or could it still change? Most of
> the existing Sink implementations, like KafkaIO.Write, don't use this
> interface and instead extend PTransform<PCollection<KV<K, V>>, PDone>.
> Would these be changed to extend Sink<>?
>
>
> My immediate requirement is to run this Sink on the FlinkRunner, which
> mandates that my implementation must also implement SinkFunction<>. In that
> case, none of the Sink<> methods get called anyway.
>
> Regards
> Sumit Chawla
>


Re: [PROPOSAL] MultiLineIO

2016-03-14 Thread Eugene Kirpichov
Thanks Peter! Please also make sure to use SourceTestUtils to verify that
your FileBasedSource is well-behaved w.r.t. dynamic work rebalancing
(especially the various assertSplitAtFraction methods). For examples, see
XmlSourceTest
<https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSourceTest.java>
.
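
For instance, an exhaustive dynamic-rebalancing check might look roughly like
this (a sketch: MultiLineSource and its constructor arguments are placeholders
for your actual source class, and package names assume the Beam SDK layout):

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.junit.Test;

public class MultiLineSourceTest {
  @Test
  public void testSplitAtFractionExhaustive() throws Exception {
    PipelineOptions options = PipelineOptionsFactory.create();
    // Placeholder: point the source at a small test file and a delimiter.
    MultiLineSource source = new MultiLineSource("/tmp/multiline-test.txt", "|");
    // Checks every combination of "read N records, then split at fraction F"
    // against reading the source without splitting.
    SourceTestUtils.assertSplitAtFractionExhaustive(source, options);
  }
}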

On Mon, Mar 14, 2016 at 12:10 PM Giesin, Peter <peter.gie...@fisglobal.com>
wrote:

> The MultiLineIO is a BoundedSource and an extension of FileBasedSource.
> Where the FileBasedSource reads a single line at a time, the MultiLineIO
> allows the user to define an arbitrary “message” delimiter. It then reads
> through the file, removing newlines, until the separator is read, finally
> returning the character sequence that has been built.
>
>
>
> I believe it is already built using the new style, but I will compare it to
> BigtableIO to confirm that.
>
> Peter
>
> On 3/14/16, 1:50 PM, "Jean-Baptiste Onofré" <j...@nanthrax.net> wrote:
>
> >I second Eugene here.
> >
> >In the past, I developed some IOs using the "old style" (as done in
> >PubSubIO). I'm now refactoring them to use the "new style".
> >
> >Regards
> >JB
> >
> >On 03/14/2016 06:47 PM, Eugene Kirpichov wrote:
> >> Hi Peter,
> >> Looking forward to your PR. Please note that source classes are
> relatively
> >> tricky to develop, so would you mind briefly explaining what your source
> >> will do here over email, so that we hash out some possible issues early
> >> rather than in PR comments?
> >> Also note that we now recommend packaging IO connectors as PTransforms,
> >> making the PTransform class itself be a builder - while the Source/Sink
> >> classes should be kept package-private (rather than exposed to the
> user).
> >> For an example of a connector packaged in this style, see BigtableIO (
> >> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
> >> ).
> >> The advantage is that this style allows you to restructure the
> connector or
> >> add additional transforms into its implementation if necessary, without
> >> changing the call sites. It might seem less important in the case of
> >> a simple connector like reading lines from a file, but it will become
> >> much more
> >> important with things like SplittableDoFn
> >> <https://issues.apache.org/jira/browse/BEAM-65>.
> >>
> >> On Mon, Mar 14, 2016 at 10:29 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> >> wrote:
> >>
> >>> Hi Peter,
> >>>
> >>> awesome !
> >>>
> >>> Yes, you can create the PR using the github mirror.
> >>>
> >>> Does your MultiLineIO use Bounded/Unbounded "new" classes?
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 03/14/2016 06:23 PM, Giesin, Peter wrote:
> >>>> Hi all!
> >>>>
> >>>> I am looking to get involved in the project. I have a MultiLineIO
> >>> file-based source that I think would be useful. I know the project is
> just
> >>> spinning up but can I simply clone the repo and create a PR for the
> new IO?
> >>> Also looked over JIRA and there are some tickets I can help out with.
> >>>>
> >>>> Best regards,
> >>>> Peter Giesin
> >>>> peter.gie...@fisglobal.com
> >>>>
> >>>>

Re: [PROPOSAL] MultiLineIO

2016-03-14 Thread Eugene Kirpichov
Hi Peter,
Looking forward to your PR. Please note that source classes are relatively
tricky to develop, so would you mind briefly explaining what your source
will do here over email, so that we hash out some possible issues early
rather than in PR comments?
Also note that we now recommend packaging IO connectors as PTransforms,
making the PTransform class itself be a builder - while the Source/Sink
classes should be kept package-private (rather than exposed to the user).
For an example of a connector packaged in this style, see BigtableIO (
https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
).
The advantage is that this style allows you to restructure the connector or
add additional transforms into its implementation if necessary, without
changing the call sites. It might seem less important in the case of a
simple connector like reading lines from a file, but it will become much
more important with things like SplittableDoFn
<https://issues.apache.org/jira/browse/BEAM-65>.
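
Concretely, the packaging could look something like this (a sketch with
hypothetical names; MultiLineSource is the package-private FileBasedSource
subclass, not shown, and the exact override name varies between SDK
versions):

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

// Users only ever see MultiLineIO.read() and its builder methods; the source
// class stays package-private and can change without breaking callers.
public class MultiLineIO {

  public static Read read() {
    return new Read(null, "\n\n");
  }

  public static class Read extends PTransform<PBegin, PCollection<String>> {
    private final String filepattern;
    private final String delimiter;

    private Read(String filepattern, String delimiter) {
      this.filepattern = filepattern;
      this.delimiter = delimiter;
    }

    public Read from(String filepattern) {
      return new Read(filepattern, delimiter);
    }

    public Read withDelimiter(String delimiter) {
      return new Read(filepattern, delimiter);
    }

    @Override // named apply() in older SDK versions
    public PCollection<String> expand(PBegin input) {
      return input.apply(org.apache.beam.sdk.io.Read.from(
          new MultiLineSource(filepattern, delimiter)));
    }
  }
}

A user's pipeline would then read something like
p.apply(MultiLineIO.read().from("/path/to/input-*.txt").withDelimiter("|")).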

On Mon, Mar 14, 2016 at 10:29 AM Jean-Baptiste Onofré 
wrote:

> Hi Peter,
>
> awesome !
>
> Yes, you can create the PR using the github mirror.
>
> Does your MultiLineIO use Bounded/Unbounded "new" classes?
>
> Regards
> JB
>
> On 03/14/2016 06:23 PM, Giesin, Peter wrote:
> > Hi all!
> >
> > I am looking to get involved in the project. I have a MultiLineIO
> file-based source that I think would be useful. I know the project is just
> spinning up but can I simply clone the repo and create a PR for the new IO?
> Also looked over JIRA and there are some tickets I can help out with.
> >
> > Best regards,
> > Peter Giesin
> > peter.gie...@fisglobal.com
> >
> >
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>