Re: [3/4] incubator-beam git commit: Remove KeyedResourcePool

2016-10-13 Thread Jean-Baptiste Onofré

Got your message, so it was a mistake and it has already been fixed.

No worries. Thanks,
Regards
JB

On 10/14/2016 06:35 AM, Jean-Baptiste Onofré wrote:

I saw a push of an eclipse branch on the Beam git repo.

Maybe I missed it, but I didn't see any discussion about such a branch on the
dev mailing list.

Regards
JB

On 10/13/2016 06:52 PM, Daniel Kulp wrote:


I just submitted a pull request that fixes the code as well as
cherry-picks the yaml change from the last branch.

Dan




On Oct 13, 2016, at 10:48 AM, Jean-Baptiste Onofré 
wrote:

Indeed, the .travis.yml has not been merged. I'm going to fix that.

Sorry about that.

Regards
JB

On 10/13/2016 04:37 PM, Daniel Kulp wrote:


This is in m2e. That said, it looks like the travis.yml file
wasn’t merged from my “eclipse” branch, so Travis wasn’t actually
running against the eclipse compiler. That would have caught
this. JB and I will investigate how that got lost in the merge to
master.

A "mvn -Peclipse-jdt clean install” in direct-java would show the
same error.


Dan




On Oct 13, 2016, at 10:05 AM, Jean-Baptiste Onofré
 wrote:

Hi Dan,

You mean building directly in Eclipse, I guess using m2e?

Regards
JB

On 10/13/2016 03:59 PM, Daniel Kulp wrote:


Just an FYI: this commit has caused things to not build in
Eclipse, but I’m not exactly sure why. The errors are in places
where methods with the exact same signature just moved into an
internal class, so I’m not yet sure why it’s causing an issue.

Description: Bound mismatch: The type Read.Bounded is not a valid
substitute for the bounded parameter of the type AppliedPTransform
Resource: BoundedReadEvaluatorFactory.java
Path: /beam-runners-direct-java/src/main/java/org/apache/beam/runners/direct
Location: line 134
Type: Java Problem


Dan




On 2016-10-06 18:31 (-0400), lc...@apache.org wrote:

Remove KeyedResourcePool

This interface is no longer used. Instead, the runner ensures that
bundles will be provided containing the appropriate input to the
TestStreamEvaluatorFactory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit:
http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41fb16f0

Tree:
http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41fb16f0
Diff:
http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41fb16f0

Branch: refs/heads/master
Commit: 41fb16f014a79d2b9c149c5b369db12b61c4c774
Parents: 7306e16
Author: Thomas Groh 
Authored: Wed Oct 5 13:12:48 2016 -0700
Committer: Luke Cwik 
Committed: Thu Oct 6 15:14:38 2016 -0700

--

.../direct/BoundedReadEvaluatorFactory.java |  40 +++--
.../beam/runners/direct/DirectRunner.java   |   2 +
.../beam/runners/direct/EmptyInputProvider.java |  49 ++
.../direct/ExecutorServiceParallelExecutor.java |  27 ++-
.../runners/direct/FlattenEvaluatorFactory.java |  18 +-
.../beam/runners/direct/KeyedResourcePool.java  |  47 --
.../runners/direct/LockedKeyedResourcePool.java |  95 ---
.../beam/runners/direct/RootInputProvider.java  |  41 +
.../runners/direct/RootProviderRegistry.java|  65 
.../direct/RootTransformEvaluatorFactory.java   |  42 -
.../direct/TestStreamEvaluatorFactory.java  |  39 +++--
.../direct/TransformEvaluatorRegistry.java  |  17 +-
.../direct/UnboundedReadEvaluatorFactory.java   |  56 ---
.../direct/BoundedReadEvaluatorFactoryTest.java |   3 +-
.../direct/FlattenEvaluatorFactoryTest.java |   3 +-
.../direct/LockedKeyedResourcePoolTest.java | 163 ---
.../direct/TestStreamEvaluatorFactoryTest.java  |   3 +-
.../UnboundedReadEvaluatorFactoryTest.java  |   8 +-
18 files changed, 269 insertions(+), 449 deletions(-)
--



http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java

--

diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java

index 4936ad9..326a535 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java

+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java

@@ -39,28 +39,13 @@ import org.apache.beam.sdk.values.PCollection;
* A {@link TransformEvaluatorFactory} that produces {@link
TransformEvaluator TransformEvaluators}
* for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
*/
-final class BoundedReadEvaluatorFactory implements
RootTransformEvaluatorFactory {
+final class BoundedReadEvaluatorFactory implements
TransformEvaluatorFactory {

Re: Jenkins build became unstable: beam_PostCommit_MavenVerify #1525

2016-10-13 Thread Dan Halperin
Filed https://issues.apache.org/jira/browse/BEAM-747

On Thu, Oct 13, 2016 at 5:33 PM, Apache Jenkins Server <
jenk...@builds.apache.org> wrote:

> See 
>
>


Re: Specifying type arguments for generic PTransform builders

2016-10-13 Thread Eugene Kirpichov
I think the choice between #1 and #3 is a red herring - the cases where #3
is a better choice than #1 are few and far between, and probably not at all
controversial (e.g. ParDo). So I suggest we drop this part of the
discussion.

Looks like the main contenders for the complex case are #1 (Foo.<T>blah())
vs. #4 (Foo.Unbound and Foo.Bound).

Dan, can you clarify again what you mean by this:
"a utility function that gives you a database reader ready-to-go ... but
independent of the type you want the result to end up as. You can't do
that if you must bind the type first."

I think this is quite doable with #1:

class CompanyDefaults {
  public static <T> DatabaseIO.Read<T> defaultDatabaseIO() {
    return DatabaseIO.<T>create().withSettings(blah).withCredentials(blah);
  }
}

DatabaseIO.Read<String> source =
CompanyDefaults.<String>defaultDatabaseIO().withQuery(blah);

All in all, it seems to me that #1 (combined with potentially encapsulating
parts of the configuration into separate objects, such as
JdbcConnectionParameters in JdbcIO) is the only option that can do
everything fairly well. Its only downside is having to specify the type,
and it is very easy to implement when you're implementing your own
transform - which, I agree with Kenn, matters a lot too.
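Eugene's pattern #1 can be sketched end-to-end. This is a hedged illustration only: Foo and Bar are the hypothetical names used in this thread, not real Beam classes, and the immutable-copy style is an assumption in the spirit of GroupByKey.

```java
// Hypothetical sketch of pattern #1: the caller binds the type variable
// once, at the static create() method; every with* method preserves it.
// Foo and Bar are illustrative stand-ins, not Beam classes.
interface Bar<T> {}

final class Foo<T> {
    private final Bar<T> bar;
    private final int blah;

    private Foo(Bar<T> bar, int blah) {
        this.bar = bar;
        this.blah = blah;
    }

    // The single place where the caller specifies T.
    public static <T> Foo<T> create() {
        return new Foo<>(null, 0);
    }

    public Foo<T> withBlah(int blah) {
        return new Foo<>(this.bar, blah);
    }

    public Foo<T> withBar(Bar<T> bar) {
        return new Foo<>(bar, this.blah);
    }

    public int getBlah() { return blah; }
    public Bar<T> getBar() { return bar; }
}
```

Call sites then read Foo.<String>create().withBlah(42).withBar(...), paying the one-time type witness that is #1's only downside.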

I think that coming up with an easy-to-implement, universally applicable
pattern matters a lot also because the Beam ecosystem is open, and the set
of connectors/transforms available to users will not always be as carefully
curated and reviewed as it is currently - the argument "the implementation
complexity doesn't matter because the user doesn't see it" will not apply.
So, ultimately, "are there a lot of good-quality connectors available to
Beam users" will be equivalent to "is it easy to develop a good-quality
connector". And the consistency between APIs provided by different
connectors will matter for the user experience, too.

On Thu, Oct 13, 2016 at 7:09 PM Kenneth Knowles 
wrote:

> On Thu, Oct 13, 2016 at 4:55 PM Dan Halperin 
> wrote:
> > These
> > suggestions are motivated by making things easier on transform writers,
> but
> > IMO we need to be optimizing for transform users.
>
> To be fair to Eugene, he was actually analyzing real code patterns that
> exist in Beam today, not suggesting new ones.
>
> Along those lines, your addition of the BigTableIO pattern is well-taken
> and my own analysis of that code is #5: "when you don't have a type
> variable to bind, leave every field blank and validate later. Also, have an
> XYZOptions object". I believe in the presence of type parameters this
> reduces to #4 Bound/Unbound classes but it is more palatable in the
> no-type-variable case. It is also a good choice when varying subsets of
> parameters might be optional - the Window transform matches this pattern
> for good reason.
>
> The other major drawback of #3 is the inability to provide generic
> > configuration. For example, a utility function that gives you a database
> > reader ready-to-go with all the default credentials and options you need
> --
> > but independent of the type you want the result to end up as. You can't
> do
> > that if you must bind the type first.
> >
>
> This is a compelling use case. It is valuable for configuration to be a
> first-class object that can be passed around. BigTableOptions is a good
> example. It isn't in contradiction with #3, but actually fits very nicely.
>
> By definition for this default configuration to be first-class it has to be
> more than an invalid intermediate state of a PTransform's builder methods.
> Concretely, it would be BigTableIO.defaultOptions(), which would make
> manifest the inaccessible default options that could be implied by
> BigTableIO.read(). There can sometimes be a pretty fine line between a
> builder and an options object, to be sure. You might distinguish it by
> whether you would conceivably use the object elsewhere - and for
> BigTableOptions the answer is certainly "yes" since it actually is an
> external class. In the extreme, every method takes one giant POJO and that
> sucks.
>
>
> > I think that in general all of these patterns are significantly worse in
> > the long run than the existing standards, e.g., for BigtableIO.
>
>
> Note that BigTableIO.read() is actually not "ready-to-go" but has nulls and
> empty strings that cause crashes if they are not overridden. It is just a
> builder without the concluding "build()" method (for the record: I find
> concluding "build()" methods pointless, too :-)
>
> One of the better examples of the pattern of "ready-to-go" builders -
> though not a transform - is WindowingStrategy (props to Ben), where there
> are intelligent defaults and you can override them, and it tracks whether
> or not each field is a default or a user-specified value. To start it off
> you have to either request "globalDefault()" or "of(WindowFn)", in the
> spirit of #3.
>
> Kenn
>
> On Fri, Oct 7, 2016 at 4:48 PM, Eugene Kirpichov <
> > 

Re: [3/4] incubator-beam git commit: Remove KeyedResourcePool

2016-10-13 Thread Jean-Baptiste Onofré

I saw a push of an eclipse branch on the Beam git repo.

Maybe I missed it, but I didn't see any discussion about such a branch on the
dev mailing list.


Regards
JB

On 10/13/2016 06:52 PM, Daniel Kulp wrote:


I just submitted a pull request that fixes the code as well as cherry-picks the 
yaml change from the last branch.

Dan




On Oct 13, 2016, at 10:48 AM, Jean-Baptiste Onofré  wrote:

Indeed, the .travis.yml has not been merged. I'm going to fix that.

Sorry about that.

Regards
JB

On 10/13/2016 04:37 PM, Daniel Kulp wrote:


This is in m2e. That said, it looks like the travis.yml file wasn’t merged
from my “eclipse” branch, so Travis wasn’t actually running against the
eclipse compiler. That would have caught this. JB and I will investigate how
that got lost in the merge to master.

A "mvn -Peclipse-jdt clean install” in direct-java would show the same error.


Dan




On Oct 13, 2016, at 10:05 AM, Jean-Baptiste Onofré  wrote:

Hi Dan,

You mean building directly in Eclipse, I guess using m2e?

Regards
JB

On 10/13/2016 03:59 PM, Daniel Kulp wrote:


Just an FYI: this commit has caused things to not build in Eclipse, but I’m
not exactly sure why. The errors are in places where methods with the exact
same signature just moved into an internal class, so I’m not yet sure why
it’s causing an issue.

Description: Bound mismatch: The type Read.Bounded is not a valid substitute
for the bounded parameter of the type AppliedPTransform
Resource: BoundedReadEvaluatorFactory.java
Path: /beam-runners-direct-java/src/main/java/org/apache/beam/runners/direct
Location: line 134
Type: Java Problem


Dan




On 2016-10-06 18:31 (-0400), lc...@apache.org wrote:

Remove KeyedResourcePool

This interface is no longer used. Instead, the runner ensures that
bundles will be provided containing the appropriate input to the
TestStreamEvaluatorFactory.



Re: Specifying type arguments for generic PTransform builders

2016-10-13 Thread Kenneth Knowles
On Thu, Oct 13, 2016 at 4:55 PM Dan Halperin 
wrote:
> These
> suggestions are motivated by making things easier on transform writers,
but
> IMO we need to be optimizing for transform users.

To be fair to Eugene, he was actually analyzing real code patterns that
exist in Beam today, not suggesting new ones.

Along those lines, your addition of the BigTableIO pattern is well-taken
and my own analysis of that code is #5: "when you don't have a type
variable to bind, leave every field blank and validate later. Also, have an
XYZOptions object". I believe in the presence of type parameters this
reduces to #4 Bound/Unbound classes but it is more palatable in the
no-type-variable case. It is also a good choice when varying subsets of
parameters might be optional - the Window transform matches this pattern
for good reason.
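The #5 pattern described above might look like the following. This is a minimal sketch under assumptions: HypotheticalRead and its field names are invented for illustration, not the actual BigTableIO API, and "validate later" is modeled as an explicit validate() call.

```java
// Sketch of pattern #5: a builder with no type variable that leaves
// every field blank and validates later (e.g. when the transform is
// applied), rather than at each with* call. All names are illustrative.
final class HypotheticalRead {
    private String table;
    private String project;

    private HypotheticalRead() {}

    public static HypotheticalRead read() {
        return new HypotheticalRead();
    }

    public HypotheticalRead withTable(String table) {
        this.table = table;
        return this;
    }

    public HypotheticalRead withProject(String project) {
        this.project = project;
        return this;
    }

    // Deferred validation: only invoked at pipeline-construction time.
    public void validate() {
        if (table == null || table.isEmpty()) {
            throw new IllegalStateException("withTable(...) is required");
        }
        if (project == null || project.isEmpty()) {
            throw new IllegalStateException("withProject(...) is required");
        }
    }
}
```

The cost, as noted below for BigTableIO.read(), is that an incompletely configured instance only fails when validation finally runs.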

The other major drawback of #3 is the inability to provide generic
> configuration. For example, a utility function that gives you a database
> reader ready-to-go with all the default credentials and options you need --
> but independent of the type you want the result to end up as. You can't do
> that if you must bind the type first.
>

This is a compelling use case. It is valuable for configuration to be a
first-class object that can be passed around. BigTableOptions is a good
example. It isn't in contradiction with #3, but actually fits very nicely.

By definition for this default configuration to be first-class it has to be
more than an invalid intermediate state of a PTransform's builder methods.
Concretely, it would be BigTableIO.defaultOptions(), which would make
manifest the inaccessible default options that could be implied by
BigTableIO.read(). There can sometimes be a pretty fine line between a
builder and an options object, to be sure. You might distinguish it by
whether you would conceivably use the object elsewhere - and for
BigTableOptions the answer is certainly "yes" since it actually is an
external class. In the extreme, every method takes one giant POJO and that
sucks.
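A minimal sketch of configuration as a first-class object, in the spirit of the BigTableOptions example above. ConnectionOptions and DatabaseRead are invented names, not Beam or Bigtable APIs; the point is only that the options object is independent of the element type.

```java
// Sketch: a reusable, type-independent configuration object that can be
// passed around and later combined with a typed reader. Hypothetical
// classes for illustration only.
final class ConnectionOptions {
    final String url;
    final String user;

    ConnectionOptions(String url, String user) {
        this.url = url;
        this.user = user;
    }
}

final class DatabaseRead<T> {
    private final ConnectionOptions options;
    private final String query;

    private DatabaseRead(ConnectionOptions options, String query) {
        this.options = options;
        this.query = query;
    }

    // The shared options object is supplied independently of T.
    public static <T> DatabaseRead<T> withOptions(ConnectionOptions options) {
        return new DatabaseRead<>(options, null);
    }

    public DatabaseRead<T> withQuery(String query) {
        return new DatabaseRead<>(options, query);
    }

    public ConnectionOptions getOptions() { return options; }
    public String getQuery() { return query; }
}
```

A company-wide default then becomes a single ConnectionOptions constant, shared by readers of any element type.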


> I think that in general all of these patterns are significantly worse in
> the long run than the existing standards, e.g., for BigtableIO.


Note that BigTableIO.read() is actually not "ready-to-go" but has nulls and
empty strings that cause crashes if they are not overridden. It is just a
builder without the concluding "build()" method (for the record: I find
concluding "build()" methods pointless, too :-)

One of the better examples of the pattern of "ready-to-go" builders -
though not a transform - is WindowingStrategy (props to Ben), where there
are intelligent defaults and you can override them, and it tracks whether
or not each field is a default or a user-specified value. To start it off
you have to either request "globalDefault()" or "of(WindowFn)", in the
spirit of #3.
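The "intelligent defaults plus override tracking" idea attributed to WindowingStrategy above could be sketched like this. This is a simplified, hypothetical reduction to one field, not the actual WindowingStrategy implementation.

```java
// Sketch of default/override tracking: each setter records that its
// field was explicitly specified, so later code can distinguish a
// default from a deliberate user choice. Simplified and hypothetical.
final class StrategySketch {
    private final long allowedLatenessMillis;
    private final boolean latenessSpecified;

    private StrategySketch(long allowedLatenessMillis, boolean latenessSpecified) {
        this.allowedLatenessMillis = allowedLatenessMillis;
        this.latenessSpecified = latenessSpecified;
    }

    // The only entry point, in the spirit of globalDefault()/of(WindowFn).
    public static StrategySketch globalDefault() {
        return new StrategySketch(0L, false);
    }

    public StrategySketch withAllowedLateness(long millis) {
        return new StrategySketch(millis, true);
    }

    public boolean isAllowedLatenessSpecified() { return latenessSpecified; }
    public long getAllowedLatenessMillis() { return allowedLatenessMillis; }
}
```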

Kenn

On Fri, Oct 7, 2016 at 4:48 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > In my original email, all FooBuilder's should be simply Foo. Sorry for
> the
> > confusion.
> >
> > On Thu, Oct 6, 2016 at 3:08 PM Kenneth Knowles 
> > wrote:
> >
> > > Mostly my thoughts are the same as Robert's. Use #3 whenever possible,
> > > fallback to #1 otherwise, but please consider using informative names
> for
> > > your methods in all cases.
> > >
> > > #1 GBK.<K, V>create(): IMO this pattern is best only for transforms where
> > > withBar is optional or there is no such method, as in GBK. If it is
> > > mandatory, it should just be required in the first method, eliding the
> > > issue, as in ParDo.of(DoFn), MapElements.via(...), etc, like you
> > say
> > > in your concluding remark.
> > >
> > > #2 FooBuilder<T> FooBuilder.create(): this too - if you are going to
> fix
> > > the type, fix it first. If it is optional and Foo is usable as a
> > > transform, then sure. (it would have to be something weird like
> Foo<InputT, OutputT, ?> extends PTransform)
> > >
> > > #3 Foo.create(Bar): this is best. Do this whenever possible. From my
> > > perspective, instead of "move the param to create(...)" I would
> describe
> > > this as "delete create() then rename withBar to create". Just skip the
> > > second step and you are in an even better design, withBar being the
> > > starting point. Just like ParDo.of and MapElements.via.
> > >
> > > #4 Dislike this, too, for the same reasons as #2 plus code bloat plus
> > user
> > > confusion.
> > >
> > > Side note since you use this method in all your examples: This kind of
> > use
> > > of "create" is a bad method name. There may be no new object "created".
> > > Sometimes we have no better idea, but create() is a poor default. For
> GBK
> > > both are bad: create() (we really only need one instance so why create
> > > anything?) and create(boolean) (what is the unlabeled boolean?). They
> > > would be improved by GBK.standard() and GBK.fewKeys() or some such. I
> > tend
> > > to think that focusing on this fine polish eliminates a lot of cases for
> > > the generalized question.

Re: Specifying type arguments for generic PTransform builders

2016-10-13 Thread Ben Chambers
This is also a good reason to avoid overly general names like "from",
"create" and "of". Instead, the option should be ".fromQuery(String
query)", so we can add ".fromTable(...)".
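Ben's point can be sketched with a hypothetical DbRead builder: because the first source kind was named fromQuery rather than a generic from, adding fromTable later is a purely additive change. All names here are invented for illustration.

```java
// Sketch: specific builder names leave room for alternatives later.
// DbRead is a hypothetical class, not a real connector.
final class DbRead {
    private final String query;
    private final String table;

    private DbRead(String query, String table) {
        this.query = query;
        this.table = table;
    }

    public static DbRead read() {
        return new DbRead(null, null);
    }

    public DbRead fromQuery(String query) {
        return new DbRead(query, null);
    }

    // Added later without touching any fromQuery call sites.
    public DbRead fromTable(String table) {
        return new DbRead(null, table);
    }

    public String getQuery() { return query; }
    public String getTable() { return table; }
}
```

Had the first method been named from(String), a second from(Table) overload would have been ambiguous to read at call sites even where it compiled.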

On Thu, Oct 13, 2016 at 4:55 PM Dan Halperin 
wrote:

> For #3 -- I think we should be VERY careful there. You need to be
> absolutely certain that there will never, ever be another alternative to
> your mandatory argument. For example, you build an option to read from a
> DB, so you supply a .from(String query). Then later, you want to add
> reading just a table directly, so you add fromTable(Table). In this case,
> it's much better to use .read().fromQuery() or .read().fromTable() --
> having ".read()" be the "standard builder a'la #1".
>
> The other major drawback of #3 is the inability to provide generic
> configuration. For example, a utility function that gives you a database
> reader ready-to-go with all the default credentials and options you need --
> but independent of the type you want the result to end up as. You can't do
> that if you must bind the type first.
>
> I think that in general all of these patterns are significantly worse in
> the long run than the existing standards, e.g., for BigtableIO. These
> suggestions are motivated by making things easier on transform writers, but
> IMO we need to be optimizing for transform users.
>
> On Fri, Oct 7, 2016 at 4:48 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > In my original email, all FooBuilder's should be simply Foo. Sorry for
> the
> > confusion.
> >
> > > On Thu, Oct 6, 2016 at 2:10 PM Eugene Kirpichov
> > >  wrote:
> > >
> > > > Quite a few transforms in the SDK are generic (i.e. have type
> > > parameters),
> > > > e.g. ParDo, GroupByKey, Keys / WithKeys, many connectors (TextIO,
> > > KafkaIO,
> > > > JdbcIO, MongoDbGridFSIO etc - both read and write). They use
> different
> > > > styles of binding the type parameters to concrete types in caller
> code.
> > > >
> > > > I would like us to make a decision which of those styles to recommend
> > for
> > > > new transform and connectors writers. This question is coming up
> rather
> > > > frequently, e.g. it came up in JdbcIO and MongoDbGridFSIO.
> > > >
> > > > For the purpose of this discussion, imagine a hypothetical builder
> > class
> > > > that looks like this:
> > > >
> > > > class Foo<T> {
> > > > private Bar<T> bar;
> > > > private int blah;
> > > >
> > > > Foo<T> withBlah(int blah);
> > > > }
> > > >
> > > > So far I've seen several styles of binding the type argument in a
> > > withBar()
> > > > method vs. a creation method:
> > > >
> > > > 1. Binding at the creation method: e.g.:
> > > >
> > > > class Foo<T> {
> > > > ...
> > > > public static <T> Foo<T> create();
> > > > public FooBuilder<T> withBar(Bar<T> bar);
> > > > }
> > > >
> > > > Foo<String> foo = Foo.<String>create().withBlah(42).withBar(new
> > > > StringBar());
> > > >
> > > > Example: GroupByKey does this. As well as other transforms that don't
> > > > have a withBar()-like method, but still need a type argument, e.g. Keys.

Re: Specifying type arguments for generic PTransform builders

2016-10-13 Thread Dan Halperin
For #3 -- I think we should be VERY careful there. You need to be
absolutely certain that there will never, ever be another alternative to
your mandatory argument. For example, you build an option to read from a
DB, so you supply a .from(String query). Then later, you want to add
reading just a table directly, so you add fromTable(Table). In this case,
it's much better to use .read().fromQuery() or .read().fromTable() --
having ".read()" be the "standard builder a'la #1".

The other major drawback of #3 is the inability to provide generic
configuration. For example, a utility function that gives you a database
reader ready-to-go with all the default credentials and options you need --
but independent of the type you want the result to end up as. You can't do
that if you must bind the type first.

I think that in general all of these patterns are significantly worse in
the long run than the existing standards, e.g., for BigtableIO. These
suggestions are motivated by making things easier on transform writers, but
IMO we need to be optimizing for transform users.

On Fri, Oct 7, 2016 at 4:48 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> In my original email, all FooBuilder's should be simply Foo. Sorry for the
> confusion.
>
> > On Thu, Oct 6, 2016 at 2:10 PM Eugene Kirpichov
> >  wrote:
> >
> > > Quite a few transforms in the SDK are generic (i.e. have type
> > parameters),
> > > e.g. ParDo, GroupByKey, Keys / WithKeys, many connectors (TextIO,
> > KafkaIO,
> > > JdbcIO, MongoDbGridFSIO etc - both read and write). They use different
> > > styles of binding the type parameters to concrete types in caller code.
> > >
> > > I would like us to make a decision which of those styles to recommend
> for
> > > new transform and connectors writers. This question is coming up rather
> > > frequently, e.g. it came up in JdbcIO and MongoDbGridFSIO.
> > >
> > > For the purpose of this discussion, imagine a hypothetical builder
> class
> > > that looks like this:
> > >
> > > class Foo<T> {
> > > private Bar<T> bar;
> > > private int blah;
> > >
> > > Foo<T> withBlah(int blah);
> > > }
> > >
> > > So far I've seen several styles of binding the type argument in a
> > withBar()
> > > method vs. a creation method:
> > >
> > > 1. Binding at the creation method: e.g.:
> > >
> > > class Foo<T> {
> > > ...
> > > public static <T> Foo<T> create();
> > > public FooBuilder<T> withBar(Bar<T> bar);
> > > }
> > >
> > > Foo<String> foo = Foo.<String>create().withBlah(42).withBar(new
> > > StringBar());
> > >
> > > Example: GroupByKey does this. As well as other transforms that don't
> > have
> > > a withBar()-like method, but still need a type argument, e.g. Keys.
> > >
> > > Pros: completely unambiguous, easy to code, interacts well with
> > @AutoValue
> > > Cons: need to specify type once at call site.
> > >
> > > 2. Binding at a method that takes an argument of the given type (let us
> > > call it "a constraint argument"), e.g.:
> > >
> > > class Foo {
> > > ...
> > > public static FooBuilder create();
> > > public  FooBuilder 

Re: [3/4] incubator-beam git commit: Remove KeyedResourcePool

2016-10-13 Thread Daniel Kulp

I just submitted a pull request that fixes the code as well as cherry-picks the 
yaml change from the last branch.  

Dan



> On Oct 13, 2016, at 10:48 AM, Jean-Baptiste Onofré  wrote:
> 
> Indeed, the .travis.yml has not been merged. I'm going to fix that.
> 
> Sorry about that.
> 
> Regards
> JB
> 
> On 10/13/2016 04:37 PM, Daniel Kulp wrote:
>> 
>> This is in m2e. That said, it looks like the travis.yml file wasn’t
>> merged from my “eclipse” branch, so Travis wasn’t actually running against
>> the eclipse compiler. That would have caught this. JB and I will
>> investigate how that got lost in the merge to master.
>> 
>> A "mvn -Peclipse-jdt clean install” in direct-java would show the same error.
>> 
>> 
>> Dan
>> 
>> 
>> 
>>> On Oct 13, 2016, at 10:05 AM, Jean-Baptiste Onofré  
>>> wrote:
>>> 
>>> Hi Dan,
>>> 
>>> You mean directly building in Eclipse I guess using m2e ?
>>> 
>>> Regards
>>> JB
>>> 
>>> On 10/13/2016 03:59 PM, Daniel Kulp wrote:
 
Just an FYI: this commit has caused things to not build in Eclipse, but
I’m not exactly sure why. The errors are in places where methods with the
exact same signature just moved into an internal class, so I’m not yet sure
why it’s causing an issue.
 
Description: Bound mismatch: The type Read.Bounded is not a valid substitute
for the bounded parameter of the type AppliedPTransform
Resource: BoundedReadEvaluatorFactory.java
Path: /beam-runners-direct-java/src/main/java/org/apache/beam/runners/direct
Location: line 134
Type: Java Problem
 
 
 Dan
 
 
 
 
 On 2016-10-06 18:31 (-0400), lc...@apache.org wrote:
> Remove KeyedResourcePool
> 
> This interface is no longer used. Instead, the runner ensures that
> bundles will be provided containing the appropriate input to the
> TestStreamEvaluatorFactory.
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
> Commit: 
> http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41fb16f0
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41fb16f0
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41fb16f0
> 
> Branch: refs/heads/master
> Commit: 41fb16f014a79d2b9c149c5b369db12b61c4c774
> Parents: 7306e16
> Author: Thomas Groh 
> Authored: Wed Oct 5 13:12:48 2016 -0700
> Committer: Luke Cwik 
> Committed: Thu Oct 6 15:14:38 2016 -0700
> 
> --
> .../direct/BoundedReadEvaluatorFactory.java |  40 +++--
> .../beam/runners/direct/DirectRunner.java   |   2 +
> .../beam/runners/direct/EmptyInputProvider.java |  49 ++
> .../direct/ExecutorServiceParallelExecutor.java |  27 ++-
> .../runners/direct/FlattenEvaluatorFactory.java |  18 +-
> .../beam/runners/direct/KeyedResourcePool.java  |  47 --
> .../runners/direct/LockedKeyedResourcePool.java |  95 ---
> .../beam/runners/direct/RootInputProvider.java  |  41 +
> .../runners/direct/RootProviderRegistry.java|  65 
> .../direct/RootTransformEvaluatorFactory.java   |  42 -
> .../direct/TestStreamEvaluatorFactory.java  |  39 +++--
> .../direct/TransformEvaluatorRegistry.java  |  17 +-
> .../direct/UnboundedReadEvaluatorFactory.java   |  56 ---
> .../direct/BoundedReadEvaluatorFactoryTest.java |   3 +-
> .../direct/FlattenEvaluatorFactoryTest.java |   3 +-
> .../direct/LockedKeyedResourcePoolTest.java | 163 ---
> .../direct/TestStreamEvaluatorFactoryTest.java  |   3 +-
> .../UnboundedReadEvaluatorFactoryTest.java  |   8 +-
> 18 files changed, 269 insertions(+), 449 deletions(-)
> --
> 
> 
> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
> --
> diff --git 
> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
>  
> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
> index 4936ad9..326a535 100644
> --- 
> a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
> +++ 
> b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
> @@ -39,28 +39,13 @@ import org.apache.beam.sdk.values.PCollection;
> * A {@link TransformEvaluatorFactory} that produces {@link 
> 
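For context, the shape of the refactor behind this commit can be sketched as follows. The names below are simplified stand-ins for the Beam classes, not the actual code: the "get initial inputs" responsibility moves off the evaluator factory into a nested input-provider class with the same method signature, which is why the Eclipse errors appear exactly where methods moved.

```java
import java.util.Collection;
import java.util.Collections;

// Simplified, illustrative stand-ins for the real Beam types.
interface RootInputProvider<ShardT> {
  // The same responsibility getInitialInputs() used to have on the factory.
  Collection<ShardT> getInitialInputs();
}

final class BoundedReadEvaluatorFactory {
  // Evaluator-creation logic stays on the factory (omitted here).

  // The root-input logic now lives in a nested class instead of on the
  // factory itself: same signatures, new enclosing type.
  static final class InputProvider implements RootInputProvider<String> {
    @Override
    public Collection<String> getInitialInputs() {
      return Collections.singleton("bounded-source-shard");
    }
  }
}
```

Callers that previously asked the factory for initial inputs would instead go through a registry of such providers.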

Re: [3/4] incubator-beam git commit: Remove KeyedResourcePool

2016-10-13 Thread Jean-Baptiste Onofré

Indeed, the .travis.yml has not been merged. I'm going to fix that.

Sorry about that.

Regards
JB


Re: [3/4] incubator-beam git commit: Remove KeyedResourcePool

2016-10-13 Thread Daniel Kulp

This is in m2e. That said, it looks like the travis.yml file wasn’t merged 
from my “eclipse” branch, so Travis wasn’t actually running against the eclipse 
compiler.   That would have caught this.   JB and I will investigate how that 
got lost in the merge to master.

A “mvn -Peclipse-jdt clean install” in direct-java would show the same error.


Dan




Re: [3/4] incubator-beam git commit: Remove KeyedResourcePool

2016-10-13 Thread Jean-Baptiste Onofré

Hi Dan,

You mean building directly in Eclipse, I guess using m2e?

Regards
JB


Re: [3/4] incubator-beam git commit: Remove KeyedResourcePool

2016-10-13 Thread Daniel Kulp

Just an FYI:   this commit has caused things to not build in Eclipse, but I’m 
not exactly sure why.   The errors are in places where methods with the exact 
same signature were just moved into an internal class, so I’m not yet sure why 
it’s causing an issue.

Description: Bound mismatch: The type Read.Bounded is not a valid substitute 
for the bounded parameter InputT,OutputT of the type AppliedPTransform 
Resource: BoundedReadEvaluatorFactory.java 
Path: /beam-runners-direct-java/src/main/java/org/apache/beam/runners/direct 
Location: line 134 
Type: Java Problem


Dan
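For readers unfamiliar with this class of Eclipse compiler error: a "bound mismatch" means a concrete type argument does not satisfy a type parameter's declared bound. A minimal, self-contained illustration follows; the type names are invented for the example and are unrelated to the actual Beam classes.

```java
// A type parameter with a self-referential bound, similar in shape to the
// bounded parameter the ecj error message refers to.
interface Transform<SelfT> {}

class Applied<T extends Transform<T>> {}

// Bounded satisfies the bound: it implements Transform<Bounded>.
class Bounded implements Transform<Bounded> {}

// A type that does NOT satisfy the bound is rejected at compile time:
// class Wrong {}
// new Applied<Wrong>()  // error: Wrong is not a valid substitute for T
```

javac and ecj occasionally disagree on edge cases of such bound checks, which is why building with the eclipse-jdt profile can surface errors that a plain Maven build does not.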





Re: [Proposal] Add waitToFinish(), cancel(), waitToRunning() to PipelineResult.

2016-10-13 Thread Maximilian Michels
The Flink runner currently only supports blocking execution. I'll open
a pull request to at least fix waitUntilFinish().

-Max


On Thu, Oct 13, 2016 at 11:10 AM, Amit Sela  wrote:
> Hi Pei,
>
> I have someone on my team who started to work on this, I'll follow up,
> thanks for the bump ;-)
>
> Amit
>
> On Thu, Oct 13, 2016 at 8:38 AM Jean-Baptiste Onofré 
> wrote:
>
>> Hi Pei,
>>
>> good one !
>>
>> We now have to update the 'other' runners.
>>
>> Thanks.
>>
>> Regards
>> JB
>>
>> On 10/12/2016 10:48 PM, Pei He wrote:
>> > Hi,
>> > I just want to bump this thread and bring it to attention.
>> >
>> > PipelineResult now has cancel() and waitUntilFinish(). However,
>> > currently only DataflowRunner supports it in DataflowPipelineJob.
>> >
>> > We agreed that users should do "p.run().waitUntilFinish()" if they want
>> to
>> > block. But, if they do it now, direct, flink, spark runners will throw
>> > exceptions.
>> >
>> > I have following jira issues opened, I am wondering could any people help
>> > on them?
>> >
>> > https://issues.apache.org/jira/browse/BEAM-596
>> > https://issues.apache.org/jira/browse/BEAM-595
>> > https://issues.apache.org/jira/browse/BEAM-593
>> >
>> > Thanks
>> > --
>> > Pei
>> >
>> >
>> >
>> >
>> > On Tue, Jul 26, 2016 at 10:54 AM, Amit Sela 
>> wrote:
>> >
>> >> +1 and Thanks!
>> >>
>> >> On Tue, Jul 26, 2016 at 2:01 AM Robert Bradshaw
>> >> 
>> >> wrote:
>> >>
>> >>> +1, sounds great. Thanks Pei.
>> >>>
>> >>> On Mon, Jul 25, 2016 at 3:28 PM, Lukasz Cwik > >
>> >>> wrote:
>>  +1 for your proposal Pei
>> 
>>  On Mon, Jul 25, 2016 at 5:54 PM, Pei He 
>> >>> wrote:
>> 
>> > Looks to me that followings are agreed:
>> > (1). adding cancel() and waitUntilFinish() to PipelineResult.
>> > (In streaming mode, "all data watermarks reach to infinity" is
>> > considered as finished.)
>> > (2). PipelineRunner.run() should return relatively quickly as soon as
>> > the pipeline/job is started/running. The blocking logic should be
>> left
>> > to users' code to handle with PipelineResult.waitUntilFinish(). (Test
>> > runners that finish quickly can block run() until the execution is
>> > done. So, it is cleaner to verify test results after run())
>> >
>> > I will send out PR for (1), and create jira issues to improve runners
>> >>> for
>> > (2).
>> >
>> > waitToRunning() is controversial, and we have several half way agreed
>> > proposals.
>> > I will pull them out from this thread, so we can close this proposal
>> > with cancel() and waitUntilFinish(). And, i will create a jira issue
>> > to track how to support ''waiting until other states".
>> >
>> > Does that sound good with anyone?
>> >
>> > Thanks
>> > --
>> > Pei
>> >
>> > On Thu, Jul 21, 2016 at 4:32 PM, Robert Bradshaw
>> >  wrote:
>> >> On Thu, Jul 21, 2016 at 4:18 PM, Ben Chambers > >>>
>> > wrote:
>> >>> This health check seems redundant with just waiting a while and
>> >> then
>> >>> checking on the status, other than returning earlier in the case of
>> >>> reaching a terminal state. What about adding:
>> >>>
>> >>> /**
>> >>>  * Returns the state after waiting the specified duration. Will
>> >>> return
>> >>> earlier if the pipeline
>> >>>  * reaches a terminal state.
>> >>>  */
>> >>> State getStateAfter(Duration duration);
>> >>>
>> >>> This seems to be a useful building block, both for the user's
>> >>> pipeline
>> > (in
>> >>> case they wanted to build something like wait and then check
>> >> health)
>> >>> and
>> >>> also for the SDK (to implement waitUntilFinished, etc.)
>> >>
>> >> A generic waitFor(Duration) which may return early if a terminal
>> >> state
>> >> is entered seems useful. I don't know that we need a return value
>> >> here, given that we an then query the PipelineResult however we want
>> >> once this returns. waitUntilFinished is simply
>> >> waitFor(InfiniteDuration).
>> >>
>> >>> On Thu, Jul 21, 2016 at 4:11 PM Pei He 
>> > wrote:
>> >>>
>>  I am not in favor of supporting waiting for every state or
>>  waitUntilState(...).
>>  One reason is that PipelineResult.State is not well defined and is not
>>  agreed upon across runners.
>>  Another reason is users might not want to wait for a particular
>> >>> state.
>>  For example,
>>  waitUntilFinish() is to wait for a terminal state.
>>  So, even though runners have different states, we can still define shared
>>  properties, such as finished/terminal.
>> >>
>> >> +1. Running is an intermediate state that doesn't have an obvious
>> >> mapping onto 
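The API shape the thread converges on (run() returns quickly; blocking is opt-in via waitFor()/waitUntilFinish(); waitFor() returns early on a terminal state) can be sketched with plain JDK types. This is an illustration of the proposal, not the actual Beam PipelineResult interface, and the class and state names are invented for the sketch.

```java
import java.time.Duration;

// Illustrative sketch only; not the real PipelineResult.
class SketchPipelineResult {
  enum State {
    RUNNING, DONE, CANCELLED, FAILED;
    boolean isTerminal() { return this != RUNNING; }
  }

  private volatile State state = State.RUNNING;

  State cancel() {
    state = State.CANCELLED;
    return state;
  }

  // Wait up to `duration`, returning early once a terminal state is reached.
  State waitFor(Duration duration) throws InterruptedException {
    long deadlineNanos = System.nanoTime() + duration.toNanos();
    while (!state.isTerminal() && System.nanoTime() < deadlineNanos) {
      Thread.sleep(5);
    }
    return state;
  }

  // As noted above: waitUntilFinish() is simply waitFor with an
  // effectively infinite duration.
  State waitUntilFinish() throws InterruptedException {
    return waitFor(Duration.ofDays(3650));
  }
}
```

With this shape, "p.run().waitUntilFinish()" gives users explicit blocking, while test runners that finish quickly can still block inside run() if they choose.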

Re: Simplifying User-Defined Metrics in Beam

2016-10-13 Thread Amit Sela
On Thu, Oct 13, 2016 at 12:27 PM Aljoscha Krettek 
wrote:

> I finally found the time to have a look. :-)
>
> The API looks very good! (It's very similar to an API we recently added to
> Flink, which is inspired by the same Codahale/Dropwizard metrics).
>
> About the semantics, the "A", "B" and "C" you mention in the doc: doesn't
> this mean that we have to keep the metrics in some fault-tolerant way?
> Almost in something like the StateInternals, because they should survive
> failures and contain the metrics over the successful runs. (Side note: in
> Flink the metrics are just "since the last restart from failure" in case of
> failures.)
>
> About querying the metrics, what we have mostly seen is that people want to
> integrate metrics into a Metrics system that they already have in place.
> They use Graphite, StatsD or simply JMX for this. In Flink we provide an
> API for reporters that users can plug in to export the metrics to their
> system of choice. I'm sure some people will like the option of having the
> metrics queryable on the PipelineResult but I would assume that for most
> production use cases integration with a metrics system is more important.
>
With Aggregators, the Spark runner samples the aggregated value in
intervals and reports them via Codahale API, and the runner currently
supports GraphiteSink and CsvSink, but it's fairly easy to add more.
This is currently relying on a Spark API that wraps the MetricRegistry and
Reporter, but Beam could easily do the same.
I've raised this issue before, and the feeling back then was that each
runner should use its own metric-reporting infra; we can revisit that,
though.
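The sampling-and-reporting pattern described above can be sketched with plain JDK types. The class name, reportNow(), and the sink interface below are illustrative, not the Spark or Codahale APIs: the idea is just that an aggregated value is sampled on a fixed interval and pushed to a pluggable sink (Graphite, CSV, ...).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongConsumer;

// Illustrative sketch of interval-based metric reporting.
class SampledMetricReporter implements AutoCloseable {
  private final LongAdder counter = new LongAdder();
  private final LongConsumer sink;
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  SampledMetricReporter(LongConsumer sink, long periodMillis) {
    this.sink = sink;
    // Push the current aggregated value to the sink on a fixed interval.
    scheduler.scheduleAtFixedRate(
        this::reportNow, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  void inc() { counter.increment(); }

  void reportNow() { sink.accept(counter.sum()); }

  @Override
  public void close() { scheduler.shutdownNow(); }
}
```

A real implementation would wrap a MetricRegistry and a library-provided reporter rather than a raw scheduler, but the control flow is the same.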

>
> Regarding removal of Aggregators I'm for B, to quote Saint Exupéry:
>   "It seems that perfection is attained not when there is nothing more to
> add, but when there is nothing more to remove."
>
> Cheers,
> Aljoscha
>
> On Wed, 12 Oct 2016 at 20:22 Robert Bradshaw 
> wrote:
>
> > +1 to the new metrics design. I strongly favor B as well.
> >
> > On Wed, Oct 12, 2016 at 10:54 AM, Kenneth Knowles
> >  wrote:
> > > Correction: In my eagerness to see the end of aggregators, I mistook
> the
> > > intention. Both A and B leave aggregators in place until there is a
> > > replacement. In which case, I am strongly in favor of B. As soon as we
> > can
> > > remove aggregators, I think we should.
> > >
> > > On Wed, Oct 12, 2016 at 10:48 AM Kenneth Knowles 
> wrote:
> > >
> > >> Huzzah! This is IMO a really great change. I agree that we can get
> > >> something in to allow work to continue, and improve the API as we
> learn.
> > >>
> > >> On Wed, Oct 12, 2016 at 10:20 AM Ben Chambers
> > 
> > >> wrote:
> > >>
> > >> 3. One open question is what to do with Aggregators. In the doc I
> > mentioned
> > >>
> > >> that long term I'd like to consider whether we can improve Aggregators
> > to
> > >> be a better fit for the model by supporting windowing and allowing
> them
> > to
> > >> serve as input for future steps. In the interim it's not clear what we
> > >> should do with them. The two obvious (and extreme) options seem to be:
> > >>
> > >>
> > >>
> > >>   Option A: Do nothing, leave aggregators as they are until we
> revisit.
> > >>
> > >>
> > >>   Option B: Remove aggregators from the SDK until we revisit.
> > >>
> > >> I'd like to suggest removing Aggregators once the existing runners
> have
> > >> reasonable support for Metrics. Doing so reduces the surface area we
> > need
> > >> to maintain/support and simplifies other changes being made. It will
> > also
> > >> allow us to revisit them from a clean slate.
> > >>
> > >>
> > >> +1 to removing aggregators, either of A or B. The new metrics design
> > >> addresses aggregator use cases as well or better.
> > >>
> > >> So A vs B is a choice of whether we have a gap with no aggregator or
> > >> metrics-like functionality. I think that is perhaps a bit of a bummer
> > for
> > >> users, and we will likely port over the runner code for it, so we
> > wouldn't
> > >> want to actually delete it, right? Can we do it in a week or two?
> > >>
> > >> One thing motivating me to do this quickly: Currently the new DoFn
> does
> > >> not have its own implementation of aggregators, but leverages that of
> > >> OldDoFn, so we cannot remove OldDoFn until either (1) new DoFn
> > >> re-implements the aggregator instantiation and worker-side delegation
> > (not
> > >> hard, but it is throwaway code) or (2) aggregators are removed. This
> > >> dependency also makes running the new DoFn directly (required for the
> > state
> > >> API) a bit more annoying.
> > >>
> >
>


Re: Simplifying User-Defined Metrics in Beam

2016-10-13 Thread Aljoscha Krettek
I finally found the time to have a look. :-)

The API looks very good! (It's very similar to an API we recently added to
Flink, which is inspired by the same Codahale/Dropwizard metrics).

About the semantics, the "A", "B" and "C" you mention in the doc: doesn't
this mean that we have to keep the metrics in some fault-tolerant way?
Almost in something like the StateInternals, because they should survive
failures and contain the metrics over the successful runs. (Side note: in
Flink the metrics are just "since the last restart from failure" in case of
failures.)

About querying the metrics, what we have mostly seen is that people want to
integrate metrics into a Metrics system that they already have in place.
They use Graphite, StatsD or simply JMX for this. In Flink we provide an
API for reporters that users can plug in to export the metrics to their
system of choice. I'm sure some people will like the option of having the
metrics queryable on the PipelineResult but I would assume that for most
production use cases integration with a metrics system is more important.

Regarding removal of Aggregators I'm for B, to quote Saint Exupéry:
  "It seems that perfection is attained not when there is nothing more to
add, but when there is nothing more to remove."

Cheers,
Aljoscha
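A tiny sketch of what "metrics queryable on the PipelineResult" could look like, using a namespace/name-keyed counter map. All names here are invented for the illustration; they are not the proposed Beam API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch: counters keyed by "namespace/name", with a query
// method such as a pipeline result object might expose.
class MetricsSketch {
  private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

  // User code increments a named counter from inside a transform.
  void inc(String namespace, String name) {
    counters.computeIfAbsent(namespace + "/" + name, k -> new LongAdder())
        .increment();
  }

  // During or after the run, the result object can be queried.
  long query(String namespace, String name) {
    LongAdder adder = counters.get(namespace + "/" + name);
    return adder == null ? 0L : adder.sum();
  }
}
```

Exporting to an external system (Graphite, StatsD, JMX) would then be a reporter that periodically walks this map, which is the integration path discussed above.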



Re: [Proposal] Add waitToFinish(), cancel(), waitToRunning() to PipelineResult.

2016-10-13 Thread Amit Sela
Hi Pei,

I have someone on my team who started to work on this; I'll follow up.
Thanks for the bump ;-)

Amit
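
For context, the semantics under discussion in this thread (run() returning quickly, waitUntilFinish() blocking until a terminal state, and a duration-bounded wait that returns early on termination) can be modeled with a small self-contained sketch; this is a hypothetical stand-in, not the actual Beam PipelineResult:

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the proposed contract; a real runner would flip
// the latch when the job reaches a terminal state.
class FakePipelineResult {
  enum State {
    RUNNING, DONE, FAILED, CANCELLED;
    boolean isTerminal() { return this != RUNNING; }
  }

  private final CountDownLatch terminated = new CountDownLatch(1);
  private volatile State state = State.RUNNING;

  // Block until the pipeline reaches a terminal state.
  State waitUntilFinish() throws InterruptedException {
    terminated.await();
    return state;
  }

  // Wait at most `duration`, returning early on a terminal state
  // (the generic building block suggested later in the thread).
  State waitFor(Duration duration) throws InterruptedException {
    terminated.await(duration.toMillis(), TimeUnit.MILLISECONDS);
    return state;
  }

  void cancel() {
    state = State.CANCELLED;
    terminated.countDown();
  }
}
```

In this model waitUntilFinish() is just waitFor with an infinite duration, which matches the observation made later in the thread.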

On Thu, Oct 13, 2016 at 8:38 AM Jean-Baptiste Onofré 
wrote:

> Hi Pei,
>
> good one !
>
> We now have to update the 'other' runners.
>
> Thanks.
>
> Regards
> JB
>
> On 10/12/2016 10:48 PM, Pei He wrote:
> > Hi,
> > I just want to bump this thread and bring it to your attention.
> >
> > PipelineResult now has cancel() and waitUntilFinish(). However,
> currently
> > only DataflowRunner supports them, in DataflowPipelineJob.
> >
> > We agreed that users should do "p.run().waitUntilFinish()" if they want
> to
> > block. But if they do it now, the direct, Flink, and Spark runners will
> > throw exceptions.
> >
> > I have the following JIRA issues opened; I am wondering whether anyone
> > could help with them?
> >
> > https://issues.apache.org/jira/browse/BEAM-596
> > https://issues.apache.org/jira/browse/BEAM-595
> > https://issues.apache.org/jira/browse/BEAM-593
> >
> > Thanks
> > --
> > Pei
> >
> >
> >
> >
> > On Tue, Jul 26, 2016 at 10:54 AM, Amit Sela 
> wrote:
> >
> >> +1 and Thanks!
> >>
> >> On Tue, Jul 26, 2016 at 2:01 AM Robert Bradshaw
> >> 
> >> wrote:
> >>
> >>> +1, sounds great. Thanks Pei.
> >>>
> >>> On Mon, Jul 25, 2016 at 3:28 PM, Lukasz Cwik
> >>> wrote:
>  +1 for your proposal Pei
> 
>  On Mon, Jul 25, 2016 at 5:54 PM, Pei He 
> >>> wrote:
> 
> > It looks to me that the following points are agreed:
> > (1). adding cancel() and waitUntilFinish() to PipelineResult.
> > (In streaming mode, "all data watermarks reach infinity" is
> > considered finished.)
> > (2). PipelineRunner.run() should return relatively quickly as soon as
> > the pipeline/job is started/running. The blocking logic should be
> left
> > to users' code to handle with PipelineResult.waitUntilFinish(). (Test
> > runners that finish quickly can block run() until the execution is
> > done. So, it is cleaner to verify test results after run())
> >
> > I will send out PR for (1), and create jira issues to improve runners
> >>> for
> > (2).
> >
> > waitToRunning() is controversial, and we have several halfway-agreed
> > proposals.
> > I will pull them out of this thread, so we can close this proposal
> > with cancel() and waitUntilFinish(). And I will create a JIRA issue
> > to track how to support "waiting until other states".
> >
> > Does that sound good to everyone?
> >
> > Thanks
> > --
> > Pei
> >
> > On Thu, Jul 21, 2016 at 4:32 PM, Robert Bradshaw
> >  wrote:
> >> On Thu, Jul 21, 2016 at 4:18 PM, Ben Chambers
> > wrote:
> >>> This health check seems redundant with just waiting a while and
> >> then
> >>> checking on the status, other than returning earlier in the case of
> >>> reaching a terminal state. What about adding:
> >>>
> >>> /**
> >>>  * Returns the state after waiting the specified duration. Will
> >>> return
> >>> earlier if the pipeline
> >>>  * reaches a terminal state.
> >>>  */
> >>> State getStateAfter(Duration duration);
> >>>
> >>> This seems to be a useful building block, both for the user's
> >>> pipeline
> > (in
> >>> case they wanted to build something like wait and then check
> >> health)
> >>> and
> >>> also for the SDK (to implement waitUntilFinished, etc.)
> >>
> >> A generic waitFor(Duration) which may return early if a terminal
> >> state
> >> is entered seems useful. I don't know that we need a return value
> >> here, given that we can then query the PipelineResult however we want
> >> once this returns. waitUntilFinished is simply
> >> waitFor(InfiniteDuration).
> >>
> >>> On Thu, Jul 21, 2016 at 4:11 PM Pei He 
> > wrote:
> >>>
>  I am not in favor of supporting waiting for every state or
>  waitUntilState(...).
>  One reason is PipelineResult.State is not well defined and is not
>  agreed upon across runners.
>  Another reason is users might not want to wait for a particular
> >>> state.
>  For example,
>  waitUntilFinish() is to wait for a terminal state.
>  So, even if runners have different states, we can still define shared
>  properties, such as finished/terminal.
> >>
> >> +1. Running is an intermediate state that doesn't have an obvious
> >> mapping onto all runners, which is another reason it's odd to wait
> >> until then. All runners have terminal states.
> >>
>  I think when users call waitUntilRunning(), they want to make sure
> >>> the
>  pipeline is up and running and is healthy.
>  Maybe we want to wait for at
>  least one element to go through the pipeline.