[jira] [Commented] (BEAM-2595) WriteToBigQuery does not work with nested json schema

2017-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085278#comment-16085278
 ] 

ASF GitHub Bot commented on BEAM-2595:
--

GitHub user sb2nov opened a pull request:

https://github.com/apache/beam/pull/3556

[BEAM-2595] Allow table schema objects in BQ DoFn

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`.
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---

R: @chamikaramj PTAL

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sb2nov/beam 
BEAM-2595-allow-table-schema-bq-dogn

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3556.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3556


commit f89d59db369037c0bdb8fcb5d26fb1fd4b4599f7
Author: Sourabh Bajaj 
Date:   2017-07-13T06:50:24Z

[BEAM-2595] Allow table schema objects in BQ DoFn




> WriteToBigQuery does not work with nested json schema
> -
>
> Key: BEAM-2595
> URL: https://issues.apache.org/jira/browse/BEAM-2595
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py
>Affects Versions: 2.1.0
> Environment: mac os local runner, Python
>Reporter: Andrea Pierleoni
>Assignee: Sourabh Bajaj
>Priority: Minor
>  Labels: gcp
> Fix For: 2.1.0
>
>
> I am trying to use the new `WriteToBigQuery` PTransform added to 
> `apache_beam.io.gcp.bigquery` in version 2.1.0-RC1.
> I need to write to a BigQuery table with nested fields.
> The only way to specify nested schemas in BigQuery is with the JSON schema.
> None of the classes in `apache_beam.io.gcp.bigquery` are able to parse the 
> JSON schema, but they accept a schema as an instance of the class 
> `apache_beam.io.gcp.internal.clients.bigquery.TableFieldSchema`.
> I am composing the `TableFieldSchema` as suggested here 
> [https://stackoverflow.com/questions/36127537/json-table-schema-to-bigquery-tableschema-for-bigquerysink/45039436#45039436],
>  and it looks fine when passed to the PTransform `WriteToBigQuery`. 
> The problem is that the base class `PTransformWithSideInputs` tries to pickle 
> and unpickle the function 
> [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/ptransform.py#L515]
>  (which includes the TableFieldSchema instance), and for some reason when the 
> class is unpickled some `FieldList` instances are converted to plain lists, 
> and the pickling validation fails.
> Would it be possible to extend the test coverage to nested JSON objects for 
> BigQuery?
> They are also relatively easy to parse into a TableFieldSchema.
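
As the description notes, nested JSON schemas are straightforward to convert. A 
minimal sketch of such a conversion (a hypothetical helper, not part of the Beam 
API; it assumes the standard BigQuery JSON field dicts with name/type/mode/fields 
keys, in the spirit of the StackOverflow answer linked above):

{code:python}
from apache_beam.io.gcp.internal.clients import bigquery

def parse_table_schema(fields):
  """Recursively converts JSON field dicts into TableFieldSchema objects."""
  parsed = []
  for field in fields:
    schema = bigquery.TableFieldSchema()
    schema.name = field['name']
    schema.type = field['type']
    schema.mode = field.get('mode', 'NULLABLE')
    if field['type'] == 'RECORD':
      # Nested RECORD fields recurse into the same helper.
      schema.fields = parse_table_schema(field['fields'])
    parsed.append(schema)
  return parsed

table_schema = bigquery.TableSchema()
table_schema.fields = parse_table_schema([
    {'name': 'user', 'type': 'RECORD', 'fields': [
        {'name': 'id', 'type': 'INTEGER'},
        {'name': 'email', 'type': 'STRING'},
    ]},
    {'name': 'ts', 'type': 'TIMESTAMP'},
])
{code}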



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3556: [BEAM-2595] Allow table schema objects in BQ DoFn

2017-07-12 Thread sb2nov
GitHub user sb2nov opened a pull request:

https://github.com/apache/beam/pull/3556

[BEAM-2595] Allow table schema objects in BQ DoFn

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`.
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---

R: @chamikaramj PTAL

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sb2nov/beam 
BEAM-2595-allow-table-schema-bq-dogn

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3556.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3556


commit f89d59db369037c0bdb8fcb5d26fb1fd4b4599f7
Author: Sourabh Bajaj 
Date:   2017-07-13T06:50:24Z

[BEAM-2595] Allow table schema objects in BQ DoFn




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #2619

2017-07-12 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Flink #3389

2017-07-12 Thread Apache Jenkins Server
See 




Build failed in Jenkins: beam_PostCommit_Java_ValidatesRunner_Apex #1974

2017-07-12 Thread Apache Jenkins Server
See 


--
[...truncated 238.71 KB...]
2017-07-13T06:02:59.396 [INFO] Downloaded: 
https://repo.maven.apache.org/maven2/commons-cli/commons-cli/1.3.1/commons-cli-1.3.1.pom
 (11 KB at 338.3 KB/sec)
2017-07-13T06:02:59.409 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/org/apache/maven/doxia/doxia-integration-tools/1.6/doxia-integration-tools-1.6.pom
2017-07-13T06:02:59.436 [INFO] Downloaded: 
https://repo.maven.apache.org/maven2/org/apache/maven/doxia/doxia-integration-tools/1.6/doxia-integration-tools-1.6.pom
 (7 KB at 242.6 KB/sec)
2017-07-13T06:02:59.439 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/org/apache/maven/doxia/doxia-tools/3/doxia-tools-3.pom
2017-07-13T06:02:59.468 [INFO] Downloaded: 
https://repo.maven.apache.org/maven2/org/apache/maven/doxia/doxia-tools/3/doxia-tools-3.pom
 (10 KB at 346.3 KB/sec)
2017-07-13T06:02:59.472 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/org/apache/maven/doxia/doxia-logging-api/1.6/doxia-logging-api-1.6.pom
2017-07-13T06:02:59.499 [INFO] Downloaded: 
https://repo.maven.apache.org/maven2/org/apache/maven/doxia/doxia-logging-api/1.6/doxia-logging-api-1.6.pom
 (2 KB at 57.7 KB/sec)
2017-07-13T06:02:59.501 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/org/apache/maven/doxia/doxia/1.6/doxia-1.6.pom
2017-07-13T06:02:59.533 [INFO] Downloaded: 
https://repo.maven.apache.org/maven2/org/apache/maven/doxia/doxia/1.6/doxia-1.6.pom
 (19 KB at 579.3 KB/sec)
2017-07-13T06:02:59.537 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/org/apache/maven/doxia/doxia-decoration-model/1.6/doxia-decoration-model-1.6.pom
2017-07-13T06:02:59.570 [INFO] Downloaded: 
https://repo.maven.apache.org/maven2/org/apache/maven/doxia/doxia-decoration-model/1.6/doxia-decoration-model-1.6.pom
 (3 KB at 88.6 KB/sec)
2017-07-13T06:02:59.572 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/org/apache/maven/doxia/doxia-sitetools/1.6/doxia-sitetools-1.6.pom
2017-07-13T06:02:59.602 [INFO] Downloaded: 
https://repo.maven.apache.org/maven2/org/apache/maven/doxia/doxia-sitetools/1.6/doxia-sitetools-1.6.pom
 (18 KB at 582.3 KB/sec)
2017-07-13T06:02:59.607 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/org/codehaus/plexus/plexus-interpolation/1.21/plexus-interpolation-1.21.pom
2017-07-13T06:02:59.634 [INFO] Downloaded: 
https://repo.maven.apache.org/maven2/org/codehaus/plexus/plexus-interpolation/1.21/plexus-interpolation-1.21.pom
 (2 KB at 55.7 KB/sec)
2017-07-13T06:02:59.637 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/org/codehaus/plexus/plexus-velocity/1.1.8/plexus-velocity-1.1.8.pom
2017-07-13T06:02:59.663 [INFO] Downloaded: 
https://repo.maven.apache.org/maven2/org/codehaus/plexus/plexus-velocity/1.1.8/plexus-velocity-1.1.8.pom
 (2 KB at 72.2 KB/sec)
2017-07-13T06:02:59.665 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/org/codehaus/plexus/plexus-components/1.1.15/plexus-components-1.1.15.pom
2017-07-13T06:02:59.691 [INFO] Downloaded: 
https://repo.maven.apache.org/maven2/org/codehaus/plexus/plexus-components/1.1.15/plexus-components-1.1.15.pom
 (3 KB at 107.0 KB/sec)
2017-07-13T06:02:59.705 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/com/puppycrawl/tools/checkstyle/6.19/checkstyle-6.19.jar
2017-07-13T06:02:59.706 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/org/antlr/antlr4-runtime/4.5.3/antlr4-runtime-4.5.3.jar
2017-07-13T06:02:59.707 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/commons-beanutils/commons-beanutils/1.9.2/commons-beanutils-1.9.2.jar
2017-07-13T06:02:59.708 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/antlr/antlr/2.7.7/antlr-2.7.7.jar
2017-07-13T06:02:59.709 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/commons-cli/commons-cli/1.3.1/commons-cli-1.3.1.jar
2017-07-13T06:02:59.754 [INFO] Downloaded: 
https://repo.maven.apache.org/maven2/commons-cli/commons-cli/1.3.1/commons-cli-1.3.1.jar
 (52 KB at 1101.0 KB/sec)
2017-07-13T06:02:59.754 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/org/apache/maven/doxia/doxia-integration-tools/1.6/doxia-integration-tools-1.6.jar
2017-07-13T06:02:59.785 [INFO] Downloaded: 
https://repo.maven.apache.org/maven2/org/apache/maven/doxia/doxia-integration-tools/1.6/doxia-integration-tools-1.6.jar
 (44 KB at 566.0 KB/sec)
2017-07-13T06:02:59.786 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/org/codehaus/plexus/plexus-interpolation/1.21/plexus-interpolation-1.21.jar
2017-07-13T06:02:59.825 [INFO] Downloaded: 
https://repo.maven.apache.org/maven2/commons-beanutils/commons-beanutils/1.9.2/commons-beanutils-1.9.2.jar
 (229 KB at 1919.1 KB/sec)
2017-07-13T06:02:59.825 [INFO] Downloading: 
https://repo.maven.apache.org/maven2/org/codehaus/plexus/plexus-velocity/1.1.8/plexus-velocity-1.1.8.jar
2017-07-13T06:02:59.826 [INFO] Downloaded: 
https://repo.maven.a

Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Flink #3388

2017-07-12 Thread Apache Jenkins Server
See 




[jira] [Created] (BEAM-2611) Better document and validate arguments of WindowInto

2017-07-12 Thread Robert Bradshaw (JIRA)
Robert Bradshaw created BEAM-2611:
-

 Summary: Better document and validate arguments of WindowInto
 Key: BEAM-2611
 URL: https://issues.apache.org/jira/browse/BEAM-2611
 Project: Beam
  Issue Type: Bug
  Components: sdk-py
Reporter: Robert Bradshaw
Assignee: Ahmet Altay


The current documentation is

{code:python}
  def __init__(self, windowfn, **kwargs):
    """Initializes a WindowInto transform.

    Args:
      windowfn: Function to be used for windowing
    """
{code}

kwargs is used to force keyword specification of the triggerfn, 
accumulation_mode, and timestamp_combiner, but the fact that these three 
arguments are the expected ones is not stated in the docstring. We should also 
validate that no other arguments are silently ignored (in particular, a 
misspelling of the above).
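
A hypothetical sketch of the proposed validation (illustrative only, not Beam's 
actual code; the keyword names follow the issue text): pop the three documented 
keyword arguments and reject anything left over, so a misspelled argument fails 
loudly instead of being ignored.

{code:python}
class WindowInto(object):
  def __init__(self, windowfn, **kwargs):
    """Initializes a WindowInto transform.

    Args:
      windowfn: Function to be used for windowing.
      trigger: (optional) Trigger used for windowing.
      accumulation_mode: (optional) Accumulation mode used for windowing.
      timestamp_combiner: (optional) Timestamp combiner used for windowing.
    """
    self.windowfn = windowfn
    self.trigger = kwargs.pop('trigger', None)
    self.accumulation_mode = kwargs.pop('accumulation_mode', None)
    self.timestamp_combiner = kwargs.pop('timestamp_combiner', None)
    if kwargs:
      # Anything left over is a typo or an unsupported argument; fail loudly.
      raise ValueError('Unexpected keyword arguments: %s' % list(kwargs))
{code}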



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Build failed in Jenkins: beam_PostCommit_Python_Verify #2716

2017-07-12 Thread Apache Jenkins Server
See 


--
[...truncated 567.36 KB...]
return self.runner.run(self)
  File 
"
 line 38, in run
self.result = super(TestDataflowRunner, self).run(pipeline)
  File 
"
 line 267, in run
pipeline._options)
  File 
"
 line 394, in __init__
get_credentials=(not self.google_cloud_options.no_auth))
  File 
"
 line 56, in __init__
self.channels = self.ChannelsService(self)
  File 
"
 line 405, in __init__
supports_download=False,
  File 
"
 line 791, in __init__
setattr(self, name, value)
  File 
"
 line 958, in __setattr__
def __setattr__(self, name, value):
  File 
"
 line 276, in signalhandler
raise TimedOutException()
TimedOutException: 'test_empty_singleton_side_input 
(apache_beam.transforms.sideinputs_test.SideInputsTest)'

==
ERROR: test_flattened_side_input 
(apache_beam.transforms.sideinputs_test.SideInputsTest)
--
Traceback (most recent call last):
  File 
"
 line 812, in run
test(orig)
  File 
"
 line 45, in __call__
return self.run(*arg, **kwarg)
  File 
"
 line 133, in run
self.runTest(result)
  File 
"
 line 151, in runTest
test(result)
  File "/usr/lib/python2.7/unittest/case.py", line 395, in __call__
return self.run(*args, **kwds)
  File "/usr/lib/python2.7/unittest/case.py", line 331, in run
testMethod()
  File 
"
 line 305, in test_flattened_side_input
pipeline.run()
  File 
"
 line 96, in run
result = super(TestPipeline, self).run()
  File 
"
 line 328, in run
return self.runner.run(self)
  File 
"
 line 38, in run
self.result = super(TestDataflowRunner, self).run(pipeline)
  File 
"
 line 258, in run
super(DataflowRunner, self).run(pipeline)
  File 
"
 line 133, in run
pipeline.visit(RunVisitor(self))
  File 
"
 line 353, in visit
self._root_transform().visit(visitor, self, visited)
  File 
"
 line 685, in visit
part.visit(visitor, pipeline, visited)
  File 
"
 line 685, in visit
part.visit(visitor, pipeline, visited)
  File 
"
 line 688, in visit
visitor.visit_transf

[jira] [Commented] (BEAM-2231) ApiSurface should be lazy

2017-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085149#comment-16085149
 ] 

ASF GitHub Bot commented on BEAM-2231:
--

GitHub user evindj reopened a pull request:

https://github.com/apache/beam/pull/3548

[BEAM-2231] Added pruning of the namespace

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [x] Make sure tests pass via `mvn clean verify`.
 - [x] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [x] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/evindj/beam BEAM-2231

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3548.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3548


commit 24015ef7b454f06db498f24e45f10fc896683c1b
Author: Innocent Djiofack 
Date:   2017-07-12T12:28:18Z

Added pruning of the namespace




> ApiSurface should be lazy
> -
>
> Key: BEAM-2231
> URL: https://issues.apache.org/jira/browse/BEAM-2231
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Innocent
>
> Currently, the ApiSurface loads classes recursively, when they should be 
> pruned before loading by the pruning pattern. This has caused crashes because 
> of classes that are never referenced in our code.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2231) ApiSurface should be lazy

2017-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085148#comment-16085148
 ] 

ASF GitHub Bot commented on BEAM-2231:
--

Github user evindj closed the pull request at:

https://github.com/apache/beam/pull/3548


> ApiSurface should be lazy
> -
>
> Key: BEAM-2231
> URL: https://issues.apache.org/jira/browse/BEAM-2231
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Innocent
>
> Currently, the ApiSurface loads classes recursively, when they should be 
> pruned before loading by the pruning pattern. This has caused crashes because 
> of classes that are never referenced in our code.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3548: [BEAM-2231] Added pruning of the namespace

2017-07-12 Thread evindj
GitHub user evindj reopened a pull request:

https://github.com/apache/beam/pull/3548

[BEAM-2231] Added pruning of the namespace

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [x] Make sure tests pass via `mvn clean verify`.
 - [x] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [x] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/evindj/beam BEAM-2231

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3548.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3548


commit 24015ef7b454f06db498f24e45f10fc896683c1b
Author: Innocent Djiofack 
Date:   2017-07-12T12:28:18Z

Added pruning of the namespace




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #3548: [BEAM-2231] Added pruning of the namespace

2017-07-12 Thread evindj
Github user evindj closed the pull request at:

https://github.com/apache/beam/pull/3548


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (BEAM-2610) upgrade to version 2.2.0

2017-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085096#comment-16085096
 ] 

ASF GitHub Bot commented on BEAM-2610:
--

Github user XuMingmin closed the pull request at:

https://github.com/apache/beam/pull/3553


> upgrade to version 2.2.0
> 
>
> Key: BEAM-2610
> URL: https://issues.apache.org/jira/browse/BEAM-2610
> Project: Beam
>  Issue Type: Task
>  Components: dsl-sql
>Reporter: Xu Mingmin
>Assignee: Xu Mingmin
>  Labels: dsl_sql_merge
>
> This task syncs changes from the master branch, which is now using version 
> 2.2.0-SNAPSHOT. 
> As usual, there will be two PRs:
> 1. a pull request from master to DSL_SQL; this one is merged while ignoring 
> any errors;
> 2. a second PR to finish the change in DSL_SQL and also fix any potential 
> issues.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3553: [BEAM-2610] upgrade to version 2.2.0

2017-07-12 Thread XuMingmin
Github user XuMingmin closed the pull request at:

https://github.com/apache/beam/pull/3553


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[18/50] [abbrv] beam git commit: Update Dataflow container version to 20170706

2017-07-12 Thread takidau
Update Dataflow container version to 20170706


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/92eec586
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/92eec586
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/92eec586

Branch: refs/heads/DSL_SQL
Commit: 92eec586c966d9cce89539596dd750c757d92316
Parents: 699d59a
Author: Kenneth Knowles 
Authored: Thu Jul 6 11:07:38 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:00 2017 -0700

--
 runners/google-cloud-dataflow-java/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/92eec586/runners/google-cloud-dataflow-java/pom.xml
--
diff --git a/runners/google-cloud-dataflow-java/pom.xml 
b/runners/google-cloud-dataflow-java/pom.xml
index 91908cd..c8d63ac 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <dataflow.container_version>beam-master-20170623</dataflow.container_version>
+    <dataflow.container_version>beam-master-20170706</dataflow.container_version>
     <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
     <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
   </properties>



[35/50] [abbrv] beam git commit: Adds DynamicDestinations support to FileBasedSink

2017-07-12 Thread takidau
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
new file mode 100644
index 000..e7ef0f6
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.ParamsCoder;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+/** Some helper classes that derive from {@link FileBasedSink.DynamicDestinations}. */
+public class DynamicFileDestinations {
+  /** Always returns a constant {@link FilenamePolicy}. */
+  private static class ConstantFilenamePolicy<T> extends DynamicDestinations<T, Void> {
+    private final FilenamePolicy filenamePolicy;
+
+    public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) {
+      this.filenamePolicy = filenamePolicy;
+    }
+
+    @Override
+    public Void getDestination(T element) {
+      return (Void) null;
+    }
+
+    @Override
+    public Coder<Void> getDestinationCoder() {
+      return null;
+    }
+
+    @Override
+    public Void getDefaultDestination() {
+      return (Void) null;
+    }
+
+    @Override
+    public FilenamePolicy getFilenamePolicy(Void destination) {
+      return filenamePolicy;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      filenamePolicy.populateDisplayData(builder);
+    }
+  }
+
+  /**
+   * A base class for a {@link DynamicDestinations} object that returns differently-configured
+   * instances of {@link DefaultFilenamePolicy}.
+   */
+  private static class DefaultPolicyDestinations<UserT> extends DynamicDestinations<UserT, Params> {
+    SerializableFunction<UserT, Params> destinationFunction;
+    Params emptyDestination;
+
+    public DefaultPolicyDestinations(
+        SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+      this.destinationFunction = destinationFunction;
+      this.emptyDestination = emptyDestination;
+    }
+
+    @Override
+    public Params getDestination(UserT element) {
+      return destinationFunction.apply(element);
+    }
+
+    @Override
+    public Params getDefaultDestination() {
+      return emptyDestination;
+    }
+
+    @Nullable
+    @Override
+    public Coder<Params> getDestinationCoder() {
+      return ParamsCoder.of();
+    }
+
+    @Override
+    public FilenamePolicy getFilenamePolicy(DefaultFilenamePolicy.Params params) {
+      return DefaultFilenamePolicy.fromParams(params);
+    }
+  }
+
+  /** Returns a {@link DynamicDestinations} that always returns the same {@link FilenamePolicy}. */
+  public static <T> DynamicDestinations<T, Void> constant(FilenamePolicy filenamePolicy) {
+    return new ConstantFilenamePolicy<>(filenamePolicy);
+  }
+
+  /**
+   * Returns a {@link DynamicDestinations} that returns instances of {@link DefaultFilenamePolicy}
+   * configured with the given {@link Params}.
+   */
+  public static <UserT> DynamicDestinations<UserT, Params> toDefaultPolicies(
+      SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+    return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 8102316..583af60 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io

[40/50] [abbrv] beam git commit: [BEAM-2447] Reintroduces DoFn.ProcessContinuation

2017-07-12 Thread takidau
[BEAM-2447] Reintroduces DoFn.ProcessContinuation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f7f1699
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f7f1699
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f7f1699

Branch: refs/heads/DSL_SQL
Commit: 4f7f16990a8fc49a9b6ae199809f0ada7dc7448d
Parents: bd2a8cc
Author: Eugene Kirpichov 
Authored: Tue Jun 13 16:50:35 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:02 2017 -0700

--
 .../core/construction/SplittableParDoTest.java  |  10 +-
 ...eBoundedSplittableProcessElementInvoker.java |  35 ++-
 .../core/SplittableParDoViaKeyedWorkItems.java  |   9 +-
 .../core/SplittableProcessElementInvoker.java   |  25 -
 ...ndedSplittableProcessElementInvokerTest.java |  45 +++--
 .../core/SplittableParDoProcessFnTest.java  |  99 --
 .../org/apache/beam/sdk/transforms/DoFn.java|  51 +-
 .../reflect/ByteBuddyDoFnInvokerFactory.java|  19 +++-
 .../sdk/transforms/reflect/DoFnInvoker.java |   4 +-
 .../sdk/transforms/reflect/DoFnSignature.java   |  10 +-
 .../sdk/transforms/reflect/DoFnSignatures.java  |  22 +++-
 .../splittabledofn/OffsetRangeTracker.java  |  10 ++
 .../splittabledofn/RestrictionTracker.java  |  11 +-
 .../beam/sdk/transforms/SplittableDoFnTest.java | 100 ---
 .../transforms/reflect/DoFnInvokersTest.java|  93 +
 .../DoFnSignaturesProcessElementTest.java   |   2 +-
 .../DoFnSignaturesSplittableDoFnTest.java   |  83 +--
 17 files changed, 487 insertions(+), 141 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
--
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index f4c596e..267232c 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.core.construction;
 
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.junit.Assert.assertEquals;
 
 import java.io.Serializable;
@@ -24,8 +25,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
-import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -70,7 +69,6 @@ public class SplittableParDoTest {
 public void checkDone() {}
   }
 
-  @BoundedPerElement
   private static class BoundedFakeFn extends DoFn {
 @ProcessElement
 public void processElement(ProcessContext context, SomeRestrictionTracker 
tracker) {}
@@ -81,10 +79,12 @@ public class SplittableParDoTest {
 }
   }
 
-  @UnboundedPerElement
   private static class UnboundedFakeFn extends DoFn {
 @ProcessElement
-public void processElement(ProcessContext context, SomeRestrictionTracker 
tracker) {}
+public ProcessContinuation processElement(
+ProcessContext context, SomeRestrictionTracker tracker) {
+  return stop();
+}
 
 @GetInitialRestriction
 public SomeRestriction getInitialRestriction(Integer element) {

http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 475abf2..0c956d5 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -96,7 +96,7 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvoker<
   final WindowedValue element,
   

[10/50] [abbrv] beam git commit: Disallow Combiner Lifting for multi-window WindowFns

2017-07-12 Thread takidau
Disallow Combiner Lifting for multi-window WindowFns


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4fa33e3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4fa33e3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4fa33e3

Branch: refs/heads/DSL_SQL
Commit: d4fa33e346185395577aa3ce537bfd4a1eb8b4f7
Parents: a7cad60
Author: Thomas Groh 
Authored: Wed Jul 5 14:16:50 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:00:59 2017 -0700

--
 .../apache/beam/runners/dataflow/DataflowPipelineTranslator.java| 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/d4fa33e3/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
--
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 28fd1bb..f1783de 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -793,6 +793,7 @@ public class DataflowPipelineTranslator {
 
context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
 boolean disallowCombinerLifting =
 !windowingStrategy.getWindowFn().isNonMerging()
+|| !windowingStrategy.getWindowFn().assignsToOneWindow()
 || (isStreaming && !transform.fewKeys())
 // TODO: Allow combiner lifting on the non-default 
trigger, as appropriate.
 || !(windowingStrategy.getTrigger() instanceof 
DefaultTrigger);



[07/50] [abbrv] beam git commit: Update Python SDK version

2017-07-12 Thread takidau
Update Python SDK version


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a32db077
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a32db077
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a32db077

Branch: refs/heads/DSL_SQL
Commit: a32db07733f84f3c395d1447ac3db66d04e68c4f
Parents: cd15751
Author: Jean-Baptiste Onofré 
Authored: Wed Jul 5 16:52:48 2017 +0200
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:00:59 2017 -0700

--
 sdks/python/apache_beam/version.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/a32db077/sdks/python/apache_beam/version.py
--
diff --git a/sdks/python/apache_beam/version.py 
b/sdks/python/apache_beam/version.py
index ae92a23..8b0a430 100644
--- a/sdks/python/apache_beam/version.py
+++ b/sdks/python/apache_beam/version.py
@@ -18,4 +18,4 @@
 """Apache Beam SDK version information and utilities."""
 
 
-__version__ = '2.1.0.dev'
+__version__ = '2.2.0.dev'



[33/50] [abbrv] beam git commit: Adds DynamicDestinations support to FileBasedSink

2017-07-12 Thread takidau
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
--
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 9468893..8797ff7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -42,7 +42,9 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.base.Function;
+import com.google.common.base.Functions;
 import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -69,22 +71,31 @@ import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -205,7 +216,7 @@ public class TextIOTest {
 });
   }
 
-  private <T> void runTestRead(String[] expected) throws Exception {
+  private void runTestRead(String[] expected) throws Exception {
 File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile();
 String filename = tmpFile.getPath();
 
@@ -274,6 +285,213 @@ public class TextIOTest {
 displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar");
   }
 
+  static class TestDynamicDestinations extends DynamicDestinations<String, String> {
+    ResourceId baseDir;
+
+    TestDynamicDestinations(ResourceId baseDir) {
+      this.baseDir = baseDir;
+    }
+
+    @Override
+    public String getDestination(String element) {
+      // Destination is based on first character of string.
+      return element.substring(0, 1);
+    }
+
+    @Override
+    public String getDefaultDestination() {
+      return "";
+    }
+
+    @Nullable
+    @Override
+    public Coder<String> getDestinationCoder() {
+      return StringUtf8Coder.of();
+    }
+
+    @Override
+    public FilenamePolicy getFilenamePolicy(String destination) {
+      return DefaultFilenamePolicy.fromStandardParameters(
+          StaticValueProvider.of(
+              baseDir.resolve("file_" + destination + ".txt",
+                  StandardResolveOptions.RESOLVE_FILE)),
+          null,
+          null,
+          false);
+    }
+  }
+
+  class StartsWith implements Predicate<String> {
+    String prefix;
+
+    StartsWith(String prefix) {
+      this.prefix = prefix;
+    }
+
+    @Override
+    public boolean apply(@Nullable String input) {
+      return input.startsWith(prefix);
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinations() throws Exception {
+    ResourceId baseDir =
+        FileSystems.matchNewResource(
+            Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString(), true);
+
+    List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
+    PCollection<String> input = p.apply(Create.of(elements).withCoder(StringUtf8Coder.of()));
+    input.apply(
+        TextIO.write()
+            .to(new TestDynamicDestinations(baseDir))
+            .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
+    p.run();
+
+    assertOutputFiles(
+        Iterables.toArray(Iterables.

[45/50] [abbrv] beam git commit: Reflect #assignsToOneWindow in WindowingStrategy

2017-07-12 Thread takidau
Reflect #assignsToOneWindow in WindowingStrategy


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8ae2a790
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8ae2a790
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8ae2a790

Branch: refs/heads/DSL_SQL
Commit: 8ae2a790978267ea48e9c3601ba6551d1141a11e
Parents: 83f31e9
Author: Thomas Groh 
Authored: Tue Jun 27 15:03:11 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:02 2017 -0700

--
 .../runners/core/construction/WindowingStrategyTranslation.java | 1 +
 .../core/construction/WindowingStrategyTranslationTest.java | 3 +++
 sdks/common/runner-api/src/main/proto/beam_runner_api.proto | 5 +
 3 files changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/8ae2a790/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
--
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 88ebc01..1456a3f 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -307,6 +307,7 @@ public class WindowingStrategyTranslation implements 
Serializable {
 
.setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
 
.setTrigger(TriggerTranslation.toProto(windowingStrategy.getTrigger()))
 .setWindowFn(windowFnSpec)
+
.setAssignsToOneWindow(windowingStrategy.getWindowFn().assignsToOneWindow())
 .setWindowCoderId(
 
components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8ae2a790/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
--
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
index e406545..7a57fd7 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
@@ -116,5 +116,8 @@ public class WindowingStrategyTranslationTest {
 
 protoComponents.getCodersOrThrow(
 
components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
+assertThat(
+proto.getAssignsToOneWindow(),
+equalTo(windowingStrategy.getWindowFn().assignsToOneWindow()));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8ae2a790/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
--
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto 
b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 1f74afb..711da2a 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -439,6 +439,11 @@ message WindowingStrategy {
 
   // (Required) Indicate whether empty on-time panes should be omitted.
   OnTimeBehavior OnTimeBehavior = 9;
+
+  // (Required) Whether or not the window fn assigns inputs to exactly one 
window
+  //
+  // This knowledge is required for some optimizations
+  bool assigns_to_one_window = 10;
 }
 
 // Whether or not a PCollection's WindowFn is non-merging, merging, or



[41/50] [abbrv] beam git commit: Remove dead (and wrong) viewFromProto overload

2017-07-12 Thread takidau
Remove dead (and wrong) viewFromProto overload


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2e42ae41
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2e42ae41
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2e42ae41

Branch: refs/heads/DSL_SQL
Commit: 2e42ae41babcf42db5df8320f0823d6040f559cb
Parents: f1defd1
Author: Kenneth Knowles 
Authored: Tue Jul 11 10:09:12 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:02 2017 -0700

--
 .../core/construction/ParDoTranslation.java | 21 
 .../core/construction/ParDoTranslationTest.java |  2 +-
 2 files changed, 1 insertion(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/2e42ae41/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
--
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 90c9aad..03f29ff 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -41,7 +41,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -509,26 +508,6 @@ public class ParDoTranslation {
 return builder.build();
   }
 
-  public static PCollectionView<?> viewFromProto(
-      Pipeline pipeline,
-      SideInput sideInput,
-      String localName,
-      RunnerApi.PTransform parDoTransform,
-      Components components)
-      throws IOException {
-
-    String pCollectionId = parDoTransform.getInputsOrThrow(localName);
-
-    // This may be a PCollection defined in another language, but we should be
-    // able to rehydrate it enough to stick it in a side input. The coder may not
-    // be grokkable in Java.
-    PCollection<?> pCollection =
-        PCollectionTranslation.fromProto(
-            pipeline, components.getPcollectionsOrThrow(pCollectionId), components);
-
-    return viewFromProto(sideInput, localName, pCollection, parDoTransform, components);
-  }
-
-
   /**
* Create a {@link PCollectionView} from a side input spec and an 
already-deserialized {@link
* PCollection} that should be wired up.

http://git-wip-us.apache.org/repos/asf/beam/blob/2e42ae41/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
--
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index 6fdf9d6..a87a16d 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -162,9 +162,9 @@ public class ParDoTranslationTest {
     SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
     PCollectionView<?> restoredView =
         ParDoTranslation.viewFromProto(
-            rehydratedPipeline,
             sideInput,
             view.getTagInternal().getId(),
+            view.getPCollection(),
             protoTransform,
             protoComponents);
     assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));



[38/50] [abbrv] beam git commit: Add client-side throttling.

2017-07-12 Thread takidau
Add client-side throttling.

The approach used is as described in
https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg
. By backing off individual workers in response to high error rates, we relieve
pressure on the Datastore service, increasing the chance that the workload can
complete successfully.

The exported cumulativeThrottledSeconds could also be used as an autoscaling
signal in future.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1defd14
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1defd14
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1defd14

Branch: refs/heads/DSL_SQL
Commit: f1defd14c943d65e946cda081fe22a872ce6ce07
Parents: 7925a66
Author: Colin Phipps 
Authored: Mon Jun 26 13:34:19 2017 +
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:02 2017 -0700

--
 .../sdk/io/gcp/datastore/AdaptiveThrottler.java | 103 +
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |  25 -
 .../io/gcp/datastore/AdaptiveThrottlerTest.java | 111 +++
 3 files changed, 238 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/f1defd14/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
--
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
new file mode 100644
index 000..ce6ebe6
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Random;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.MovingFunction;
+
+
+/**
+ * An implementation of client-side adaptive throttling. See
+ * 
https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg
+ * for a full discussion of the use case and algorithm applied.
+ */
+class AdaptiveThrottler {
+  private final MovingFunction successfulRequests;
+  private final MovingFunction allRequests;
+
+  /** The target ratio between requests sent and successful requests. This is 
"K" in the formula in
+   * https://landing.google.com/sre/book/chapters/handling-overload.html */
+  private final double overloadRatio;
+
+  /** The target minimum number of requests per samplePeriodMs, even if no 
requests succeed. Must be
+   * greater than 0, else we could throttle to zero. Because every decision is 
probabilistic, there
+   * is no guarantee that the request rate in any given interval will not be 
zero. (This is the +1
+   * from the formula in 
https://landing.google.com/sre/book/chapters/handling-overload.html */
+  private static final double MIN_REQUESTS = 1;
+  private final Random random;
+
+  /**
+   * @param samplePeriodMs the time window to keep of request history to 
inform throttling
+   * decisions.
+   * @param sampleUpdateMs the length of buckets within this time window.
+   * @param overloadRatio the target ratio between requests sent and 
successful requests. You should
+   * always set this to more than 1, otherwise the client would never try to 
send more requests than
+   * succeeded in the past - so it could never recover from temporary setbacks.
+   */
+  public AdaptiveThrottler(long samplePeriodMs, long sampleUpdateMs,
+  double overloadRatio) {
+this(samplePeriodMs, sampleUpdateMs, overloadRatio, new Random());
+  }
+
+  @VisibleForTesting
+  AdaptiveThrottler(long samplePeriodMs, long sampleUpdateMs,
+  double overloadRatio, Random random) {
+allRequests =
+new MovingFunction(samplePeriodMs, sampleUpdateMs,
+  

[28/50] [abbrv] beam git commit: [BEAM-2570] Fix breakage after cloud-bigquery updated

2017-07-12 Thread takidau
[BEAM-2570] Fix breakage after cloud-bigquery updated


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eee0c9c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eee0c9c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eee0c9c3

Branch: refs/heads/DSL_SQL
Commit: eee0c9c38ea33d123c9be6ee81fd8f31bcb44d14
Parents: b1313ff
Author: Mark Liu 
Authored: Fri Jul 7 15:20:12 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:01 2017 -0700

--
 sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py  | 6 +++---
 sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py | 2 +-
 sdks/python/setup.py  | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/eee0c9c3/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
--
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py 
b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index 844cbc5..d6f0e97 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -92,9 +92,9 @@ class BigqueryMatcher(BaseMatcher):
     page_token = None
     results = []
     while True:
-      rows, _, page_token = query.fetch_data(page_token=page_token)
-      results.extend(rows)
-      if not page_token:
+      for row in query.fetch_data(page_token=page_token):
+        results.append(row)
+      if results:
         break
 
     return results

http://git-wip-us.apache.org/repos/asf/beam/blob/eee0c9c3/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
--
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py 
b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
index f12293e..5b72285 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
@@ -53,7 +53,7 @@ class BigqueryMatcherTest(unittest.TestCase):
     matcher = bq_verifier.BigqueryMatcher(
         'mock_project',
         'mock_query',
-        'da39a3ee5e6b4b0d3255bfef95601890afd80709')
+        '59f9d6bdee30d67ea73b8aded121c3a0280f9cd8')
     hc_assert_that(self._mock_result, matcher)
 
   @patch.object(bigquery, 'Client')

http://git-wip-us.apache.org/repos/asf/beam/blob/eee0c9c3/sdks/python/setup.py
--
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 8a0c9ae..da82466 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -122,7 +122,7 @@ GCP_REQUIREMENTS = [
   'googledatastore==7.0.1',
   'google-cloud-pubsub==0.26.0',
   # GCP packages required by tests
-  'google-cloud-bigquery>=0.23.0,<0.26.0',
+  'google-cloud-bigquery==0.25.0',
 ]
 
 



[15/50] [abbrv] beam git commit: Made DataflowRunner TransformTranslator public

2017-07-12 Thread takidau
Made DataflowRunner TransformTranslator public


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e0f2587
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e0f2587
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e0f2587

Branch: refs/heads/DSL_SQL
Commit: 5e0f2587b95016d0bbf0a1adcebf55ceb7cbc111
Parents: 7b4fa89
Author: Jeremie Lenfant-Engelmann 
Authored: Wed Jun 28 16:11:21 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:00 2017 -0700

--
 .../org/apache/beam/runners/dataflow/TransformTranslator.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/5e0f2587/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
--
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index a7452b2..7f61b6c 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -36,7 +36,8 @@ import org.apache.beam.sdk.values.TupleTag;
  * A {@link TransformTranslator} knows how to translate a particular subclass 
of {@link PTransform}
  * for the Cloud Dataflow service. It does so by mutating the {@link 
TranslationContext}.
  */
-interface TransformTranslator<TransformT extends PTransform> {
+@Internal
+public interface TransformTranslator<TransformT extends PTransform> {
   void translate(TransformT transform, TranslationContext context);
 
   /**



[30/50] [abbrv] beam git commit: Add more utilities to ParDoTranslation

2017-07-12 Thread takidau
Add more utilities to ParDoTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/860e0a08
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/860e0a08
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/860e0a08

Branch: refs/heads/DSL_SQL
Commit: 860e0a08ecd84533220f6ef8e18d1409964d69cd
Parents: 1f17b8a
Author: Kenneth Knowles 
Authored: Thu Jun 8 13:46:18 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:01 2017 -0700

--
 .../core/construction/ParDoTranslation.java | 48 
 1 file changed, 48 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/860e0a08/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
--
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 34e0d86..5f2bcae 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -34,9 +34,11 @@ import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -74,6 +76,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
@@ -215,11 +218,56 @@ public class ParDoTranslation {
 return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn();
   }
 
+  public static DoFn<?, ?> getDoFn(AppliedPTransform<?, ?, ?> application) throws IOException {
+    return getDoFn(getParDoPayload(application));
+  }
+
   public static TupleTag<?> getMainOutputTag(ParDoPayload payload)
       throws InvalidProtocolBufferException {
     return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
   }
 
+  public static TupleTag<?> getMainOutputTag(AppliedPTransform<?, ?, ?> application)
+      throws IOException {
+    return getMainOutputTag(getParDoPayload(application));
+  }
+
+  public static TupleTagList getAdditionalOutputTags(AppliedPTransform<?, ?, ?> application)
+      throws IOException {
+
+    RunnerApi.PTransform protoTransform =
+        PTransformTranslation.toProto(application, SdkComponents.create());
+
+    ParDoPayload payload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
+    TupleTag<?> mainOutputTag = getMainOutputTag(payload);
+    Set<String> outputTags =
+        Sets.difference(
+            protoTransform.getOutputsMap().keySet(), Collections.singleton(mainOutputTag.getId()));
+
+    ArrayList<TupleTag<?>> additionalOutputTags = new ArrayList<>();
+    for (String outputTag : outputTags) {
+      additionalOutputTags.add(new TupleTag<>(outputTag));
+    }
+    return TupleTagList.of(additionalOutputTags);
+  }
+
+  public static List<PCollectionView<?>> getSideInputs(AppliedPTransform<?, ?, ?> application)
+      throws IOException {
+
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.PTransform parDoProto =
+        PTransformTranslation.toProto(application, sdkComponents);
+    ParDoPayload payload = parDoProto.getSpec().getParameter().unpack(ParDoPayload.class);
+
+    List<PCollectionView<?>> views = new ArrayList<>();
+    for (Map.Entry<String, RunnerApi.SideInput> sideInput : payload.getSideInputsMap().entrySet()) {
+      views.add(
+          fromProto(
+              sideInput.getValue(), sideInput.getKey(), parDoProto, sdkComponents.toComponents()));
+    }
+    return views;
+  }
+
   public static RunnerApi.PCollection getMainInput(
   RunnerApi.PTransform ptransform, Components components) throws 
IOException {
 checkArgument(
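
Taken together, these overloads let a runner inspect a rehydrated ParDo without
touching SDK-specific classes. A usage sketch, assuming only the signatures
added above (the AppliedPTransform would come from pipeline traversal):

    import java.io.IOException;
    import java.util.List;
    import org.apache.beam.runners.core.construction.ParDoTranslation;
    import org.apache.beam.sdk.runners.AppliedPTransform;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.PCollectionView;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    class ParDoInspection {
      static void inspect(AppliedPTransform<?, ?, ?> parDo) throws IOException {
        DoFn<?, ?> fn = ParDoTranslation.getDoFn(parDo);
        TupleTag<?> mainOutput = ParDoTranslation.getMainOutputTag(parDo);
        TupleTagList additionalOutputs = ParDoTranslation.getAdditionalOutputTags(parDo);
        List<PCollectionView<?>> sideInputs = ParDoTranslation.getSideInputs(parDo);
        // A runner would now wire fn, the tags, and the side inputs into its
        // own ParDo execution; that part is runner-specific and omitted.
      }
    }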



[02/50] [abbrv] beam git commit: Add timeout to initialization of partition in KafkaIO

2017-07-12 Thread takidau
Add timeout to initialization of partition in KafkaIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c167d109
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c167d109
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c167d109

Branch: refs/heads/DSL_SQL
Commit: c167d10968b1bbd4f959f93ab3bcd4f76576c823
Parents: 4862703
Author: Raghu Angadi 
Authored: Mon Jul 3 23:54:10 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:00:59 2017 -0700

--
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 81 +++-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 30 
 2 files changed, 92 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/c167d109/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
--
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index e520367..026313a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -49,9 +49,11 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -1061,8 +1063,32 @@ public class KafkaIO {
   curBatch = Iterators.cycle(nonEmpty);
 }
 
+private void setupInitialOffset(PartitionState pState) {
+  Read spec = source.spec;
+
+  if (pState.nextOffset != UNINITIALIZED_OFFSET) {
+consumer.seek(pState.topicPartition, pState.nextOffset);
+  } else {
+    // nextOffset is uninitialized here, meaning start reading from the latest record as of
+    // now ('latest' is the default, and is configurable), or look up the offset by
+    // startReadTime. Remember the current position without waiting until the first record is
+    // read. This ensures the checkpoint is accurate even if the reader is closed before
+    // reading any records.
+Instant startReadTime = spec.getStartReadTime();
+if (startReadTime != null) {
+  pState.nextOffset =
+  consumerSpEL.offsetForTime(consumer, pState.topicPartition, 
spec.getStartReadTime());
+  consumer.seek(pState.topicPartition, pState.nextOffset);
+} else {
+  pState.nextOffset = consumer.position(pState.topicPartition);
+}
+  }
+}
+
 @Override
 public boolean start() throws IOException {
+  final int defaultPartitionInitTimeout = 60 * 1000;
+  final int kafkaRequestTimeoutMultiple = 2;
+
   Read spec = source.spec;
   consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
   consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
@@ -1077,25 +1103,38 @@ public class KafkaIO {
   keyDeserializerInstance.configure(spec.getConsumerConfig(), true);
   valueDeserializerInstance.configure(spec.getConsumerConfig(), false);
 
-  for (PartitionState p : partitionStates) {
-if (p.nextOffset != UNINITIALIZED_OFFSET) {
-  consumer.seek(p.topicPartition, p.nextOffset);
-} else {
-      // nextOffset is uninitialized here, meaning start reading from the latest record as of
-      // now ('latest' is the default, and is configurable), or look up the offset by
-      // startReadTime. Remember the current position without waiting until the first record is
-      // read. This ensures the checkpoint is accurate even if the reader is closed before
-      // reading any records.
-  Instant startReadTime = spec.getStartReadTime();
-  if (startReadTime != null) {
-p.nextOffset =
-consumerSpEL.offsetForTime(consumer, p.topicPartition, 
spec.getStartReadTime());
-consumer.seek(p.topicPartition, p.nextOffset);
-  } else {
-p.nextOffset = consumer.position(p.topicPartition);
+      // Seek to the start offset for each partition. This is the first interaction with the
+      // server. Unfortunately it can block forever in case of network issues like incorrect
+      // ACLs. Initialize each partition in a separate thread and cancel it if it takes longer
+      // than a minute.
+  for (final PartitionState pState : partitionStates) {
+Future future =  consumerPollThrea
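
The message is cut off above, but the comment describes the standard
future-with-timeout idiom. A self-contained sketch of that pattern (plain JDK
concurrency; the sleep stands in for the potentially blocking seek/position
calls, and all names here are illustrative, not from the patch):

    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class PartitionInitTimeoutSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService consumerPollThread = Executors.newSingleThreadExecutor();
        // Stand-in for the first consumer.seek()/position() round trip.
        Future<?> init = consumerPollThread.submit(new Runnable() {
          @Override
          public void run() {
            try {
              Thread.sleep(100);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt(); // cancelled after timeout
            }
          }
        });
        try {
          init.get(60, TimeUnit.SECONDS); // bound the wait instead of blocking forever
        } catch (TimeoutException e) {
          init.cancel(true);
          throw new IOException("timeout initializing partition", e);
        } finally {
          consumerPollThread.shutdownNow();
        }
      }
    }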

[12/50] [abbrv] beam git commit: Fix DoFn javadoc: StateSpec does not require a key

2017-07-12 Thread takidau
Fix DoFn javadoc: StateSpec does not require a key


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4bf16155
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4bf16155
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4bf16155

Branch: refs/heads/DSL_SQL
Commit: 4bf1615575338e7af64e4db1bfe11856495aa91d
Parents: c73e69a
Author: Kenneth Knowles 
Authored: Fri Jun 30 21:42:17 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:00:59 2017 -0700

--
 .../core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/4bf16155/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index fb6d0ee..a2e5c16 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -385,7 +385,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayData
 * {@literal new DoFn<KV<Key, Foo>, Baz>()} {
 *
 *  {@literal @StateId("my-state-id")}
-   *  {@literal private final StateSpec<K, ValueState<MyState>>} myStateSpec =
+   *  {@literal private final StateSpec<ValueState<MyState>>} myStateSpec =
 *   StateSpecs.value(new MyStateCoder());
*
*  {@literal @ProcessElement}
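
For concreteness, a compilable version of the corrected pattern; the
sum-per-key logic and all names below are illustrative, not from the patch:

    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Keyed stateful DoFn; note the StateSpec carries no key type parameter.
    class RunningSumFn extends DoFn<KV<String, Integer>, Integer> {
      @StateId("sum")
      private final StateSpec<ValueState<Integer>> sumSpec =
          StateSpecs.value(VarIntCoder.of());

      @ProcessElement
      public void process(ProcessContext c, @StateId("sum") ValueState<Integer> sum) {
        Integer current = sum.read(); // null on first element for this key
        int updated = (current == null ? 0 : current) + c.element().getValue();
        sum.write(updated);
        c.output(updated);
      }
    }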



[31/50] [abbrv] beam git commit: Fix misleading comment in TransformHierarchy

2017-07-12 Thread takidau
Fix misleading comment in TransformHierarchy


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fc06b798
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fc06b798
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fc06b798

Branch: refs/heads/DSL_SQL
Commit: fc06b798749144c908a86ba1d2d8addb2af05b16
Parents: 16d4a15
Author: Kenneth Knowles 
Authored: Mon Jun 12 15:11:49 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:01 2017 -0700

--
 .../main/java/org/apache/beam/sdk/runners/TransformHierarchy.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/fc06b798/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 9c5f148..6f1ee94 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -406,7 +406,7 @@ public class TransformHierarchy {
   return fullName;
 }
 
-/** Returns the transform input, in unexpanded form. */
+/** Returns the transform input, in fully expanded form. */
 public Map, PValue> getInputs() {
   return inputs == null ? Collections., PValue>emptyMap() : 
inputs;
 }



[17/50] [abbrv] beam git commit: Move DirectRunner knob for suppressing runner-determined sharding out of core SDK

2017-07-12 Thread takidau
Move DirectRunner knob for suppressing runner-determined sharding out of core 
SDK


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/81a96ab0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/81a96ab0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/81a96ab0

Branch: refs/heads/DSL_SQL
Commit: 81a96ab029584e08495d461fc573b49de97d18c5
Parents: 92eec58
Author: Kenneth Knowles 
Authored: Fri Jul 7 08:49:08 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:00 2017 -0700

--
 runners/direct-java/pom.xml |  2 +-
 .../beam/runners/direct/DirectRegistrar.java|  2 +-
 .../beam/runners/direct/DirectRunner.java   |  5 +--
 .../beam/runners/direct/DirectTestOptions.java  | 42 
 .../runners/direct/DirectRegistrarTest.java |  2 +-
 .../beam/sdk/testing/TestPipelineOptions.java   | 10 -
 6 files changed, 47 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/81a96ab0/runners/direct-java/pom.xml
--
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 0e1f73a..e14e813 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -156,7 +156,7 @@
 
   [
 "--runner=DirectRunner",
-"--unitTest"
+"--runnerDeterminedSharding=false"
   ]
 
   

http://git-wip-us.apache.org/repos/asf/beam/blob/81a96ab0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
index 0e6fbab..53fb2f2 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
@@ -50,7 +50,7 @@ public class DirectRegistrar {
 @Override
 public Iterable> getPipelineOptions() {
   return ImmutableList.>of(
-  DirectOptions.class);
+  DirectOptions.class, DirectTestOptions.class);
 }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/81a96ab0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index a16e24d..7a221c4 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -43,7 +43,6 @@ import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
@@ -222,9 +221,9 @@ public class DirectRunner extends 
PipelineRunner {
   @SuppressWarnings("rawtypes")
   @VisibleForTesting
   List defaultTransformOverrides() {
-TestPipelineOptions testOptions = options.as(TestPipelineOptions.class);
+DirectTestOptions testOptions = options.as(DirectTestOptions.class);
 ImmutableList.Builder builder = 
ImmutableList.builder();
-if (!testOptions.isUnitTest()) {
+if (testOptions.isRunnerDeterminedSharding()) {
   builder.add(
   PTransformOverride.of(
   PTransformMatchers.writeWithRunnerDeterminedSharding(),

http://git-wip-us.apache.org/repos/asf/beam/blob/81a96ab0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTestOptions.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTestOptions.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTestOptions.java
new file mode 100644
index 000..a426443
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTestOptions.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * reg
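
The new file is truncated above. As a sketch of how the relocated knob would be
set in a test, assuming DirectTestOptions exposes the runnerDeterminedSharding
property read by DirectRunner above:

    import org.apache.beam.runners.direct.DirectTestOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    class DisableShardingSketch {
      public static void main(String[] args) {
        // Same effect as the "--runnerDeterminedSharding=false" flag in the pom change.
        DirectTestOptions opts =
            PipelineOptionsFactory.fromArgs("--runnerDeterminedSharding=false")
                .as(DirectTestOptions.class);
        System.out.println(opts.isRunnerDeterminedSharding()); // false
      }
    }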

[29/50] [abbrv] beam git commit: BEAM-2575 ApexRunner doesn't emit watermarks for additional outputs

2017-07-12 Thread takidau
BEAM-2575 ApexRunner doesn't emit watermarks for additional outputs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e014db6b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e014db6b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e014db6b

Branch: refs/heads/DSL_SQL
Commit: e014db6b7af00b49467389854c63ef693819ec1f
Parents: eee0c9c
Author: Thomas Weise 
Authored: Sun Jul 9 11:57:43 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:01 2017 -0700

--
 .../operators/ApexParDoOperator.java| 21 +---
 .../runners/apex/examples/WordCountTest.java|  8 ++--
 2 files changed, 20 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/e014db6b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 809ca2a..c3cbab2 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -359,10 +359,7 @@ public class ApexParDoOperator extends 
BaseOperator implements
   }
 }
 if (sideInputs.isEmpty()) {
-  if (traceTuples) {
-LOG.debug("\nemitting watermark {}\n", mark);
-  }
-  output.emit(mark);
+  outputWatermark(mark);
   return;
 }
 
@@ -370,10 +367,20 @@ public class ApexParDoOperator extends 
BaseOperator implements
 Math.min(pushedBackWatermark.get(), currentInputWatermark);
 if (potentialOutputWatermark > currentOutputWatermark) {
   currentOutputWatermark = potentialOutputWatermark;
-  if (traceTuples) {
-LOG.debug("\nemitting watermark {}\n", currentOutputWatermark);
+  
outputWatermark(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
+}
+  }
+
+  private void outputWatermark(ApexStreamTuple.WatermarkTuple mark) {
+if (traceTuples) {
+  LOG.debug("\nemitting {}\n", mark);
+}
+output.emit(mark);
+if (!additionalOutputPortMapping.isEmpty()) {
+  for (DefaultOutputPort> additionalOutput :
+  additionalOutputPortMapping.values()) {
+additionalOutput.emit(mark);
   }
-  output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
 }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e014db6b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
--
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index e76096e..ba75746 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -123,11 +123,15 @@ public class WordCountTest {
 options.setInputFile(new File(inputFile).getAbsolutePath());
 String outputFilePrefix = "target/wordcountresult.txt";
 options.setOutput(outputFilePrefix);
-WordCountTest.main(TestPipeline.convertToArgs(options));
 
 File outFile1 = new File(outputFilePrefix + "-0-of-2");
 File outFile2 = new File(outputFilePrefix + "-1-of-2");
-Assert.assertTrue(outFile1.exists() && outFile2.exists());
+Assert.assertTrue(!outFile1.exists() || outFile1.delete());
+Assert.assertTrue(!outFile2.exists() || outFile2.delete());
+
+WordCountTest.main(TestPipeline.convertToArgs(options));
+
+Assert.assertTrue("result files exist", outFile1.exists() && 
outFile2.exists());
 HashSet results = new HashSet<>();
 results.addAll(FileUtils.readLines(outFile1));
 results.addAll(FileUtils.readLines(outFile2));



[01/50] [abbrv] beam git commit: Make modules that depend on Hadoop and Spark use the same version property

2017-07-12 Thread takidau
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL d89d1ee1a -> ec494f675


Make modules that depend on Hadoop and Spark use the same version property


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3294d4b7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3294d4b7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3294d4b7

Branch: refs/heads/DSL_SQL
Commit: 3294d4b7ccb49da1affc761eec010557267ee6ad
Parents: 4d30484
Author: Ismaël Mejía 
Authored: Sun Jun 4 22:55:05 2017 +0200
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:00:58 2017 -0700

--
 examples/java/pom.xml   | 18 --
 examples/java8/pom.xml  | 18 --
 pom.xml | 65 +++-
 runners/apex/pom.xml|  2 +-
 runners/spark/pom.xml   |  7 ---
 sdks/java/extensions/sorter/pom.xml |  6 --
 sdks/java/io/hadoop-file-system/pom.xml | 31 --
 sdks/java/io/hadoop/jdk1.8-tests/pom.xml|  2 -
 sdks/java/io/hbase/pom.xml  |  9 ++-
 sdks/java/io/hcatalog/pom.xml   |  6 +-
 sdks/java/io/jdbc/pom.xml   |  2 -
 sdks/java/io/pom.xml| 31 --
 sdks/java/javadoc/pom.xml   |  2 -
 .../main/resources/archetype-resources/pom.xml  |  1 -
 .../main/resources/archetype-resources/pom.xml  |  1 -
 15 files changed, 98 insertions(+), 103 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/3294d4b7/examples/java/pom.xml
--
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 701e4fe..7ae4e6a 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -34,10 +34,6 @@
 
   jar
 
-  <properties>
-    <spark.version>1.6.2</spark.version>
-  </properties>
-
   
 
 
+    <hadoop.version>2.7.3</hadoop.version>
 1.3
 2.8.9
 3.0.1
@@ -139,7 +145,7 @@
 v1-rev10-1.22.0
 1.7.14
 0.20.0-beta
-    <spark.version>1.6.2</spark.version>
+    <spark.version>1.6.3</spark.version>
 4.3.5.RELEASE
 3.1.4
 v1-rev71-1.22.0
@@ -1076,6 +1082,42 @@
 ${snappy-java.version}
   
 
+  
+org.apache.hadoop
+hadoop-client
+${hadoop.version}
+  
+
+  
+org.apache.hadoop
+hadoop-common
+${hadoop.version}
+  
+
+  
+org.apache.hadoop
+hadoop-mapreduce-client-core
+${hadoop.version}
+  
+
+  
+org.apache.spark
+spark-core_2.10
+${spark.version}
+  
+
+  
+org.apache.spark
+spark-streaming_2.10
+${spark.version}
+  
+
+  
+org.apache.spark
+spark-network-common_2.10
+${spark.version}
+  
+
   
 
   
@@ -1145,6 +1187,27 @@
 test
   
 
+  
+org.apache.hadoop
+hadoop-minicluster
+${hadoop.version}
+test
+  
+
+  
+org.apache.hadoop
+hadoop-hdfs
+${hadoop.version}
+test
+  
+
+  
+org.apache.hadoop
+hadoop-hdfs
+${hadoop.version}
+tests
+test
+  
 
   
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3294d4b7/runners/apex/pom.xml
--
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 2c54654..88ff0f2 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -261,7 +261,7 @@
 
com.esotericsoftware.kryo:kryo::${apex.kryo.version}
 
com.datatorrent:netlet::1.3.0
 
org.slf4j:slf4j-api:jar:1.7.14
-
org.apache.hadoop:hadoop-common:jar:2.6.0
+
org.apache.hadoop:hadoop-common:jar:${hadoop.version}
 
joda-time:joda-time:jar:2.4
 
com.google.guava:guava:jar:20.0
   

http://git-wip-us.apache.org/repos/asf/beam/blob/3294d4b7/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index ee72dd9..1d93427 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -34,8 +34,6 @@
   
 UTF-8
 UTF-8
-1.6.3
-2.2.0
 0.9.0.1
 2.4.4
 3.1.2
@@ -135,31 +133,26 @@
 
   org.apache.spark
   spark-core_2.10
-  ${spark.version}
   provided
 
 
   org.apache.spark
   spark-streaming_2.10
-  ${spark.version}
   provided
 
 
   org.apache.spark
   spark-network-common_2.10
-  ${spark.version}
   provided
 
 
   org.apache.hadoop
   hadoop-common
-  ${hadoop.version}
   provided
 
 
   org.apache.hadoop
   hadoop-mapreduce-client-core
-  ${hadoop.vers

[49/50] [abbrv] beam git commit: Reformatting Kinesis IO to comply with official code style

2017-07-12 Thread takidau
Reformatting Kinesis IO to comply with official code style


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7925a668
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7925a668
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7925a668

Branch: refs/heads/DSL_SQL
Commit: 7925a668b12e272c7b2631ff6b20376e92ad90be
Parents: 4abd714
Author: Pawel Kaczmarczyk 
Authored: Mon Jun 19 11:10:25 2017 +0200
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:02 2017 -0700

--
 .../sdk/io/kinesis/CheckpointGenerator.java |   6 +-
 .../beam/sdk/io/kinesis/CustomOptional.java | 111 ++--
 .../io/kinesis/DynamicCheckpointGenerator.java  |  52 +-
 .../sdk/io/kinesis/GetKinesisRecordsResult.java |  49 +-
 .../sdk/io/kinesis/KinesisClientProvider.java   |   4 +-
 .../apache/beam/sdk/io/kinesis/KinesisIO.java   | 279 +-
 .../beam/sdk/io/kinesis/KinesisReader.java  | 206 +++
 .../sdk/io/kinesis/KinesisReaderCheckpoint.java |  97 ++--
 .../beam/sdk/io/kinesis/KinesisRecord.java  | 177 +++---
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  68 +--
 .../beam/sdk/io/kinesis/KinesisSource.java  | 147 ++---
 .../beam/sdk/io/kinesis/RecordFilter.java   |  18 +-
 .../apache/beam/sdk/io/kinesis/RoundRobin.java  |  37 +-
 .../beam/sdk/io/kinesis/ShardCheckpoint.java| 241 -
 .../sdk/io/kinesis/ShardRecordsIterator.java| 106 ++--
 .../sdk/io/kinesis/SimplifiedKinesisClient.java | 215 
 .../beam/sdk/io/kinesis/StartingPoint.java  |  84 +--
 .../io/kinesis/StaticCheckpointGenerator.java   |  27 +-
 .../io/kinesis/TransientKinesisException.java   |   7 +-
 .../beam/sdk/io/kinesis/AmazonKinesisMock.java  | 539 ++-
 .../beam/sdk/io/kinesis/CustomOptionalTest.java |  27 +-
 .../kinesis/DynamicCheckpointGeneratorTest.java |  33 +-
 .../sdk/io/kinesis/KinesisMockReadTest.java |  97 ++--
 .../io/kinesis/KinesisReaderCheckpointTest.java |  52 +-
 .../beam/sdk/io/kinesis/KinesisReaderIT.java| 127 ++---
 .../beam/sdk/io/kinesis/KinesisReaderTest.java  | 166 +++---
 .../sdk/io/kinesis/KinesisRecordCoderTest.java  |  34 +-
 .../beam/sdk/io/kinesis/KinesisTestOptions.java |  43 +-
 .../beam/sdk/io/kinesis/KinesisUploader.java|  70 +--
 .../beam/sdk/io/kinesis/RecordFilterTest.java   |  52 +-
 .../beam/sdk/io/kinesis/RoundRobinTest.java |  42 +-
 .../sdk/io/kinesis/ShardCheckpointTest.java | 203 +++
 .../io/kinesis/ShardRecordsIteratorTest.java| 216 
 .../io/kinesis/SimplifiedKinesisClientTest.java | 351 ++--
 34 files changed, 2031 insertions(+), 1952 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
--
diff --git 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
index 919d85a..2629c57 100644
--- 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
+++ 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import java.io.Serializable;
 
 /**
@@ -25,6 +24,7 @@ import java.io.Serializable;
  * How exactly the checkpoint is generated is up to implementing class.
  */
 interface CheckpointGenerator extends Serializable {
-KinesisReaderCheckpoint generate(SimplifiedKinesisClient client)
-throws TransientKinesisException;
+
+  KinesisReaderCheckpoint generate(SimplifiedKinesisClient client)
+  throws TransientKinesisException;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
--
diff --git 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
index 4bed0e3..5a28214 100644
--- 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
+++ 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
@@ -24,76 +24,79 @@ import java.util.Objects;
  * Similar to Guava {@code Optional}, but throws {@link 
NoSuchElementException} for missing element.
  */
 abstract class CustomOptional<T> {
-    @SuppressWarnings("unchecked")
-    public static <T> CustomOptional<T> absent() {
-        return (Absent<T>) Absent.INSTANCE;
-    }
 
-    public static <T> CustomOptional<T> of(T v) {
-        return new Present<>(v);
-    }
+  

[48/50] [abbrv] beam git commit: Reformatting Kinesis IO to comply with official code style

2017-07-12 Thread takidau
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
--
diff --git 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
index 3e3984a..80c950f 100644
--- 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
+++ 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
@@ -31,9 +30,11 @@ import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
 import com.google.common.collect.Lists;
+
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.Callable;
+
 import org.joda.time.Instant;
 
 /**
@@ -41,117 +42,121 @@ import org.joda.time.Instant;
  * proper error handling.
  */
 class SimplifiedKinesisClient {
-private final AmazonKinesis kinesis;
 
-public SimplifiedKinesisClient(AmazonKinesis kinesis) {
-this.kinesis = kinesis;
-}
+  private final AmazonKinesis kinesis;
 
-public static SimplifiedKinesisClient from(KinesisClientProvider provider) 
{
-return new SimplifiedKinesisClient(provider.get());
-}
+  public SimplifiedKinesisClient(AmazonKinesis kinesis) {
+this.kinesis = kinesis;
+  }
 
-    public String getShardIterator(final String streamName, final String shardId,
-                                   final ShardIteratorType shardIteratorType,
-                                   final String startingSequenceNumber, final Instant timestamp)
-            throws TransientKinesisException {
-        final Date date = timestamp != null ? timestamp.toDate() : null;
-        return wrapExceptions(new Callable<String>() {
-            @Override
-            public String call() throws Exception {
-                return kinesis.getShardIterator(new GetShardIteratorRequest()
-                        .withStreamName(streamName)
-                        .withShardId(shardId)
-                        .withShardIteratorType(shardIteratorType)
-                        .withStartingSequenceNumber(startingSequenceNumber)
-                        .withTimestamp(date)
-                ).getShardIterator();
-            }
-        });
-    }
+  public static SimplifiedKinesisClient from(KinesisClientProvider provider) {
+return new SimplifiedKinesisClient(provider.get());
+  }
 
-    public List<Shard> listShards(final String streamName) throws TransientKinesisException {
-        return wrapExceptions(new Callable<List<Shard>>() {
-            @Override
-            public List<Shard> call() throws Exception {
-                List<Shard> shards = Lists.newArrayList();
-                String lastShardId = null;
-
-                StreamDescription description;
-                do {
-                    description = kinesis.describeStream(streamName, lastShardId)
-                            .getStreamDescription();
-
-                    shards.addAll(description.getShards());
-                    lastShardId = shards.get(shards.size() - 1).getShardId();
-                } while (description.getHasMoreShards());
-
-                return shards;
-            }
-        });
-    }
+  public String getShardIterator(final String streamName, final String shardId,
+      final ShardIteratorType shardIteratorType,
+      final String startingSequenceNumber, final Instant timestamp)
+      throws TransientKinesisException {
+    final Date date = timestamp != null ? timestamp.toDate() : null;
+    return wrapExceptions(new Callable<String>() {
 
-/**
- * Gets records from Kinesis and deaggregates them if needed.
- *
- * @return list of deaggregated records
- * @throws TransientKinesisException - in case of recoverable situation
- */
-public GetKinesisRecordsResult getRecords(String shardIterator, String 
streamName,
-  String shardId) throws 
TransientKinesisException {
-return getRecords(shardIterator, streamName, shardId, null);
-}
+  @Override
+  public String call() throws Exception {
+return kinesis.getShardIterator(new GetShardIteratorRequest()
+.withStreamName(streamName)
+.withShardId(shardId)
+.withShardIteratorType(shardIteratorType)
+.withStartingSequenceNumber(startingSequenceNumber)
+.withTimestamp(date)
+).getShardIterator();
+  }
+});
+  }
 
-/**
- * Gets records fro

[26/50] [abbrv] beam git commit: Enable SplittableParDo on rehydrated ParDo transform

2017-07-12 Thread takidau
Enable SplittableParDo on rehydrated ParDo transform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5ca058b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5ca058b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5ca058b

Branch: refs/heads/DSL_SQL
Commit: e5ca058bd7ad5f2150fef3e57649bcfb487a711f
Parents: bdece9d
Author: Kenneth Knowles 
Authored: Thu Jun 8 14:27:02 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:01 2017 -0700

--
 .../core/construction/SplittableParDo.java  | 25 ++
 .../direct/ParDoMultiOverrideFactory.java   | 36 ++--
 .../flink/FlinkStreamingPipelineTranslator.java |  2 +-
 3 files changed, 52 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/e5ca058b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
--
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index f31b495..e71187b 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -26,6 +27,7 @@ import 
org.apache.beam.runners.core.construction.PTransformTranslation.RawPTrans
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -103,6 +105,9 @@ public class SplittableParDo
   public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forJavaParDo(
       ParDo.MultiOutput<InputT, OutputT> parDo) {
 checkArgument(parDo != null, "parDo must not be null");
+checkArgument(
+
DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(),
+"fn must be a splittable DoFn");
 return new SplittableParDo(
 parDo.getFn(),
 parDo.getMainOutputTag(),
@@ -110,6 +115,26 @@ public class SplittableParDo
 parDo.getAdditionalOutputTags());
   }
 
+  /**
+   * Creates the transform for a {@link ParDo}-compatible {@link 
AppliedPTransform}.
+   *
+   * The input may generally be a deserialized transform so it may not 
actually be a {@link
+   * ParDo}. Instead {@link ParDoTranslation} will be used to extract fields.
+   */
+  public static SplittableParDo<?, ?, ?> forAppliedParDo(AppliedPTransform<?, ?, ?> parDo) {
+checkArgument(parDo != null, "parDo must not be null");
+
+try {
+  return new SplittableParDo<>(
+  ParDoTranslation.getDoFn(parDo),
+  (TupleTag) ParDoTranslation.getMainOutputTag(parDo),
+  ParDoTranslation.getSideInputs(parDo),
+  ParDoTranslation.getAdditionalOutputTags(parDo));
+} catch (IOException exc) {
+  throw new RuntimeException(exc);
+}
+  }
+
   @Override
   public PCollectionTuple expand(PCollection input) {
 Coder restrictionCoder =

http://git-wip-us.apache.org/repos/asf/beam/blob/e5ca058b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 2904bc1..8881967 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
@@ -26,6 +27,7 @@ import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.constr
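
The override-factory diff is truncated above, but the call it enables is small.
A sketch (appliedParDo is assumed to be a rehydrated, splittable ParDo
application handed over by a PTransformOverrideFactory):

    import org.apache.beam.runners.core.construction.SplittableParDo;
    import org.apache.beam.sdk.runners.AppliedPTransform;

    class SplittableOverrideSketch {
      static SplittableParDo<?, ?, ?> replacement(AppliedPTransform<?, ?, ?> appliedParDo) {
        // Works on deserialized transforms too, via ParDoTranslation.
        return SplittableParDo.forAppliedParDo(appliedParDo);
      }
    }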

[14/50] [abbrv] beam git commit: Fix bad merge

2017-07-12 Thread takidau
Fix bad merge


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02774b98
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02774b98
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02774b98

Branch: refs/heads/DSL_SQL
Commit: 02774b98e6e9f708f5563d235e262c115e595066
Parents: 5e0f258
Author: Kenneth Knowles 
Authored: Thu Jul 6 21:45:39 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:00 2017 -0700

--
 .../java/org/apache/beam/runners/dataflow/TransformTranslator.java  | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/02774b98/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
--
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index 7f61b6c..06ed1e0 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.OutputReference;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;



[27/50] [abbrv] beam git commit: Port DirectRunner ParDo overrides to SDK-agnostic APIs

2017-07-12 Thread takidau
Port DirectRunner ParDo overrides to SDK-agnostic APIs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16d4a154
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16d4a154
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16d4a154

Branch: refs/heads/DSL_SQL
Commit: 16d4a154d8667dd1ebdf4993e816c680f4c982e6
Parents: e5ca058
Author: Kenneth Knowles 
Authored: Thu Jun 8 13:44:52 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:01 2017 -0700

--
 .../core/construction/ParDoTranslation.java | 16 ++---
 .../construction/RunnerPCollectionView.java | 16 +
 .../direct/ParDoMultiOverrideFactory.java   | 35 +---
 3 files changed, 43 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/16d4a154/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
--
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index fe8c5aa..90c9aad 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
 
@@ -262,12 +263,19 @@ public class ParDoTranslation {
 ParDoPayload payload = 
parDoProto.getSpec().getParameter().unpack(ParDoPayload.class);
 
     List<PCollectionView<?>> views = new ArrayList<>();
-    for (Map.Entry<String, RunnerApi.SideInput> sideInput : payload.getSideInputsMap().entrySet()) {
+    for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
+      String sideInputTag = sideInputEntry.getKey();
+      RunnerApi.SideInput sideInput = sideInputEntry.getValue();
+      PCollection<?> originalPCollection =
+          checkNotNull(
+              (PCollection<?>) application.getInputs().get(new TupleTag<>(sideInputTag)),
+              "no input with tag %s",
+              sideInputTag);
   views.add(
   viewFromProto(
-  application.getPipeline(),
-  sideInput.getValue(),
-  sideInput.getKey(),
+  sideInput,
+  sideInputTag,
+  originalPCollection,
   parDoProto,
   sdkComponents.toComponents()));
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/16d4a154/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
--
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
index b275188..85139e8 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.core.construction;
 
 import java.util.Map;
+import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
@@ -94,4 +95,19 @@ class RunnerPCollectionView extends PValueBase implements 
PCollectionView
 throw new UnsupportedOperationException(String.format(
 "A %s cannot be expanded", 
RunnerPCollectionView.class.getSimpleName()));
   }
+
+  @Override
+  public boolean equals(Object other) {
+if (!(other instanceof PCollectionView)) {
+  return false;
+}
+    @SuppressWarnings("unchecked")
+    PCollectionView<?> otherView = (PCollectionView<?>) other;
+    return tag.equals(otherView.getTagInternal());
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(tag);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/16d4a154/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/

[11/50] [abbrv] beam git commit: Update SDK dependencies

2017-07-12 Thread takidau
Update SDK dependencies


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a7cad601
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a7cad601
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a7cad601

Branch: refs/heads/DSL_SQL
Commit: a7cad6016ea1471afeeb64885a8d8bb60a8fcd59
Parents: 4bf1615
Author: Ahmet Altay 
Authored: Wed Jul 5 14:34:07 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:00:59 2017 -0700

--
 sdks/python/setup.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/a7cad601/sdks/python/setup.py
--
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 6646a58..8a0c9ae 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -120,9 +120,9 @@ GCP_REQUIREMENTS = [
   'google-apitools>=0.5.10,<=0.5.11',
   'proto-google-cloud-datastore-v1>=0.90.0,<=0.90.4',
   'googledatastore==7.0.1',
-  'google-cloud-pubsub==0.25.0',
+  'google-cloud-pubsub==0.26.0',
   # GCP packages required by tests
-  'google-cloud-bigquery>=0.23.0,<0.25.0',
+  'google-cloud-bigquery>=0.23.0,<0.26.0',
 ]
 
 



[22/50] [abbrv] beam git commit: [BEAM-1347] Add DoFnRunner specific to Fn Api.

2017-07-12 Thread takidau
[BEAM-1347] Add DoFnRunner specific to Fn Api.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/78a39bd5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/78a39bd5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/78a39bd5

Branch: refs/heads/DSL_SQL
Commit: 78a39bd54136ad29a0c8b7fab2dfe895c502e4f5
Parents: 513ccdf
Author: Luke Cwik 
Authored: Fri Jun 23 14:34:36 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:00 2017 -0700

--
 sdks/java/harness/pom.xml   |  10 +
 .../beam/runners/core/FnApiDoFnRunner.java  | 483 ---
 .../beam/runners/core/FnApiDoFnRunnerTest.java  |   7 +-
 3 files changed, 438 insertions(+), 62 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/78a39bd5/sdks/java/harness/pom.xml
--
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index 9cfadc2..fe5c2f1 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -83,6 +83,11 @@
 
 
   org.apache.beam
+  beam-runners-core-construction-java
+
+
+
+  org.apache.beam
   beam-runners-google-cloud-dataflow-java
 
 
@@ -150,6 +155,11 @@
 
 
 
+  joda-time
+  joda-time
+
+
+
   org.slf4j
   slf4j-api
 

http://git-wip-us.apache.org/repos/asf/beam/blob/78a39bd5/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java
--
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java
index adf735a..b3cf3a7 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java
@@ -27,49 +27,59 @@ import com.google.common.collect.Multimap;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fake.FakeStepContext;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Instant;
 
 /**
- * Classes associated with converting {@link RunnerApi.PTransform}s to {@link 
DoFnRunner}s.
- *
- * TODO: Move DoFnRunners into SDK harness and merge the methods below into 
it removing this
- * class.
+ * A {@link DoFnRunner} specific to integrating with the Fn Api. This is to 
remove the layers
+ * of abstraction caused by StateInternals/TimerInternals since they model 
state and timer
+ * concepts differently.
  */
-public class FnApiDoFnRunner {
-
-  private static final String URN = "urn:org.apache.beam:dofn:java:0.1";
-
-  /** A registrar which provides a factory to handle Java {@link DoFn}s. */
+public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+  /**
+   * A registrar which provides a factory to handle Java {@link DoFn}s.
+   */
   @AutoService(PTransformRunnerFactory.Registrar.class)
   public static class Registrar implements
   PTransformRunnerFactory.Registrar {
 
 @Override
 public Map getPTransformRu

[24/50] [abbrv] beam git commit: Ignore processing time timers in expired windows

2017-07-12 Thread takidau
Ignore processing time timers in expired windows


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/951f3cab
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/951f3cab
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/951f3cab

Branch: refs/heads/DSL_SQL
Commit: 951f3cab3f6558524ee1146e0e3f347bcd02ecda
Parents: c167d10
Author: Kenneth Knowles 
Authored: Thu Jun 22 18:09:11 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:00 2017 -0700

--
 .../beam/runners/core/ReduceFnRunner.java   | 10 ++
 .../beam/runners/core/ReduceFnRunnerTest.java   | 32 
 2 files changed, 42 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/951f3cab/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index ef33bef..0632c05 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -693,6 +693,11 @@ public class ReduceFnRunner {
   @SuppressWarnings("unchecked")
 WindowNamespace windowNamespace = (WindowNamespace) 
timer.getNamespace();
   W window = windowNamespace.getWindow();
+
+  if (TimeDomain.PROCESSING_TIME == timer.getDomain() && 
windowIsExpired(window)) {
+continue;
+  }
+
   ReduceFn.Context directContext =
   contextFactory.base(window, StateStyle.DIRECT);
   ReduceFn.Context renamedContext =
@@ -1090,4 +1095,9 @@ public class ReduceFnRunner {
 }
   }
 
+  private boolean windowIsExpired(BoundedWindow w) {
+return timerInternals
+.currentInputWatermarkTime()
+
.isAfter(w.maxTimestamp().plus(windowingStrategy.getAllowedLateness()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/951f3cab/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
--
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 3a2c220..79ee91b 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -286,6 +286,38 @@ public class ReduceFnRunnerTest {
 
   /**
* Tests that when a processing time timer comes in after a window is expired
+   * it is just ignored.
+   */
+  @Test
+  public void testLateProcessingTimeTimer() throws Exception {
+    WindowingStrategy<?, ?> strategy =
+        WindowingStrategy.of((WindowFn<?, ?>) FixedWindows.of(Duration.millis(100)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.ZERO)
+            .withTrigger(
+                Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+tester.advanceProcessingTime(new Instant(5000));
+injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
+injectElement(tester, 5);
+
+// After this advancement, the window is expired and only the GC process
+// should be allowed to touch it
+tester.advanceInputWatermarkNoTimers(new Instant(100));
+
+// This should not output
+tester.advanceProcessingTime(new Instant(6000));
+
+assertThat(tester.extractOutput(), emptyIterable());
+  }
+
+  /**
+   * Tests that when a processing time timer comes in after a window is expired
* but in the same bundle it does not cause a spurious output.
*/
   @Test
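
The arithmetic behind windowIsExpired() can be checked by hand. A
self-contained sketch with the same numbers as the test (joda-time only; the
class name is illustrative):

    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class WindowExpirySketch {
      public static void main(String[] args) {
        // Fixed window [0, 100): maxTimestamp is 99.
        Instant windowMaxTimestamp = new Instant(99);
        Duration allowedLateness = Duration.ZERO;
        Instant inputWatermark = new Instant(100);

        // Mirrors windowIsExpired(): once the input watermark passes
        // maxTimestamp + allowedLateness, processing-time timers are ignored.
        boolean expired = inputWatermark.isAfter(windowMaxTimestamp.plus(allowedLateness));
        System.out.println(expired); // true
      }
    }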



[37/50] [abbrv] beam git commit: Rehydrate PCollections

2017-07-12 Thread takidau
Rehydrate PCollections


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f17b8a2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f17b8a2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f17b8a2

Branch: refs/heads/DSL_SQL
Commit: 1f17b8a2bbd5068c8fd3374731d96f57d31433dc
Parents: 4c336e8
Author: Kenneth Knowles 
Authored: Thu Jul 6 09:24:22 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:01 2017 -0700

--
 .../construction/PCollectionTranslation.java| 16 ++
 .../PCollectionTranslationTest.java | 22 
 2 files changed, 38 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/1f17b8a2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
--
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index 968966f..52526bb 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.values.PCollection;
@@ -47,6 +48,21 @@ public class PCollectionTranslation {
 .build();
   }
 
+  public static PCollection<?> fromProto(
+  Pipeline pipeline, RunnerApi.PCollection pCollection, 
RunnerApi.Components components)
+  throws IOException {
+return PCollection.createPrimitiveOutputInternal(
+pipeline,
+WindowingStrategyTranslation.fromProto(
+
components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()),
+components),
+fromProto(pCollection.getIsBounded()))
+.setCoder(
+(Coder)
+CoderTranslation.fromProto(
+components.getCodersOrThrow(pCollection.getCoderId()), 
components));
+  }
+
   public static IsBounded isBounded(RunnerApi.PCollection pCollection) {
 return fromProto(pCollection.getIsBounded());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1f17b8a2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
--
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
index 3b94220..5c45487 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
@@ -113,6 +113,28 @@ public class PCollectionTranslationTest {
 
   @Test
   public void testEncodeDecodeCycle() throws Exception {
+// Encode
+SdkComponents sdkComponents = SdkComponents.create();
+RunnerApi.PCollection protoCollection =
+PCollectionTranslation.toProto(testCollection, sdkComponents);
+RunnerApi.Components protoComponents = sdkComponents.toComponents();
+
+// Decode
+Pipeline pipeline = Pipeline.create();
+PCollection decodedCollection =
+PCollectionTranslation.fromProto(pipeline, protoCollection, 
protoComponents);
+
+// Verify
+    assertThat(decodedCollection.getCoder(), Matchers.<Coder<?>>equalTo(testCollection.getCoder()));
+    assertThat(
+        decodedCollection.getWindowingStrategy(),
+        Matchers.<WindowingStrategy<?, ?>>equalTo(
+            testCollection.getWindowingStrategy().fixDefaults()));
+    assertThat(decodedCollection.isBounded(), equalTo(testCollection.isBounded()));
+  }
+
+  @Test
+  public void testEncodeDecodeFields() throws Exception {
 SdkComponents sdkComponents = SdkComponents.create();
 RunnerApi.PCollection protoCollection = PCollectionTranslation
 .toProto(testCollection, sdkComponents);



[39/50] [abbrv] beam git commit: Fix null checks in TransformHierarchy

2017-07-12 Thread takidau
Fix null checks in TransformHierarchy
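
The diff below replaces per-call-site null guards with one fail-fast lookup. A minimal sketch of the pattern outside Beam, with hypothetical names:

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.HashMap;
import java.util.Map;

// Hypothetical registry, not Beam's TransformHierarchy itself.
class ProducerRegistry<K, V> {
  private final Map<K, V> producers = new HashMap<>();

  void register(K produced, V producer) {
    producers.put(produced, producer);
  }

  // Fail at the lookup with a message naming the missing key, instead of
  // returning null and re-validating with checkState at every call site.
  V getProducer(K produced) {
    return checkNotNull(producers.get(produced), "No producer found for %s", produced);
  }
}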


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/83f31e94
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/83f31e94
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/83f31e94

Branch: refs/heads/DSL_SQL
Commit: 83f31e942b5e106b21bc922d016c5840bf0b0a3a
Parents: fc06b79
Author: Kenneth Knowles 
Authored: Mon Jun 12 15:12:18 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:02 2017 -0700

--
 .../org/apache/beam/sdk/runners/TransformHierarchy.java   | 10 +-
 1 file changed, 1 insertion(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/83f31e94/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 6f1ee94..d8ff59e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -145,14 +145,6 @@ public class TransformHierarchy {
   Node producerNode = getProducer(inputValue);
   PInput input = producerInput.remove(inputValue);
   inputValue.finishSpecifying(input, producerNode.getTransform());
-  checkState(
-  producers.get(inputValue) != null,
-  "Producer unknown for input %s",
-  inputValue);
-  checkState(
-  producers.get(inputValue) != null,
-  "Producer unknown for input %s",
-  inputValue);
 }
   }
 
@@ -201,7 +193,7 @@ public class TransformHierarchy {
   }
 
   Node getProducer(PValue produced) {
-    return producers.get(produced);
+    return checkNotNull(producers.get(produced), "No producer found for %s", produced);
   }
 
   public Set visit(PipelineVisitor visitor) {



[16/50] [abbrv] beam git commit: Simplified ByteBuddyOnTimerInvokerFactory

2017-07-12 Thread takidau
Simplified ByteBuddyOnTimerInvokerFactory
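
The simplification collapses a nested cache (DoFn class, then timer id) into a single cache keyed by a composite value. A self-contained Guava sketch of that idea; all names here are hypothetical:

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.Objects;

// Hypothetical composite key replacing a cache-of-caches keyed by (clazz, timerId).
final class CacheKey {
  final Class<?> clazz;
  final String timerId;

  CacheKey(Class<?> clazz, String timerId) {
    this.clazz = clazz;
    this.timerId = timerId;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof CacheKey)) {
      return false;
    }
    CacheKey that = (CacheKey) o;
    return clazz.equals(that.clazz) && timerId.equals(that.timerId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(clazz, timerId);
  }
}

class SingleLevelCacheSketch {
  // One cache keyed by the composite key, instead of a LoadingCache of LoadingCaches.
  private final LoadingCache<CacheKey, String> cache =
      CacheBuilder.newBuilder()
          .build(
              new CacheLoader<CacheKey, String>() {
                @Override
                public String load(CacheKey key) {
                  return key.clazz.getName() + "#" + key.timerId; // stand-in for expensive work
                }
              });
}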


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8512153b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8512153b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8512153b

Branch: refs/heads/DSL_SQL
Commit: 8512153b24bf13f5ba6e0298eeb8629ab2875da4
Parents: 02774b9
Author: Innocent Djiofack 
Authored: Wed Jun 28 22:15:11 2017 -0400
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:00 2017 -0700

--
 .../reflect/ByteBuddyOnTimerInvokerFactory.java | 73 
 .../reflect/OnTimerMethodSpecifier.java | 37 ++
 2 files changed, 65 insertions(+), 45 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/8512153b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
index e031337..5e31f2e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
+
 import com.google.common.base.CharMatcher;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -61,13 +62,14 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
 
 @SuppressWarnings("unchecked")
 Class> fnClass = (Class>) 
fn.getClass();
-
 try {
-  Constructor constructor = constructorCache.get(fnClass).get(timerId);
-  @SuppressWarnings("unchecked")
-  OnTimerInvoker invoker =
+OnTimerMethodSpecifier onTimerMethodSpecifier =
+OnTimerMethodSpecifier.forClassAndTimerId(fnClass, timerId);
+Constructor constructor = 
constructorCache.get(onTimerMethodSpecifier);
+
+OnTimerInvoker invoker =
   (OnTimerInvoker) constructor.newInstance(fn);
-  return invoker;
+return invoker;
 } catch (InstantiationException
 | IllegalAccessException
 | IllegalArgumentException
@@ -97,50 +99,31 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
   private static final String FN_DELEGATE_FIELD_NAME = "delegate";
 
   /**
-   * A cache of constructors of generated {@link OnTimerInvoker} classes, keyed by {@link DoFn}
-   * class and then by {@link TimerId}.
+   * A cache of constructors of generated {@link OnTimerInvoker} classes,
+   * keyed by {@link OnTimerMethodSpecifier}.
    *
    * Needed because generating an invoker class is expensive, and to avoid generating an
    * excessive number of classes consuming PermGen memory in Java versions that still have PermGen.
    */
-  private final LoadingCache<Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>
-      constructorCache =
-          CacheBuilder.newBuilder()
-              .build(
-                  new CacheLoader<
-                      Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>() {
-                    @Override
-                    public LoadingCache<String, Constructor<?>> load(
-                        final Class<? extends DoFn<?, ?>> fnClass) throws Exception {
-                      return CacheBuilder.newBuilder().build(new OnTimerConstructorLoader(fnClass));
-                    }
-                  });
-
-  /**
-   * A cache loader fixed to a particular {@link DoFn} class that loads constructors for the
-   * invokers for its {@link OnTimer @OnTimer} methods.
-   */
-  private static class OnTimerConstructorLoader extends CacheLoader<String, Constructor<?>> {
-
-    private final DoFnSignature signature;
-
-    public OnTimerConstructorLoader(Class<? extends DoFn<?, ?>> clazz) {
-      this.signature = DoFnSignatures.getSignature(clazz);
-    }
-
-    @Override
-    public Constructor<?> load(String timerId) throws Exception {
-      Class<? extends OnTimerInvoker<?, ?>> invokerClass =
-          generateOnTimerInvokerClass(signature, timerId);
-      try {
-        return invokerClass.getConstructor(signature.fnClass());
-      } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  /**
+  private final LoadingCache<OnTimerMethodSpecifier, Constructor<?>> constructorCache =
+      CacheBuilder.newBuilder().build(
+          new CacheLoader<OnTimerMethodSpecifier, Constructor<?>>() {
+            @Override
+            public Constructor<?> load(final OnTimerMethodSpecifier onTimerMethodSpecifier)
+                throws Exception {
+              DoFnSignature signature =
+                  DoFnSignatures.getSignature(onTimerMethodSpecifier.fnClass());
+              Class<? extends OnTimerInvoker<?, ?>>

[20/50] [abbrv] beam git commit: Reject stateful ParDo if coder not KvCoder with deterministic key coder

2017-07-12 Thread takidau
Reject stateful ParDo if coder not KvCoder with deterministic key coder
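
For context on the validation below, a minimal sketch (illustrative names, not part of this patch) of an input shape that passes the check: KV elements whose key type has a deterministic coder, such as String:

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.transforms.DoFn;

// Keys are Strings, whose coder is deterministic, so this stateful ParDo is accepted.
class RunningTotalFn extends DoFn<KV<String, Integer>, Integer> {
  @StateId("total")
  private final StateSpec<ValueState<Integer>> totalSpec = StateSpecs.value();

  @ProcessElement
  public void processElement(
      ProcessContext c, @StateId("total") ValueState<Integer> total) {
    Integer current = total.read();
    int sum = (current == null ? 0 : current) + c.element().getValue();
    total.write(sum);
    c.output(sum);
  }
}

// Usage (hypothetical pipeline):
//   pipeline
//       .apply(Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 5)))
//       .apply(ParDo.of(new RunningTotalFn()));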


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f8974672
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f8974672
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f8974672

Branch: refs/heads/DSL_SQL
Commit: f89746722419ef3c60f92d7a0fa17e4e6247b265
Parents: 81a96ab
Author: Kenneth Knowles 
Authored: Wed Jul 5 17:24:25 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:00 2017 -0700

--
 .../org/apache/beam/sdk/transforms/ParDo.java   |  27 +
 .../apache/beam/sdk/transforms/ParDoTest.java   | 102 +++
 2 files changed, 129 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/f8974672/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index db1f791..0d03835 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -455,6 +456,27 @@ public class ParDo {
 }
   }
 
+  private static void validateStateApplicableForInput(
+      DoFn<?, ?> fn,
+      PCollection<?> input) {
+    Coder<?> inputCoder = input.getCoder();
+    checkArgument(
+        inputCoder instanceof KvCoder,
+        "%s requires its input to use %s in order to use state and timers.",
+        ParDo.class.getSimpleName(),
+        KvCoder.class.getSimpleName());
+
+    KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) inputCoder;
+    try {
+      kvCoder.getKeyCoder().verifyDeterministic();
+    } catch (Coder.NonDeterministicException exc) {
+      throw new IllegalArgumentException(
+          String.format(
+              "%s requires a deterministic key coder in order to use state and timers",
+              ParDo.class.getSimpleName()));
+    }
+  }
+
   /**
* Try to provide coders for as many of the type arguments of given
* {@link DoFnSignature.StateDeclaration} as possible.
@@ -737,6 +759,11 @@ public class ParDo {
   // Use coder registry to determine coders for all StateSpec defined in the fn signature.
   finishSpecifyingStateSpecs(fn, input.getPipeline().getCoderRegistry(), input.getCoder());
 
+  DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+  if (signature.usesState() || signature.usesTimers()) {
+validateStateApplicableForInput(fn, input);
+  }
+
   PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
   input.getPipeline(),
   TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),

http://git-wip-us.apache.org/repos/asf/beam/blob/f8974672/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
--
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 5b60ef3..fa4949e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -1593,6 +1593,108 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  public void testStateNotKeyed() {
+    final String stateId = "foo";
+
+    DoFn<String, Integer> fn =
+        new DoFn<String, Integer>() {
+
+          @StateId(stateId)
+          private final StateSpec<ValueState<Integer>> intState =
+              StateSpecs.value();
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {}
+        };
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("state");
+    thrown.expectMessage("KvCoder");
+
+    pipeline.apply(Create.of("hello", "goodbye", "hello again")).apply(ParDo.of(fn));
+  }
+
+  @Test
+  public void testStateNotDeterministic() {
+    final String stateId = "foo";
+
+    // DoubleCoder is not deterministic, so this should crash
+    DoFn<KV<Double, String>, Integer> fn =
+        new DoFn<KV<Double, String>, Integer>() {
+
+          @StateId(stateId)
+          private final StateSpec<ValueState<Integer>> intState =
+              StateSpecs.value();
+
+          @

[46/50] [abbrv] beam git commit: Adds TextIO.readAll(), implemented rather naively

2017-07-12 Thread takidau
Adds TextIO.readAll(), implemented rather naively
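
A hedged usage sketch that assumes only the readAll() entry point this commit adds; the bucket path is a placeholder:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class ReadAllSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Filepatterns arrive as pipeline data; this literal is a placeholder.
    PCollection<String> patterns = p.apply(Create.of("gs://my-bucket/logs/2017/07/*.txt"));

    // TextIO.read() fixes one pattern at construction time; readAll() expands
    // whatever patterns flow through the collection at execution time.
    PCollection<String> lines = patterns.apply(TextIO.readAll());

    p.run().waitUntilFinish();
  }
}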


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fcb06f3b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fcb06f3b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fcb06f3b

Branch: refs/heads/DSL_SQL
Commit: fcb06f3bf5482dc3ae63a3c070592bae0c631c6d
Parents: 2e42ae4
Author: Eugene Kirpichov 
Authored: Fri Jun 23 18:02:10 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:02 2017 -0700

--
 ...ndedSplittableProcessElementInvokerTest.java |   2 +-
 .../core/SplittableParDoProcessFnTest.java  |   2 +-
 .../DataflowPipelineTranslatorTest.java |   2 +-
 .../apache/beam/sdk/io/CompressedSource.java|  40 ++--
 .../apache/beam/sdk/io/OffsetBasedSource.java   |  22 +-
 .../java/org/apache/beam/sdk/io/TextIO.java | 230 +--
 .../apache/beam/sdk/io/range/OffsetRange.java   | 101 
 .../beam/sdk/io/range/OffsetRangeTracker.java   |   3 +
 .../transforms/splittabledofn/OffsetRange.java  |  77 ---
 .../splittabledofn/OffsetRangeTracker.java  |   1 +
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  62 +++--
 .../beam/sdk/transforms/SplittableDoFnTest.java |   2 +-
 .../splittabledofn/OffsetRangeTrackerTest.java  |   1 +
 13 files changed, 387 insertions(+), 158 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
--
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
index a2f6acc..b80a632 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -25,10 +25,10 @@ import static org.junit.Assert.assertThat;
 
 import java.util.Collection;
 import java.util.concurrent.Executors;
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
--
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index 9543de8..1cd1275 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -39,11 +39,11 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
--
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 948af1c..43b2788 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java

[19/50] [abbrv] beam git commit: [BEAM-1347] Rename DoFnRunnerFactory to FnApiDoFnRunner.

2017-07-12 Thread takidau
[BEAM-1347] Rename DoFnRunnerFactory to FnApiDoFnRunner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/513ccdf1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/513ccdf1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/513ccdf1

Branch: refs/heads/DSL_SQL
Commit: 513ccdf1bf7ed96ff43a7e1476e3202dd7eea93d
Parents: f897467
Author: Luke Cwik 
Authored: Fri Jun 23 14:31:58 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:00 2017 -0700

--
 .../beam/runners/core/DoFnRunnerFactory.java| 182 
 .../beam/runners/core/FnApiDoFnRunner.java  | 182 
 .../runners/core/DoFnRunnerFactoryTest.java | 209 ---
 .../beam/runners/core/FnApiDoFnRunnerTest.java  | 209 +++
 4 files changed, 391 insertions(+), 391 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/513ccdf1/sdks/java/harness/src/main/java/org/apache/beam/runners/core/DoFnRunnerFactory.java
--
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/DoFnRunnerFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/DoFnRunnerFactory.java
deleted file mode 100644
index 3c0b6eb..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/DoFnRunnerFactory.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Multimap;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fake.FakeStepContext;
-import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.dataflow.util.DoFnInfo;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * Classes associated with converting {@link RunnerApi.PTransform}s to {@link DoFnRunner}s.
- *
- * TODO: Move DoFnRunners into SDK harness and merge the methods below into it removing this
- * class.
- */
-public class DoFnRunnerFactory {
-
-  private static final String URN = "urn:org.apache.beam:dofn:java:0.1";
-
-  /** A registrar which provides a factory to handle Java {@link DoFn}s. */
-  @AutoService(PTransformRunnerFactory.Registrar.class)
-  public static class Registrar implements
-  PTransformRunnerFactory.Registrar {
-
-@Override
-    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
-  return ImmutableMap.of(URN, new Factory());
-}
-  }
-
-  /** A factory for {@link DoFnRunner}s. */
-  static class Factory<InputT, OutputT>
-      implements PTransformRunnerFactory<DoFnRunner<InputT, OutputT>> {
-
-    @Override
-    public DoFnRunner<InputT, OutputT> createRunnerForPTransform(
-        PipelineOptions pipelineOptions,
-        BeamFnDataClient beamFnDataClient,
-        String pTransformId,
-        RunnerApi.PTransform pTransform,
-        Supplier<String> processBundleInstructionId,
-        Map<String, RunnerApi.PCollection> pCollections,
-        Map<String, RunnerApi.Coder> coders,
-        Multimap<String, ThrowingConsumer<WindowedValue<OutputT>>> pCollectionIdsToConsumers,
-        Consumer a

[43/50] [abbrv] beam git commit: Cleanup and fix ptransform_fn decorator.

2017-07-12 Thread takidau
Cleanup and fix ptransform_fn decorator.

Previously CallablePTransform was being used both as the
factory and the transform itself, which could result in state
getting carried between pipelines.
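
The underlying bug class is language-agnostic. A hypothetical Java sketch of the same factory-versus-product confusion and the fix:

// Anti-pattern: the same object is both the factory and the transform, so
// `label` set while building pipeline A is still present for pipeline B.
class ReusableTransform {
  String label; // mutable state on a shared instance

  ReusableTransform withLabel(String label) {
    this.label = label;
    return this; // returns itself, not a fresh instance
  }
}

// Fix: the factory is stateless and hands out a new product each time.
class TransformFactory {
  ReusableTransform create(String label) {
    ReusableTransform t = new ReusableTransform();
    t.label = label;
    return t; // fresh instance per pipeline; no carried-over state
  }
}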


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd2a8cca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd2a8cca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd2a8cca

Branch: refs/heads/DSL_SQL
Commit: bd2a8cca8c64eba5e362ffd78a868e3deb3755e4
Parents: fcb06f3
Author: Robert Bradshaw 
Authored: Tue Jul 11 14:32:47 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:02 2017 -0700

--
 sdks/python/apache_beam/transforms/combiners.py |  8 
 .../apache_beam/transforms/combiners_test.py|  7 +---
 .../python/apache_beam/transforms/ptransform.py | 41 +---
 3 files changed, 28 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/bd2a8cca/sdks/python/apache_beam/transforms/combiners.py
--
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index fa0742d..875306f 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -149,6 +149,7 @@ class Top(object):
   """Combiners for obtaining extremal elements."""
   # pylint: disable=no-self-argument
 
+  @staticmethod
   @ptransform.ptransform_fn
   def Of(pcoll, n, compare=None, *args, **kwargs):
 """Obtain a list of the compare-most N elements in a PCollection.
@@ -177,6 +178,7 @@ class Top(object):
 return pcoll | core.CombineGlobally(
 TopCombineFn(n, compare, key, reverse), *args, **kwargs)
 
+  @staticmethod
   @ptransform.ptransform_fn
   def PerKey(pcoll, n, compare=None, *args, **kwargs):
 """Identifies the compare-most N elements associated with each key.
@@ -210,21 +212,25 @@ class Top(object):
 return pcoll | core.CombinePerKey(
 TopCombineFn(n, compare, key, reverse), *args, **kwargs)
 
+  @staticmethod
   @ptransform.ptransform_fn
   def Largest(pcoll, n):
 """Obtain a list of the greatest N elements in a PCollection."""
 return pcoll | Top.Of(n)
 
+  @staticmethod
   @ptransform.ptransform_fn
   def Smallest(pcoll, n):
 """Obtain a list of the least N elements in a PCollection."""
 return pcoll | Top.Of(n, reverse=True)
 
+  @staticmethod
   @ptransform.ptransform_fn
   def LargestPerKey(pcoll, n):
 """Identifies the N greatest elements associated with each key."""
 return pcoll | Top.PerKey(n)
 
+  @staticmethod
   @ptransform.ptransform_fn
   def SmallestPerKey(pcoll, n, reverse=True):
 """Identifies the N least elements associated with each key."""
@@ -369,10 +375,12 @@ class Sample(object):
   """Combiners for sampling n elements without replacement."""
   # pylint: disable=no-self-argument
 
+  @staticmethod
   @ptransform.ptransform_fn
   def FixedSizeGlobally(pcoll, n):
 return pcoll | core.CombineGlobally(SampleCombineFn(n))
 
+  @staticmethod
   @ptransform.ptransform_fn
   def FixedSizePerKey(pcoll, n):
 return pcoll | core.CombinePerKey(SampleCombineFn(n))

http://git-wip-us.apache.org/repos/asf/beam/blob/bd2a8cca/sdks/python/apache_beam/transforms/combiners_test.py
--
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index c79fec8..cd2b595 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -156,14 +156,11 @@ class CombineTest(unittest.TestCase):
 
   def test_combine_sample_display_data(self):
 def individual_test_per_key_dd(sampleFn, args, kwargs):
-  trs = [beam.CombinePerKey(sampleFn(*args, **kwargs)),
- beam.CombineGlobally(sampleFn(*args, **kwargs))]
+  trs = [sampleFn(*args, **kwargs)]
   for transform in trs:
 dd = DisplayData.create_from(transform)
 expected_items = [
-DisplayDataItemMatcher('fn', sampleFn.fn.__name__),
-DisplayDataItemMatcher('combine_fn',
-   transform.fn.__class__)]
+DisplayDataItemMatcher('fn', transform._fn.__name__)]
 if args:
   expected_items.append(
   DisplayDataItemMatcher('args', str(args)))

http://git-wip-us.apache.org/repos/asf/beam/blob/bd2a8cca/sdks/python/apache_beam/transforms/ptransform.py
--
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 6041353..cd84122 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py

[44/50] [abbrv] beam git commit: Use URNs, not Java classes, in immutability enforcements

2017-07-12 Thread takidau
Use URNs, not Java classes, in immutability enforcements
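
A sketch of the substitution; the URN values and names below are illustrative, not Beam's actual constants:

import com.google.common.collect.ImmutableSet;
import java.util.Set;

class EnforcementLookup {
  // URNs are stable, language-neutral identifiers for transforms.
  static final String PAR_DO_URN = "urn:example:transform:pardo:v1"; // illustrative value
  static final String READ_URN = "urn:example:transform:read:v1";   // illustrative value

  private static final Set<String> CONTAINS_UDF = ImmutableSet.of(PAR_DO_URN, READ_URN);

  // Dispatch on the transform's URN instead of its Java class, so the check
  // does not depend on which SDK produced the transform.
  static boolean requiresImmutabilityEnforcement(String transformUrn) {
    return CONTAINS_UDF.contains(transformUrn);
  }
}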


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6a61f154
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6a61f154
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6a61f154

Branch: refs/heads/DSL_SQL
Commit: 6a61f154bdbc02c4d84053e13d68158e065e8372
Parents: 8ae2a79
Author: Kenneth Knowles 
Authored: Mon Jul 10 15:25:11 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:02 2017 -0700

--
 .../beam/runners/direct/DirectRunner.java   | 21 
 .../direct/ExecutorServiceParallelExecutor.java | 16 ++-
 2 files changed, 14 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/6a61f154/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
--
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 7a221c4..4621224 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -38,14 +38,11 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -72,16 +69,17 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
     IMMUTABILITY {
       @Override
       public boolean appliesTo(PCollection<?> collection, DirectGraph graph) {
-        return CONTAINS_UDF.contains(graph.getProducer(collection).getTransform().getClass());
+        return CONTAINS_UDF.contains(
+            PTransformTranslation.urnForTransform(graph.getProducer(collection).getTransform()));
       }
     };
 
     /**
      * The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements.
      */
-    private static final Set<Class<? extends PTransform>> CONTAINS_UDF =
+    private static final Set<String> CONTAINS_UDF =
         ImmutableSet.of(
-            Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class);
+            PTransformTranslation.READ_TRANSFORM_URN, PTransformTranslation.PAR_DO_TRANSFORM_URN);
 
     public abstract boolean appliesTo(PCollection<?> collection, DirectGraph graph);
 
@@ -110,22 +108,19 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   return bundleFactory;
 }
 
-@SuppressWarnings("rawtypes")
-private static Map, 
Collection>
+private static Map>
 defaultModelEnforcements(Set enabledEnforcements) {
-  ImmutableMap.Builder, 
Collection>
-  enforcements = ImmutableMap.builder();
+  ImmutableMap.Builder> 
enforcements =
+  ImmutableMap.builder();
   ImmutableList.Builder enabledParDoEnforcements =
   ImmutableList.builder();
   if (enabledEnforcements.contains(Enforcement.IMMUTABILITY)) {
 enabledParDoEnforcements.add(ImmutabilityEnforcementFactory.create());
   }
   Collection parDoEnforcements = 
enabledParDoEnforcements.build();
-  enforcements.put(ParDo.SingleOutput.class, parDoEnforcements);
-  enforcements.put(MultiOutput.class, parDoEnforcements);
+  enforcements.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, 
parDoEnforcements);
   return enforcements.build();
 }
-
   }
 
   


http://git-wip-us.apache.org/repos/asf/beam/blob/6a61f154/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
--
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 2f4d1f6..75e2562 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java

[50/50] [abbrv] beam git commit: [BEAM-2610] This closes #3553

2017-07-12 Thread takidau
[BEAM-2610] This closes #3553


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ec494f67
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ec494f67
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ec494f67

Branch: refs/heads/DSL_SQL
Commit: ec494f675aa73fbdc7929f9592f33951941962b0
Parents: d89d1ee 4f7f169
Author: Tyler Akidau 
Authored: Wed Jul 12 20:02:22 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:02:22 2017 -0700

--
 .gitignore  |   2 +-
 .../jenkins/common_job_properties.groovy|   9 +-
 .../job_beam_PerformanceTests_Python.groovy |  58 ++
 ..._beam_PostCommit_Java_JDKVersionsTest.groovy |   2 +
 ..._PostCommit_Java_MavenInstall_Windows.groovy |   3 +-
 .../job_beam_PreCommit_Website_Merge.groovy |  59 ++
 examples/java/pom.xml   |  20 +-
 .../examples/common/WriteOneFilePerWindow.java  |  52 +-
 .../beam/examples/WindowedWordCountIT.java  |   4 +-
 examples/java8/pom.xml  |  20 +-
 .../complete/game/utils/WriteToText.java|  43 +-
 .../examples/complete/game/LeaderBoardTest.java |   2 +
 examples/pom.xml|   2 +-
 pom.xml | 123 +++-
 runners/apex/pom.xml|  20 +-
 .../apache/beam/runners/apex/ApexRunner.java|  61 +-
 .../translation/ApexPipelineTranslator.java |  16 +-
 .../apex/translation/TranslationContext.java|   4 +-
 .../operators/ApexParDoOperator.java|  21 +-
 .../runners/apex/examples/WordCountTest.java|   8 +-
 .../utils/ApexStateInternalsTest.java   | 411 +++
 runners/core-construction-java/pom.xml  |   2 +-
 .../CreatePCollectionViewTranslation.java   |   4 +-
 .../construction/ElementAndRestriction.java |  42 --
 .../ElementAndRestrictionCoder.java |  88 ---
 .../construction/PCollectionTranslation.java|  16 +
 .../core/construction/PTransformMatchers.java   | 109 ++-
 .../construction/PTransformTranslation.java |   7 +-
 .../core/construction/ParDoTranslation.java |  82 ++-
 .../construction/RunnerPCollectionView.java |  31 +-
 .../core/construction/SplittableParDo.java  | 124 +++-
 .../construction/TestStreamTranslation.java |  49 +-
 .../core/construction/TransformInputs.java  |  50 ++
 .../WindowingStrategyTranslation.java   |  27 +-
 .../construction/WriteFilesTranslation.java |  67 +-
 .../ElementAndRestrictionCoderTest.java | 126 
 .../PCollectionTranslationTest.java |  22 +
 .../construction/PTransformMatchersTest.java|  54 +-
 .../core/construction/ParDoTranslationTest.java |  28 +-
 .../core/construction/SplittableParDoTest.java  |  18 +-
 .../core/construction/TransformInputsTest.java  | 166 +
 .../WindowingStrategyTranslationTest.java   |   3 +
 .../construction/WriteFilesTranslationTest.java |  62 +-
 runners/core-java/pom.xml   |   2 +-
 .../core/LateDataDroppingDoFnRunner.java|  33 +-
 ...eBoundedSplittableProcessElementInvoker.java |  40 +-
 .../beam/runners/core/ProcessFnRunner.java  |  16 +-
 .../beam/runners/core/ReduceFnRunner.java   | 135 ++--
 .../beam/runners/core/SimpleDoFnRunner.java |  20 +
 .../core/SplittableParDoViaKeyedWorkItems.java  |  58 +-
 .../core/SplittableProcessElementInvoker.java   |  25 +-
 .../beam/runners/core/SystemReduceFn.java   |   6 +
 .../core/triggers/AfterAllStateMachine.java |  25 +-
 .../AfterDelayFromFirstElementStateMachine.java |   6 +-
 .../core/triggers/AfterFirstStateMachine.java   |  20 +-
 .../core/triggers/AfterPaneStateMachine.java|   6 +-
 .../triggers/AfterWatermarkStateMachine.java|   7 +-
 .../triggers/ExecutableTriggerStateMachine.java |  23 +-
 .../core/triggers/NeverStateMachine.java|   5 +-
 .../core/triggers/TriggerStateMachine.java  |  27 -
 .../core/InMemoryStateInternalsTest.java| 569 ++-
 ...ndedSplittableProcessElementInvokerTest.java |  47 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   | 374 +-
 .../beam/runners/core/ReduceFnTester.java   |  48 +-
 .../core/SplittableParDoProcessFnTest.java  | 117 ++-
 .../beam/runners/core/StateInternalsTest.java   | 613 
 .../beam/runners/core/WindowMatchers.java   |  15 +
 .../triggers/AfterFirstStateMachineTest.java|   5 +-
 .../AfterWatermarkStateMachineTest.java |   7 +-
 .../core/triggers/StubTriggerStateMachine.java  |   7 +-
 runners/direct-java/pom.xml |   7 +-
 .../beam/runners/direct/CommittedResult.java|  12 +-
 .../apache/beam/runners/direct/DirectGraph.java |  38 +-
 .../beam/runners/direct/DirectGraphVisitor.java |  48 +-
 .../beam/runners/direct/DirectGroupByKey.java   |  13 +-
 .../dire

[08/50] [abbrv] beam git commit: [maven-release-plugin] prepare for next development iteration

2017-07-12 Thread takidau
[maven-release-plugin] prepare for next development iteration


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd157519
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd157519
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd157519

Branch: refs/heads/DSL_SQL
Commit: cd1575191ba9fc05f485e3db985bebc2dd30b5b3
Parents: 89531a8
Author: Jean-Baptiste Onofré 
Authored: Wed Jul 5 16:47:38 2017 +0200
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:00:59 2017 -0700

--
 examples/java/pom.xml   | 2 +-
 examples/java8/pom.xml  | 2 +-
 examples/pom.xml| 2 +-
 pom.xml | 4 ++--
 runners/apex/pom.xml| 2 +-
 runners/core-construction-java/pom.xml  | 2 +-
 runners/core-java/pom.xml   | 2 +-
 runners/direct-java/pom.xml | 2 +-
 runners/flink/pom.xml   | 2 +-
 runners/google-cloud-dataflow-java/pom.xml  | 2 +-
 runners/pom.xml | 2 +-
 runners/spark/pom.xml   | 2 +-
 sdks/common/fn-api/pom.xml  | 2 +-
 sdks/common/pom.xml | 2 +-
 sdks/common/runner-api/pom.xml  | 2 +-
 sdks/java/build-tools/pom.xml   | 2 +-
 sdks/java/core/pom.xml  | 2 +-
 sdks/java/extensions/google-cloud-platform-core/pom.xml | 2 +-
 sdks/java/extensions/jackson/pom.xml| 2 +-
 sdks/java/extensions/join-library/pom.xml   | 2 +-
 sdks/java/extensions/pom.xml| 2 +-
 sdks/java/extensions/protobuf/pom.xml   | 2 +-
 sdks/java/extensions/sorter/pom.xml | 2 +-
 sdks/java/harness/pom.xml   | 2 +-
 sdks/java/io/amqp/pom.xml   | 2 +-
 sdks/java/io/cassandra/pom.xml  | 2 +-
 sdks/java/io/common/pom.xml | 2 +-
 sdks/java/io/elasticsearch/pom.xml  | 2 +-
 sdks/java/io/google-cloud-platform/pom.xml  | 2 +-
 sdks/java/io/hadoop-common/pom.xml  | 2 +-
 sdks/java/io/hadoop-file-system/pom.xml | 2 +-
 sdks/java/io/hadoop/input-format/pom.xml| 2 +-
 sdks/java/io/hadoop/jdk1.8-tests/pom.xml| 2 +-
 sdks/java/io/hadoop/pom.xml | 2 +-
 sdks/java/io/hbase/pom.xml  | 2 +-
 sdks/java/io/hcatalog/pom.xml   | 2 +-
 sdks/java/io/jdbc/pom.xml   | 2 +-
 sdks/java/io/jms/pom.xml| 2 +-
 sdks/java/io/kafka/pom.xml  | 2 +-
 sdks/java/io/kinesis/pom.xml| 2 +-
 sdks/java/io/mongodb/pom.xml| 2 +-
 sdks/java/io/mqtt/pom.xml   | 2 +-
 sdks/java/io/pom.xml| 2 +-
 sdks/java/io/xml/pom.xml| 2 +-
 sdks/java/java8tests/pom.xml| 2 +-
 sdks/java/javadoc/pom.xml   | 2 +-
 sdks/java/maven-archetypes/examples-java8/pom.xml   | 2 +-
 sdks/java/maven-archetypes/examples/pom.xml | 2 +-
 sdks/java/maven-archetypes/pom.xml  | 2 +-
 sdks/java/maven-archetypes/starter/pom.xml  | 2 +-
 sdks/java/pom.xml   | 2 +-
 sdks/pom.xml| 2 +-
 sdks/python/pom.xml | 2 +-
 53 files changed, 54 insertions(+), 54 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/cd157519/examples/java/pom.xml
--
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 7ae4e6a..ae64a79 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
    <artifactId>beam-examples-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd157519/examples/java8/pom.xml
--
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index a0ce708..6fd29a4 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
    <artifactId>beam-examples-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

[34/50] [abbrv] beam git commit: Adds DynamicDestinations support to FileBasedSink

2017-07-12 Thread takidau
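
As orientation for the diff that follows, a hypothetical miniature of the dynamic-destinations idea (none of these types are Beam's actual interfaces): each element picks a destination, and the destination picks the file:

// Hypothetical miniature of the pattern this commit adds to FileBasedSink.
interface MiniDynamicDestinations<UserT, DestinationT> {
  DestinationT getDestination(UserT element);
  String getFilenamePrefix(DestinationT destination);
}

// Example: route log lines to per-level files.
class ByLogLevel implements MiniDynamicDestinations<String, String> {
  @Override
  public String getDestination(String line) {
    return line.startsWith("ERROR") ? "errors" : "info";
  }

  @Override
  public String getFilenamePrefix(String destination) {
    return "/tmp/output/" + destination; // placeholder path
  }
}
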
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
index 511d697..b57b28c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -34,27 +34,29 @@ import org.apache.beam.sdk.util.MimeTypes;
  * '\n'} represented in {@code UTF-8} format as the record separator. Each record (including the
  * last) is terminated.
  */
-class TextSink extends FileBasedSink {
+class TextSink extends FileBasedSink {
   @Nullable private final String header;
   @Nullable private final String footer;
 
   TextSink(
   ValueProvider baseOutputFilename,
-  FilenamePolicy filenamePolicy,
+  DynamicDestinations dynamicDestinations,
   @Nullable String header,
   @Nullable String footer,
   WritableByteChannelFactory writableByteChannelFactory) {
-super(baseOutputFilename, filenamePolicy, writableByteChannelFactory);
+super(baseOutputFilename, dynamicDestinations, writableByteChannelFactory);
 this.header = header;
 this.footer = footer;
   }
+
   @Override
-  public WriteOperation createWriteOperation() {
-return new TextWriteOperation(this, header, footer);
+  public WriteOperation createWriteOperation() {
+return new TextWriteOperation<>(this, header, footer);
   }
 
   /** A {@link WriteOperation WriteOperation} for text files. */
-  private static class TextWriteOperation extends WriteOperation {
+  private static class TextWriteOperation
+  extends WriteOperation {
 @Nullable private final String header;
 @Nullable private final String footer;
 
@@ -65,20 +67,20 @@ class TextSink extends FileBasedSink {
 }
 
 @Override
-public Writer createWriter() throws Exception {
-  return new TextWriter(this, header, footer);
+public Writer createWriter() throws Exception {
+  return new TextWriter<>(this, header, footer);
 }
   }
 
   /** A {@link Writer Writer} for text files. */
-  private static class TextWriter extends Writer {
+  private static class TextWriter extends Writer {
 private static final String NEWLINE = "\n";
 @Nullable private final String header;
 @Nullable private final String footer;
 private OutputStreamWriter out;
 
 public TextWriter(
-WriteOperation writeOperation,
+WriteOperation writeOperation,
 @Nullable String header,
 @Nullable String footer) {
   super(writeOperation, MimeTypes.TEXT);

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index a220eab..7013044 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -20,9 +20,12 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.hash.Hashing;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -30,8 +33,11 @@ import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
@@ -47,6 +53,7 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -55,6 +62,7 @@ import 
org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.

[13/50] [abbrv] beam git commit: Fix javadoc generation for AmqpIO, CassandraIO and HCatalogIO

2017-07-12 Thread takidau
Fix javadoc generation for AmqpIO, CassandraIO and HCatalogIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/699d59a9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/699d59a9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/699d59a9

Branch: refs/heads/DSL_SQL
Commit: 699d59a96e8e11d8f617e76657e22d4afe2bfa12
Parents: 8512153
Author: Ismaël Mejía 
Authored: Thu Jul 6 17:37:33 2017 +0200
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:00 2017 -0700

--
 pom.xml   | 18 ++
 sdks/java/javadoc/pom.xml | 15 +++
 2 files changed, 33 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/699d59a9/pom.xml
--
diff --git a/pom.xml b/pom.xml
index fd01781..987760f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -429,6 +429,18 @@

       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-amqp</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-cassandra</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -466,6 +478,12 @@

       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-hcatalog</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-jdbc</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/699d59a9/sdks/java/javadoc/pom.xml
--
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index ddb92cf..51109fb 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -99,6 +99,16 @@

     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-amqp</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-cassandra</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
     </dependency>

@@ -124,6 +134,11 @@

     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hcatalog</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-jdbc</artifactId>
     </dependency>




[25/50] [abbrv] beam git commit: Include PCollection in rehydrated PCollectionView

2017-07-12 Thread takidau
Include PCollection in rehydrated PCollectionView


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bdece9d2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bdece9d2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bdece9d2

Branch: refs/heads/DSL_SQL
Commit: bdece9d2a57824865a35b4367619569e5800ed1b
Parents: 860e0a0
Author: Kenneth Knowles 
Authored: Thu Jul 6 09:24:55 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:01 2017 -0700

--
 .../core/construction/ParDoTranslation.java | 51 +---
 .../construction/RunnerPCollectionView.java |  7 +--
 .../core/construction/ParDoTranslationTest.java | 28 +++
 3 files changed, 67 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/bdece9d2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
--
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 5f2bcae..fe8c5aa 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -74,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -262,8 +264,12 @@ public class ParDoTranslation {
     List<PCollectionView<?>> views = new ArrayList<>();
     for (Map.Entry<String, SideInput> sideInput : payload.getSideInputsMap().entrySet()) {
       views.add(
-          fromProto(
-              sideInput.getValue(), sideInput.getKey(), parDoProto, sdkComponents.toComponents()));
+          viewFromProto(
+              application.getPipeline(),
+              sideInput.getValue(),
+              sideInput.getKey(),
+              parDoProto,
+              sdkComponents.toComponents()));
     }
     return views;
   }
@@ -495,15 +501,47 @@ public class ParDoTranslation {
 return builder.build();
   }
 
-  public static PCollectionView<?> fromProto(
-      SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components)
+  public static PCollectionView<?> viewFromProto(
+      Pipeline pipeline,
+      SideInput sideInput,
+      String localName,
+      RunnerApi.PTransform parDoTransform,
+      Components components)
       throws IOException {
-    TupleTag<?> tag = new TupleTag<>(id);
+
+    String pCollectionId = parDoTransform.getInputsOrThrow(localName);
+
+    // This may be a PCollection defined in another language, but we should be
+    // able to rehydrate it enough to stick it in a side input. The coder may not
+    // be grokkable in Java.
+    PCollection<?> pCollection =
+        PCollectionTranslation.fromProto(
+            pipeline, components.getPcollectionsOrThrow(pCollectionId), components);
+
+    return viewFromProto(sideInput, localName, pCollection, parDoTransform, components);
+  }
+
+  /**
+   * Create a {@link PCollectionView} from a side input spec and an already-deserialized {@link
+   * PCollection} that should be wired up.
+   */
+  public static PCollectionView<?> viewFromProto(
+      SideInput sideInput,
+      String localName,
+      PCollection<?> pCollection,
+      RunnerApi.PTransform parDoTransform,
+      Components components)
+      throws IOException {
+    checkArgument(
+        localName != null,
+        "%s.viewFromProto: localName must not be null",
+        ParDoTranslation.class.getSimpleName());
+    TupleTag<?> tag = new TupleTag<>(localName);
     WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn());
     ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
 
     RunnerApi.PCollection inputCollection =
-        components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id));
+        components.getPcollectionsOrThrow(pa

[09/50] [abbrv] beam git commit: Fix PValue input in _PubSubReadEvaluator

2017-07-12 Thread takidau
Fix PValue input in _PubSubReadEvaluator


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/da3206c6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/da3206c6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/da3206c6

Branch: refs/heads/DSL_SQL
Commit: da3206c61d3e0c59ef8ac2cac85e2097f5db116a
Parents: d4fa33e
Author: Charles Chen 
Authored: Wed Jul 5 16:18:51 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:00:59 2017 -0700

--
 sdks/python/apache_beam/runners/direct/transform_evaluator.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/da3206c6/sdks/python/apache_beam/runners/direct/transform_evaluator.py
--
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 641291d..cb2ace2 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -436,8 +436,9 @@ class _PubSubReadEvaluator(_TransformEvaluator):
   bundles = [bundle]
 else:
   bundles = []
-input_pvalue = self._applied_ptransform.inputs
-if not input_pvalue:
+if self._applied_ptransform.inputs:
+  input_pvalue = self._applied_ptransform.inputs[0]
+else:
   input_pvalue = pvalue.PBegin(self._applied_ptransform.transform.pipeline)
 unprocessed_bundle = self._evaluation_context.create_bundle(
 input_pvalue)



[06/50] [abbrv] beam git commit: Website Mergebot Job

2017-07-12 Thread takidau
Website Mergebot Job

Signed-off-by: Jason Kuster 


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f2c337cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f2c337cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f2c337cc

Branch: refs/heads/DSL_SQL
Commit: f2c337cc006101de050781a50ee70ad940dbf28e
Parents: a32db07
Author: Jason Kuster 
Authored: Fri Jun 9 01:39:15 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:00:59 2017 -0700

--
 .../jenkins/common_job_properties.groovy|  5 +-
 .../job_beam_PreCommit_Website_Merge.groovy | 59 
 2 files changed, 62 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/f2c337cc/.test-infra/jenkins/common_job_properties.groovy
--
diff --git a/.test-infra/jenkins/common_job_properties.groovy b/.test-infra/jenkins/common_job_properties.groovy
index 0e047ea..70534c6 100644
--- a/.test-infra/jenkins/common_job_properties.groovy
+++ b/.test-infra/jenkins/common_job_properties.groovy
@@ -23,11 +23,12 @@
 class common_job_properties {
 
   // Sets common top-level job properties for website repository jobs.
-  static void setTopLevelWebsiteJobProperties(context) {
+  static void setTopLevelWebsiteJobProperties(context,
+  String branch = 'asf-site') {
 setTopLevelJobProperties(
 context,
 'beam-site',
-'asf-site',
+branch,
 'beam',
 30)
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f2c337cc/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy
--
diff --git a/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy b/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy
new file mode 100644
index 000..0e2ae3f
--- /dev/null
+++ b/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import common_job_properties
+
+// Defines a job.
+job('beam_PreCommit_Website_Merge') {
+  description('Runs website tests for mergebot.')
+
+  // Set common parameters.
+  common_job_properties.setTopLevelWebsiteJobProperties(delegate, 'mergebot')
+
+  triggers {
+githubPush()
+  }
+
+  steps {
+// Run the following shell script as a build step.
+shell '''
+# Install RVM per instructions at https://rvm.io/rvm/install.
+RVM_GPG_KEY=409B6B1796C275462A1703113804BB82D39DC0E3
+gpg --keyserver hkp://keys.gnupg.net --recv-keys $RVM_GPG_KEY
+
+\\curl -sSL https://get.rvm.io | bash
+source /home/jenkins/.rvm/scripts/rvm
+
+# Install Ruby.
+RUBY_VERSION_NUM=2.3.0
+rvm install ruby $RUBY_VERSION_NUM --autolibs=read-only
+
+# Install Bundler gem
+PATH=~/.gem/ruby/$RUBY_VERSION_NUM/bin:$PATH
+GEM_PATH=~/.gem/ruby/$RUBY_VERSION_NUM/:$GEM_PATH
+gem install bundler --user-install
+
+# Install all needed gems.
+bundle install --path ~/.gem/
+
+# Build the new site and test it.
+rm -fr ./content/
+bundle exec rake test
+'''.stripIndent().trim()
+  }
+}



[23/50] [abbrv] beam git commit: Process timer firings for a window together

2017-07-12 Thread takidau
Process timer firings for a window together
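
The LeaderBoardTest hunk below adds a watermark advance to the end of the window so an ON_TIME pane fires before the lateness horizon. A minimal TestStream sketch of that idiom, with illustrative values and durations:

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class OnTimePaneSketch {
  public static TestStream<Integer> events() {
    Instant windowStart = new Instant(0);
    Duration windowSize = Duration.standardMinutes(5);
    Duration allowedLateness = Duration.standardMinutes(10);

    return TestStream.create(VarIntCoder.of())
        .addElements(TimestampedValue.of(3, windowStart.plus(Duration.standardMinutes(1))))
        // First move the watermark only to the end of the window, so the
        // runner emits an ON_TIME pane...
        .advanceWatermarkTo(windowStart.plus(windowSize))
        // ...then past the allowed lateness, which permits garbage collection.
        .advanceWatermarkTo(windowStart.plus(windowSize).plus(allowedLateness))
        .advanceWatermarkToInfinity();
  }
}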


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b4fa891
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b4fa891
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b4fa891

Branch: refs/heads/DSL_SQL
Commit: 7b4fa8913bdf85f85cbeb2c13b8779db921b2dec
Parents: 951f3ca
Author: Kenneth Knowles 
Authored: Thu Jun 22 18:43:39 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:00 2017 -0700

--
 .../examples/complete/game/LeaderBoardTest.java |  2 +
 .../beam/runners/core/ReduceFnRunner.java   | 98 +---
 .../beam/runners/core/ReduceFnRunnerTest.java   | 49 +-
 3 files changed, 115 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/7b4fa891/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
--
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
index 745c210..611e2b3 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
@@ -276,6 +276,8 @@ public class LeaderBoardTest implements Serializable {
 .addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
 event(TestUser.BLUE_TWO, 3, Duration.ZERO),
 event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3)))
+// Move the watermark to the end of the window to output on time
+.advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION))
         // Move the watermark past the end of the allowed lateness plus the end of the window
 .advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS)
 .plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1)))

http://git-wip-us.apache.org/repos/asf/beam/blob/7b4fa891/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
--
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 0632c05..634a2d1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -29,7 +29,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -638,11 +637,9 @@ public class ReduceFnRunner {
   }
 
   /**
-   * Enriches TimerData with state necessary for processing a timer as well as
-   * common queries about a timer.
+   * A descriptor of the activation for a window based on a timer.
*/
-  private class EnrichedTimerData {
-public final Instant timestamp;
+  private class WindowActivation {
 public final ReduceFn.Context directContext;
 public final ReduceFn.Context renamedContext;
 // If this is an end-of-window timer then we may need to set a garbage 
collection timer
@@ -653,19 +650,34 @@ public class ReduceFnRunner {
 // end-of-window time to be a signal to garbage collect.
 public final boolean isGarbageCollection;
 
-EnrichedTimerData(
-TimerData timer,
+WindowActivation(
 ReduceFn.Context directContext,
 ReduceFn.Context renamedContext) {
-  this.timestamp = timer.getTimestamp();
   this.directContext = directContext;
   this.renamedContext = renamedContext;
   W window = directContext.window();
-  this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
-  && timer.getTimestamp().equals(window.maxTimestamp());
-  Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, 
windowingStrategy);
+
+  // The output watermark is before the end of the window if it is either 
unknown
+  // or it is known to be before it. If it is unknown, that means that 
there hasn't been
+  // enough data to advance it.
+  boolean outputWatermarkBeforeEOW =
+  timerInternals.currentOutputWatermarkTime() == null
+  || 
!timerInternals.currentOutputWatermarkTime().isAfter(window.maxTimestamp());
+
+  // The "end of the window" is reached when the local input watermark 
(for this key) surpasses
+  // it but the local output watermark (also for this key) has not. After 
data is emitted and
+  // the output watermark hold is rel

[47/50] [abbrv] beam git commit: Reformatting Kinesis IO to comply with official code style

2017-07-12 Thread takidau
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
--
diff --git 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
index 49e806d..4b2190f 100644
--- 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
+++ 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -25,8 +25,10 @@ import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Mockito.when;
 
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+
 import java.io.IOException;
 import java.util.Collections;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -40,112 +42,114 @@ import org.mockito.stubbing.Answer;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class ShardRecordsIteratorTest {
-private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
-private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
-private static final String SECOND_REFRESHED_ITERATOR = 
"SECOND_REFRESHED_ITERATOR";
-private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
-private static final String STREAM_NAME = "STREAM_NAME";
-private static final String SHARD_ID = "SHARD_ID";
-
-@Mock
-private SimplifiedKinesisClient kinesisClient;
-@Mock
-private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, 
cCheckpoint, dCheckpoint;
-@Mock
-private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
-@Mock
-private KinesisRecord a, b, c, d;
-@Mock
-private RecordFilter recordFilter;
-
-private ShardRecordsIterator iterator;
-
-@Before
-public void setUp() throws IOException, TransientKinesisException {
-
when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
-when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
-
-when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
-when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
-when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
-when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
-when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
-when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
-when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
-when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
-
-when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
-.thenReturn(firstResult);
-when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
-.thenReturn(secondResult);
-when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
-.thenReturn(thirdResult);
-
-when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
-when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
-when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
-
-
when(firstResult.getRecords()).thenReturn(Collections.emptyList());
-
when(secondResult.getRecords()).thenReturn(Collections.emptyList());
-
when(thirdResult.getRecords()).thenReturn(Collections.emptyList());
-
-when(recordFilter.apply(anyListOf(KinesisRecord.class), 
any(ShardCheckpoint
-.class))).thenAnswer(new IdentityAnswer());
-
-iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, 
recordFilter);
-}
-
-@Test
-public void returnsAbsentIfNoRecordsPresent() throws IOException, 
TransientKinesisException {
-assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-}
-
-@Test
-public void goesThroughAvailableRecords() throws IOException, 
TransientKinesisException {
-when(firstResult.getRecords()).thenReturn(asList(a, b, c));
-when(secondResult.getRecords()).thenReturn(singletonList(d));
-
-assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
-assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
-assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
-assertThat(iter

[36/50] [abbrv] beam git commit: Adds DynamicDestinations support to FileBasedSink

2017-07-12 Thread takidau
Adds DynamicDestinations support to FileBasedSink


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4c336e84
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4c336e84
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4c336e84

Branch: refs/heads/DSL_SQL
Commit: 4c336e840e69e83e15d9ffb7e0a0178dd3ab8404
Parents: 1f6117f
Author: Reuven Lax 
Authored: Fri Jun 9 17:11:32 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:01 2017 -0700

--
 .../examples/common/WriteOneFilePerWindow.java  |  52 +-
 .../beam/examples/WindowedWordCountIT.java  |   4 +-
 .../complete/game/utils/WriteToText.java|  43 +-
 .../construction/WriteFilesTranslation.java |  67 +-
 .../construction/PTransformMatchersTest.java|  22 +-
 .../construction/WriteFilesTranslationTest.java |  62 +-
 .../direct/WriteWithShardingFactory.java|   6 +-
 .../direct/WriteWithShardingFactoryTest.java|  18 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  15 +-
 .../runners/dataflow/DataflowRunnerTest.java|  35 +-
 .../runners/spark/SparkRunnerDebuggerTest.java  |  26 +-
 .../src/main/proto/beam_runner_api.proto|   7 +-
 .../apache/beam/sdk/coders/ShardedKeyCoder.java |  66 ++
 .../java/org/apache/beam/sdk/io/AvroIO.java | 220 ---
 .../java/org/apache/beam/sdk/io/AvroSink.java   |  32 +-
 .../beam/sdk/io/DefaultFilenamePolicy.java  | 274 +---
 .../beam/sdk/io/DynamicFileDestinations.java| 115 
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 513 ---
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |  44 +-
 .../java/org/apache/beam/sdk/io/TextIO.java | 488 ++
 .../java/org/apache/beam/sdk/io/TextSink.java   |  22 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 640 +++
 .../sdk/transforms/SerializableFunctions.java   |  50 ++
 .../org/apache/beam/sdk/values/ShardedKey.java  |  65 ++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  85 ++-
 .../beam/sdk/io/DefaultFilenamePolicyTest.java  | 135 ++--
 .../sdk/io/DrunkWritableByteChannelFactory.java |   2 +-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  93 +--
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  56 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 264 +++-
 .../org/apache/beam/sdk/io/WriteFilesTest.java  | 339 --
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java|   2 +
 .../io/gcp/bigquery/DynamicDestinations.java|  29 +-
 .../io/gcp/bigquery/GenerateShardedTable.java   |   1 +
 .../beam/sdk/io/gcp/bigquery/ShardedKey.java|  67 --
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java|  74 ---
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   |   1 +
 .../io/gcp/bigquery/StreamingWriteTables.java   |   2 +
 .../sdk/io/gcp/bigquery/TagWithUniqueIds.java   |   1 +
 .../io/gcp/bigquery/WriteBundlesToFiles.java|   2 +
 .../bigquery/WriteGroupedRecordsToFiles.java|   1 +
 .../sdk/io/gcp/bigquery/WritePartition.java |   1 +
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |   1 +
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java |   2 +
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |   4 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java |  21 +-
 .../org/apache/beam/sdk/io/xml/XmlSinkTest.java |   4 +-
 47 files changed, 2710 insertions(+), 1363 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
--
diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
 
b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index 5e6df9c..49865ba 100644
--- 
a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ 
b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -17,11 +17,12 @@
  */
 package org.apache.beam.examples.common;
 
-import static com.google.common.base.Verify.verifyNotNull;
+import static com.google.common.base.MoreObjects.firstNonNull;
 
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -53,22 +54,12 @@ public class WriteOneFilePerWindow extends 
PTransform, PDone
 
   @Override
   public PDone expand(PCollection input) {
-// filenamePrefix may contain a directory and a filename component. Pull 
out only the filename
-// component from that path for the PerWindowFiles

[21/50] [abbrv] beam git commit: Set the type of batch jobs to FNAPI_BATCH when beam_fn_api experiment is specified.

2017-07-12 Thread takidau
Set the type of batch jobs to FNAPI_BATCH when beam_fn_api experiment is 
specified.
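
For context, the experiment check that drives this decision can be pictured
as the following sketch (an illustration only, assuming the flag is surfaced
through DebugOptions.experiments; the real helper in apiclient.py may differ):

    from apache_beam.options.pipeline_options import DebugOptions

    def _use_fnapi(options):
        # Sketch: treat the job as Fn-API-based when the 'beam_fn_api'
        # experiment flag is present in the pipeline options.
        experiments = options.view_as(DebugOptions).experiments or []
        return 'beam_fn_api' in experiments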


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b1313ffe
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b1313ffe
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b1313ffe

Branch: refs/heads/DSL_SQL
Commit: b1313ffef5bf8a2dd17ee20b6dd77f62d4174659
Parents: 78a39bd
Author: Valentyn Tymofieiev 
Authored: Fri Jul 7 15:14:56 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:00 2017 -0700

--
 .../runners/dataflow/dataflow_runner.py | 16 ++-
 .../runners/dataflow/internal/apiclient.py  | 29 ++--
 .../runners/dataflow/internal/apiclient_test.py |  5 +---
 3 files changed, 29 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/b1313ffe/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
--
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 57bcc5e..059e139 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -46,8 +46,8 @@ from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.transforms.display import DisplayData
 from apache_beam.typehints import typehints
-from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import TestOptions
 from apache_beam.utils.plugin import BeamPlugin
 
@@ -65,12 +65,6 @@ class DataflowRunner(PipelineRunner):
   if blocking is set to False.
   """
 
-  # Environment version information. It is passed to the service during a
-  # a job submission and is used by the service to establish what features
-  # are expected by the workers.
-  BATCH_ENVIRONMENT_MAJOR_VERSION = '6'
-  STREAMING_ENVIRONMENT_MAJOR_VERSION = '1'
-
   # A list of PTransformOverride objects to be applied before running a 
pipeline
   # using DataflowRunner.
   # Currently this only works for overrides where the input and output types do
@@ -268,15 +262,9 @@ class DataflowRunner(PipelineRunner):
 if test_options.dry_run:
   return None
 
-standard_options = pipeline._options.view_as(StandardOptions)
-if standard_options.streaming:
-  job_version = DataflowRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION
-else:
-  job_version = DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION
-
 # Get a Dataflow API client and set its options
 self.dataflow_client = apiclient.DataflowApplicationClient(
-pipeline._options, job_version)
+pipeline._options)
 
 # Create the job
 result = DataflowPipelineResult(

http://git-wip-us.apache.org/repos/asf/beam/blob/b1313ffe/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
--
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index edac9d7..33dfe19 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -49,6 +49,13 @@ from apache_beam.options.pipeline_options import 
StandardOptions
 from apache_beam.options.pipeline_options import WorkerOptions
 
 
+# Environment version information. It is passed to the service during a
+# a job submission and is used by the service to establish what features
+# are expected by the workers.
+_LEGACY_ENVIRONMENT_MAJOR_VERSION = '6'
+_FNAPI_ENVIRONMENT_MAJOR_VERSION = '1'
+
+
 class Step(object):
   """Wrapper for a dataflow Step protobuf."""
 
@@ -146,7 +153,10 @@ class Environment(object):
 if self.standard_options.streaming:
   job_type = 'FNAPI_STREAMING'
 else:
-  job_type = 'PYTHON_BATCH'
+  if _use_fnapi(options):
+job_type = 'FNAPI_BATCH'
+  else:
+job_type = 'PYTHON_BATCH'
 self.proto.version.additionalProperties.extend([
 dataflow.Environment.VersionValue.AdditionalProperty(
 key='job_type',
@@ -360,11 +370,16 @@ class Job(object):
 class DataflowApplicationClient(object):
   """A Dataflow API client used by application code to create and query 
jobs."""
 
-  def __init__(self, options, environment_version):
+  def __init__(self, options):
 """Initializes a Dataflow API client object."""
 self.standard_options = options.view_as(StandardOptions)
 self.google_cloud_options = options.view_

[05/50] [abbrv] beam git commit: [maven-release-plugin] prepare branch release-2.1.0

2017-07-12 Thread takidau
[maven-release-plugin] prepare branch release-2.1.0


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/89531a89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/89531a89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/89531a89

Branch: refs/heads/DSL_SQL
Commit: 89531a89d4ad7d1516c7102b3fff14331b9276c1
Parents: 967c71a
Author: Jean-Baptiste Onofré 
Authored: Wed Jul 5 16:47:29 2017 +0200
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:00:59 2017 -0700

--
 pom.xml | 2 +-
 runners/direct-java/pom.xml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/89531a89/pom.xml
--
diff --git a/pom.xml b/pom.xml
index be3fe20..a5930ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,7 @@
 
scm:git:https://git-wip-us.apache.org/repos/asf/beam.git
 
scm:git:https://git-wip-us.apache.org/repos/asf/beam.git
 https://git-wip-us.apache.org/repos/asf?p=beam.git;a=summary
-HEAD
+release-2.1.0
   
 
   

http://git-wip-us.apache.org/repos/asf/beam/blob/89531a89/runners/direct-java/pom.xml
--
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 6346575..5b5aec2 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -117,7 +117,7 @@
   
 
 
-  
+  
 
   
 



[32/50] [abbrv] beam git commit: [BEAM-1348] Remove deprecated concepts in Fn API (now replaced with Runner API concepts).

2017-07-12 Thread takidau
[BEAM-1348] Remove deprecated concepts in Fn API (now replaced with Runner API 
concepts).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f6117ff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f6117ff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f6117ff

Branch: refs/heads/DSL_SQL
Commit: 1f6117ffb23fc179a699cf11ebc2620af6cf2d4c
Parents: e014db6
Author: Luke Cwik 
Authored: Fri Jun 30 10:21:55 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:01 2017 -0700

--
 .../fn-api/src/main/proto/beam_fn_api.proto | 151 +--
 .../harness/control/ProcessBundleHandler.java   |   4 +-
 .../fn/harness/control/RegisterHandler.java |   2 +-
 .../fn/harness/control/RegisterHandlerTest.java |   8 +-
 .../apache_beam/runners/pipeline_context.py |   2 +-
 .../runners/portability/fn_api_runner.py|   2 +-
 .../apache_beam/runners/worker/sdk_worker.py|   4 +-
 .../runners/worker/sdk_worker_test.py   |  16 +-
 8 files changed, 25 insertions(+), 164 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/1f6117ff/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
--
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto 
b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 8162bc5..9da5afe 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -38,7 +38,6 @@ option java_package = "org.apache.beam.fn.v1";
 option java_outer_classname = "BeamFnApi";
 
 import "beam_runner_api.proto";
-import "google/protobuf/any.proto";
 import "google/protobuf/timestamp.proto";
 
 /*
@@ -67,129 +66,6 @@ message Target {
   string name = 2;
 }
 
-// (Deprecated) Information defining a PCollection
-//
-// Migrate to Runner API.
-message PCollection {
-  // (Required) A reference to a coder.
-  string coder_reference = 1 [deprecated = true];
-
-  // TODO: Windowing strategy, ...
-}
-
-// (Deprecated) A primitive transform within Apache Beam.
-//
-// Migrate to Runner API.
-message PrimitiveTransform {
-  // (Required) A pipeline level unique id which can be used as a reference to
-  // refer to this.
-  string id = 1 [deprecated = true];
-
-  // (Required) A function spec that is used by this primitive
-  // transform to process data.
-  FunctionSpec function_spec = 2 [deprecated = true];
-
-  // A map of distinct input names to target definitions.
-  // For example, in CoGbk this represents the tag name associated with each
-  // distinct input name and a list of primitive transforms that are associated
-  // with the specified input.
-  map inputs = 3 [deprecated = true];
-
-  // A map from local output name to PCollection definitions. For example, in
-  // DoFn this represents the tag name associated with each distinct output.
-  map outputs = 4 [deprecated = true];
-
-  // TODO: Should we model side inputs as a special type of input for a
-  // primitive transform or should it be modeled as the relationship that
-  // the predecessor input will be a view primitive transform.
-  // A map of from side input names to side inputs.
-  map side_inputs = 5 [deprecated = true];
-
-  // The user name of this step.
-  // TODO: This should really be in display data and not at this level
-  string step_name = 6 [deprecated = true];
-}
-
-/*
- * User Definable Functions
- *
- * This is still unstable mainly due to how we model the side input.
- */
-
-// (Deprecated) Defines the common elements of user-definable functions,
-// to allow the SDK to express the information the runner needs to execute 
work.
-//
-// Migrate to Runner API.
-message FunctionSpec {
-  // (Required) A pipeline level unique id which can be used as a reference to
-  // refer to this.
-  string id = 1 [deprecated = true];
-
-  // (Required) A globally unique name representing this user definable
-  // function.
-  //
-  // User definable functions use the urn encodings registered such that 
another
-  // may implement the user definable function within another language.
-  //
-  // For example:
-  //urn:org.apache.beam:coder:kv:1.0
-  string urn = 2 [deprecated = true];
-
-  // (Required) Reference to specification of execution environment required to
-  // invoke this function.
-  string environment_reference = 3 [deprecated = true];
-
-  // Data used to parameterize this function. Depending on the urn, this may be
-  // optional or required.
-  google.protobuf.Any data = 4 [deprecated = true];
-}
-
-// (Deprecated) Migrate to Runner API.
-message SideInput {
-  // TODO: Coder?
-
-  // For RunnerAPI.
-  Target input = 1 [deprecated = true];
-
-  // For FnAPI.
-  FunctionSpec view_fn = 2 [deprecated = true];
-}
-
-// (Deprecated) Defin

[03/50] [abbrv] beam git commit: [BEAM-2534] Handle offset gaps in Kafka messages.

2017-07-12 Thread takidau
[BEAM-2534] Handle offset gaps in Kafka messages.

KafkaIO logged a warning when there was a gap in offsets between messages.
Kafka also supports 'KV store'-style topics where some of the messages
are deleted, leading to gaps in offsets. This PR removes the log and
accounts for offset gaps in the backlog estimate.
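
As a quick numeric illustration of the adjusted estimate (made-up numbers,
not part of the patch):

    import math

    latest_offset = 10000
    next_offset = 9000
    avg_offset_gap = 0.25  # average missing offsets observed per record

    # The raw offset distance says 1000 messages; compaction gaps shrink that.
    remaining = (latest_offset - next_offset) / (1 + avg_offset_gap)
    print(int(math.ceil(max(0, remaining))))  # prints 800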


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/48627038
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/48627038
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/48627038

Branch: refs/heads/DSL_SQL
Commit: 48627038a331a4f142d260ebf347693941113b75
Parents: da3206c
Author: Raghu Angadi 
Authored: Wed Jun 28 12:07:06 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:00:59 2017 -0700

--
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 49 
 1 file changed, 29 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/48627038/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
--
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 702bdd3..e520367 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -904,6 +904,22 @@ public class KafkaIO {
   return name;
 }
 
+// Maintains approximate average over last 1000 elements
+private static class MovingAvg {
+  private static final int MOVING_AVG_WINDOW = 1000;
+  private double avg = 0;
+  private long numUpdates = 0;
+
+  void update(double quantity) {
+numUpdates++;
+avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates);
+  }
+
+  double get() {
+return avg;
+  }
+}
+
 // maintains state of each assigned partition (buffered records, consumed 
offset, etc)
 private static class PartitionState {
   private final TopicPartition topicPartition;
@@ -911,9 +927,8 @@ public class KafkaIO {
   private long latestOffset;
   private Iterator> recordIter = 
Collections.emptyIterator();
 
-  // simple moving average for size of each record in bytes
-  private double avgRecordSize = 0;
-  private static final int movingAvgWindow = 1000; // very roughly avg of 
last 1000 elements
+  private MovingAvg avgRecordSize = new MovingAvg();
+  private MovingAvg avgOffsetGap = new MovingAvg(); // > 0 only when log 
compaction is enabled.
 
   PartitionState(TopicPartition partition, long nextOffset) {
 this.topicPartition = partition;
@@ -921,17 +936,13 @@ public class KafkaIO {
 this.latestOffset = UNINITIALIZED_OFFSET;
   }
 
-  // update consumedOffset and avgRecordSize
-  void recordConsumed(long offset, int size) {
+  // Update consumedOffset, avgRecordSize, and avgOffsetGap
+  void recordConsumed(long offset, int size, long offsetGap) {
 nextOffset = offset + 1;
 
-// this is always updated from single thread. probably not worth 
making it an AtomicDouble
-if (avgRecordSize <= 0) {
-  avgRecordSize = size;
-} else {
-  // initially, first record heavily contributes to average.
-  avgRecordSize += ((size - avgRecordSize) / movingAvgWindow);
-}
+// This is always updated from single thread. Probably not worth 
making atomic.
+avgRecordSize.update(size);
+avgOffsetGap.update(offsetGap);
   }
 
   synchronized void setLatestOffset(long latestOffset) {
@@ -944,14 +955,15 @@ public class KafkaIO {
 if (backlogMessageCount == UnboundedReader.BACKLOG_UNKNOWN) {
   return UnboundedReader.BACKLOG_UNKNOWN;
 }
-return (long) (backlogMessageCount * avgRecordSize);
+return (long) (backlogMessageCount * avgRecordSize.get());
   }
 
   synchronized long backlogMessageCount() {
 if (latestOffset < 0 || nextOffset < 0) {
   return UnboundedReader.BACKLOG_UNKNOWN;
 }
-return Math.max(0, (latestOffset - nextOffset));
+double remaining = (latestOffset - nextOffset) / (1 + 
avgOffsetGap.get());
+return Math.max(0, (long) Math.ceil(remaining));
   }
 }
 
@@ -1154,14 +1166,11 @@ public class KafkaIO {
 continue;
   }
 
-  // sanity check
-  if (offset != expected) {
-LOG.warn("{}: gap in offsets for {} at {}. {} records missing.",
-this, pState.topicPartition, expected, offset - expected);
-  }
+  long offsetGap = offset - expected; // could be > 0 when Kafka log 
compactio

[04/50] [abbrv] beam git commit: [BEAM-2553] Update Maven exec plugin to 1.6.0 to incorporate messaging improvements

2017-07-12 Thread takidau
[BEAM-2553] Update Maven exec plugin to 1.6.0 to incorporate messaging 
improvements


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c73e69af
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c73e69af
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c73e69af

Branch: refs/heads/DSL_SQL
Commit: c73e69af7fdaf4d74be990e56df0ef69b84ac7b5
Parents: f2c337c
Author: Luke Cwik 
Authored: Wed Jul 5 10:38:44 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:00:59 2017 -0700

--
 pom.xml| 2 +-
 .../starter/src/test/resources/projects/basic/reference/pom.xml| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/c73e69af/pom.xml
--
diff --git a/pom.xml b/pom.xml
index f5d4815..fd01781 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,7 +159,7 @@
 2.20
 3.6.1
 3.0.1
-1.4.0
+1.6.0
 3.0.2
 3.0.2
 3.0.0

http://git-wip-us.apache.org/repos/asf/beam/blob/c73e69af/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
--
diff --git 
a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
 
b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
index 60405e6..6056fb0 100644
--- 
a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
+++ 
b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
@@ -28,7 +28,7 @@
 @project.version@
 
 3.6.1
-1.4.0
+1.6.0
 1.7.14
   
 



[42/50] [abbrv] beam git commit: Split bundle processor into separate class.

2017-07-12 Thread takidau
Split bundle processor into separate class.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4abd7141
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4abd7141
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4abd7141

Branch: refs/heads/DSL_SQL
Commit: 4abd7141673f4aead669efd4d2a87fc163764a2d
Parents: 6a61f15
Author: Robert Bradshaw 
Authored: Wed Jun 28 18:20:12 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 20:01:02 2017 -0700

--
 .../runners/portability/fn_api_runner.py|  20 +-
 .../runners/worker/bundle_processor.py  | 426 +++
 .../apache_beam/runners/worker/sdk_worker.py| 398 +
 3 files changed, 444 insertions(+), 400 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/4abd7141/sdks/python/apache_beam/runners/portability/fn_api_runner.py
--
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index f522864..f88fe53 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -38,6 +38,7 @@ from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners import pipeline_context
 from apache_beam.runners.portability import maptask_executor_runner
+from apache_beam.runners.worker import bundle_processor
 from apache_beam.runners.worker import data_plane
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import sdk_worker
@@ -186,7 +187,7 @@ class 
FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 target_name = only_element(get_inputs(operation).keys())
 runner_sinks[(transform_id, target_name)] = operation
 transform_spec = beam_runner_api_pb2.FunctionSpec(
-urn=sdk_worker.DATA_OUTPUT_URN,
+urn=bundle_processor.DATA_OUTPUT_URN,
 parameter=proto_utils.pack_Any(data_operation_spec))
 
   elif isinstance(operation, operation_specs.WorkerRead):
@@ -200,7 +201,7 @@ class 
FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
   operation.source.source.read(None),
   operation.source.source.default_output_coder())
   transform_spec = beam_runner_api_pb2.FunctionSpec(
-  urn=sdk_worker.DATA_INPUT_URN,
+  urn=bundle_processor.DATA_INPUT_URN,
   parameter=proto_utils.pack_Any(data_operation_spec))
 
 else:
@@ -209,7 +210,7 @@ class 
FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
   # The Dataflow runner harness strips the base64 encoding. do the same
   # here until we get the same thing back that we sent in.
   transform_spec = beam_runner_api_pb2.FunctionSpec(
-  urn=sdk_worker.PYTHON_SOURCE_URN,
+  urn=bundle_processor.PYTHON_SOURCE_URN,
   parameter=proto_utils.pack_Any(
   wrappers_pb2.BytesValue(
   value=base64.b64decode(
@@ -223,21 +224,22 @@ class 
FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
   element_coder = si.source.default_output_coder()
   # TODO(robertwb): Actually flesh out the ViewFn API.
   side_input_extras.append((si.tag, element_coder))
-  side_input_data[sdk_worker.side_input_tag(transform_id, si.tag)] = (
-  self._reencode_elements(
-  si.source.read(si.source.get_range_tracker(None, None)),
-  element_coder))
+  side_input_data[
+  bundle_processor.side_input_tag(transform_id, si.tag)] = (
+  self._reencode_elements(
+  si.source.read(si.source.get_range_tracker(None, None)),
+  element_coder))
 augmented_serialized_fn = pickler.dumps(
 (operation.serialized_fn, side_input_extras))
 transform_spec = beam_runner_api_pb2.FunctionSpec(
-urn=sdk_worker.PYTHON_DOFN_URN,
+urn=bundle_processor.PYTHON_DOFN_URN,
 parameter=proto_utils.pack_Any(
 wrappers_pb2.BytesValue(value=augmented_serialized_fn)))
 
   elif isinstance(operation, operation_specs.WorkerFlatten):
 # Flatten is nice and simple.
 transform_spec = beam_runner_api_pb2.FunctionSpec(
-urn=sdk_worker.IDENTITY_DOFN_URN)
+urn=bundle_processor.IDENTITY_DOFN_URN)
 
   else:
 raise NotImplementedError(operation)

http://git-wip-us.apache.org/repos/asf/beam/blob/4abd7141/sdks/python/apache_beam/runners/worker/bundle_processor.py
-

Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Dataflow #3569

2017-07-12 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-2572) Implement an S3 filesystem for Python SDK

2017-07-12 Thread Chamikara Jayalath (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085074#comment-16085074
 ] 

Chamikara Jayalath commented on BEAM-2572:
--

Hi Dmitry,

I think it might be better to reduce the scope of this to providing credentials 
to FileSystem subclasses for accessing remote file systems. Any transform that 
needs credentials to access a third-party service outside of this could 
possibly employ a similar technique, but I don't think it makes sense to enforce 
the same credential-access mechanism for all transforms.

We should probably avoid passing credentials (or any other state) required by 
FileSystem interfaces through IO transforms (such as ReadFromText/WriteToText). 
The FileSystem abstraction is a tool used by some of the transforms. Providing 
state needed by FileSystem objects through transform interfaces could result in 
an explosion of the number of parameters that we'd have to support, as you 
mentioned in one of your comments (note also that the Python SDK uses keyword 
arguments as opposed to the builder pattern used by the Java SDK).

I think the issue here is that FileSystem objects are instantiated by the 
SDK/runner in the background instead of directly by pipeline authors when 
defining a pipeline. Abstractions such as transforms, sources, sinks, and DoFns 
are instantiated directly by pipeline authors, so these abstractions do not 
have the problem of having to acquire state through a secondary mechanism. The 
solution that makes the most sense to me is to get any required state (e.g. 
credentials) from the environment, as mentioned in some of the comments. How 
this state gets set in the environment is runner-specific. For DataflowRunner 
it could be through a separate package that gets installed on the workers, 
while for DirectRunner the environment could be set up directly by users 
when defining the pipeline.
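
To make the environment-based option concrete, here is a minimal sketch (the
class name, base class, and environment variables are illustrative assumptions,
not an agreed design):

{code}
import os

from apache_beam.io.filesystem import FileSystem


class S3FileSystem(FileSystem):  # hypothetical
  def __init__(self):
    # Credentials come from the worker environment rather than from
    # transform arguments or pickled pipeline state.
    self._access_key = os.environ.get('AWS_ACCESS_KEY_ID')
    self._secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
{code}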

> Implement an S3 filesystem for Python SDK
> -
>
> Key: BEAM-2572
> URL: https://issues.apache.org/jira/browse/BEAM-2572
> Project: Beam
>  Issue Type: Task
>  Components: sdk-py
>Reporter: Dmitry Demeshchuk
>Assignee: Ahmet Altay
>Priority: Minor
>
> There are two paths worth exploring, to my understanding:
> 1. Sticking to the HDFS-based approach (like it's done in Java).
> 2. Using boto/boto3 for accessing S3 through its common API endpoints.
> I personally prefer the second approach, for a few reasons:
> 1. In real life, HDFS and S3 have different consistency guarantees, therefore 
> their behaviors may contradict each other in some edge cases (say, we write 
> something to S3, but it's not immediately accessible for reading from another 
> end).
> 2. There are other AWS-based sources and sinks we may want to create in the 
> future: DynamoDB, Kinesis, SQS, etc.
> 3. boto3 already provides somewhat good logic for basic things like 
> reattempting.
> Whatever path we choose, there's another problem related to this: we 
> currently cannot pass any global settings (say, pipeline options, or just an 
> arbitrary kwarg) to a filesystem. Because of that, we'd have to setup the 
> runner nodes to have AWS keys set up in the environment, which is not trivial 
> to achieve and doesn't look too clean either (I'd rather see one single place 
> for configuring the runner options).
> Also, it's worth mentioning that I already have a janky S3 filesystem 
> implementation that only supports DirectRunner at the moment (because of the 
> previous paragraph). I'm perfectly fine finishing it myself, with some 
> guidance from the maintainers.
> Where should I move on from here, and whose input should I be looking for?
> Thanks!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2564) Add integration test for string operators

2017-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085062#comment-16085062
 ] 

ASF GitHub Bot commented on BEAM-2564:
--

Github user xumingming closed the pull request at:

https://github.com/apache/beam/pull/3532


> Add integration test for string operators
> -
>
> Key: BEAM-2564
> URL: https://issues.apache.org/jira/browse/BEAM-2564
> Project: Beam
>  Issue Type: Sub-task
>  Components: dsl-sql
>Reporter: James Xu
>Assignee: James Xu
>  Labels: dsl_sql_merge
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2603) Add Meter in beam metrics

2017-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085056#comment-16085056
 ] 

ASF GitHub Bot commented on BEAM-2603:
--

GitHub user unsleepy22 opened a pull request:

https://github.com/apache/beam/pull/3555

[BEAM-2603] add meter interface and implementation.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`.
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/unsleepy22/beam beam-metrics-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3555.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3555


commit b118074ec713a5287da70cffba413ea6246b740e
Author: Cody 
Date:   2017-07-13T02:19:51Z

Fix BEAM-2603, add meter interface and implementation.




> Add Meter in beam metrics
> -
>
> Key: BEAM-2603
> URL: https://issues.apache.org/jira/browse/BEAM-2603
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-core, sdk-java-core
>Reporter: Cody
>Assignee: Kenneth Knowles
> Fix For: 2.2.0
>
>
> 1. Add Meter interface and implementation
> 2. Add MeterData, MeterResult. Include MeterData in metric updates, and 
> MeterResult in metric query results.
> 3. Add corresponding changes regarding MeterResult and MeterData.
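
(For illustration only, a hypothetical Meter sketched in Python; the PR itself
targets the Java SDK, so the actual interface may differ:)

{code}
import time


class Meter(object):
  """Hypothetical sketch: a monotonically increasing count plus the rate
  at which it grows, per the description above."""

  def __init__(self):
    self._count = 0
    self._start = time.time()

  def mark(self, n=1):
    self._count += n

  def rate(self):
    elapsed = time.time() - self._start
    return self._count / elapsed if elapsed > 0 else 0.0
{code}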



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3532: [BEAM-2564] add integration test for string functio...

2017-07-12 Thread xumingming
Github user xumingming closed the pull request at:

https://github.com/apache/beam/pull/3532


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #3555: [BEAM-2603] add meter interface and implementation.

2017-07-12 Thread unsleepy22
GitHub user unsleepy22 opened a pull request:

https://github.com/apache/beam/pull/3555

[BEAM-2603] add meter interface and implementation.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`.
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/unsleepy22/beam beam-metrics-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3555.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3555


commit b118074ec713a5287da70cffba413ea6246b740e
Author: Cody 
Date:   2017-07-13T02:19:51Z

Fix BEAM-2603, add meter interface and implementation.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (BEAM-2610) upgrade to version 2.2.0

2017-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085050#comment-16085050
 ] 

ASF GitHub Bot commented on BEAM-2610:
--

GitHub user XuMingmin opened a pull request:

https://github.com/apache/beam/pull/3554

[BEAM-2610] upgrade to version 2.2.0

placeholder for 2nd PR, may need update after #3553


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/XuMingmin/beam BEAM-2610

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3554.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3554


commit 26b2b7f047e85d0ae8a9b6e1f921164a39b871b8
Author: mingmxu 
Date:   2017-07-13T02:04:08Z

upgrade pom to 2.2.0-SNAPSHOT




> upgrade to version 2.2.0
> 
>
> Key: BEAM-2610
> URL: https://issues.apache.org/jira/browse/BEAM-2610
> Project: Beam
>  Issue Type: Task
>  Components: dsl-sql
>Reporter: Xu Mingmin
>Assignee: Xu Mingmin
>  Labels: dsl_sql_merge
>
> This task syncs changes from the master branch, which is now using version 
> 2.2.0-SNAPSHOT. 
> As usual, there will be two PRs:
> 1. a pull request from master to DSL_SQL; this one is merged while ignoring 
> any errors;
> 2. a second PR to finish the change in DSL_SQL and also fix any potential 
> issues.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3554: [BEAM-2610] upgrade to version 2.2.0

2017-07-12 Thread XuMingmin
GitHub user XuMingmin opened a pull request:

https://github.com/apache/beam/pull/3554

[BEAM-2610] upgrade to version 2.2.0

placeholder for 2nd PR, may need update after #3553


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/XuMingmin/beam BEAM-2610

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3554.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3554


commit 26b2b7f047e85d0ae8a9b6e1f921164a39b871b8
Author: mingmxu 
Date:   2017-07-13T02:04:08Z

upgrade pom to 2.2.0-SNAPSHOT




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (BEAM-2572) Implement an S3 filesystem for Python SDK

2017-07-12 Thread Ahmet Altay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16084956#comment-16084956
 ] 

Ahmet Altay commented on BEAM-2572:
---

I do not think ReadFromX/WriteToX transforms are the right approach to this 
problem. The ability to understand and access resources is a filesystem 
problem. (Also, ReadFromX/WriteToX transforms with explicit credentials are no 
better than using pipeline options in terms of the security of the credentials.)

My suggested approach is:
- Give file systems access to the pipeline options.
- File systems should be capable of acquiring credentials only by using the 
environment and pipeline options. (This could range from explicitly passing 
credentials in the options to using a KMS.)
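
A minimal sketch of the pipeline-options-based variant (the S3Options class
and flag names below are hypothetical, purely for illustration):

{code}
from apache_beam.options.pipeline_options import PipelineOptions


class S3Options(PipelineOptions):  # hypothetical
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--s3_access_key_id', default=None)
    parser.add_argument('--s3_secret_access_key', default=None)


def credentials_from(options):
  # A FileSystem constructed with access to the pipeline options could
  # read its credentials here, with no per-transform plumbing.
  s3_options = options.view_as(S3Options)
  return (s3_options.s3_access_key_id, s3_options.s3_secret_access_key)
{code}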

> Implement an S3 filesystem for Python SDK
> -
>
> Key: BEAM-2572
> URL: https://issues.apache.org/jira/browse/BEAM-2572
> Project: Beam
>  Issue Type: Task
>  Components: sdk-py
>Reporter: Dmitry Demeshchuk
>Assignee: Ahmet Altay
>Priority: Minor
>
> There are two paths worth exploring, to my understanding:
> 1. Sticking to the HDFS-based approach (like it's done in Java).
> 2. Using boto/boto3 for accessing S3 through its common API endpoints.
> I personally prefer the second approach, for a few reasons:
> 1. In real life, HDFS and S3 have different consistency guarantees, therefore 
> their behaviors may contradict each other in some edge cases (say, we write 
> something to S3, but it's not immediately accessible for reading from another 
> end).
> 2. There are other AWS-based sources and sinks we may want to create in the 
> future: DynamoDB, Kinesis, SQS, etc.
> 3. boto3 already provides somewhat good logic for basic things like 
> reattempting.
> Whatever path we choose, there's another problem related to this: we 
> currently cannot pass any global settings (say, pipeline options, or just an 
> arbitrary kwarg) to a filesystem. Because of that, we'd have to setup the 
> runner nodes to have AWS keys set up in the environment, which is not trivial 
> to achieve and doesn't look too clean either (I'd rather see one single place 
> for configuring the runner options).
> Also, it's worth mentioning that I already have a janky S3 filesystem 
> implementation that only supports DirectRunner at the moment (because of the 
> previous paragraph). I'm perfectly fine finishing it myself, with some 
> guidance from the maintainers.
> Where should I move on from here, and whose input should I be looking for?
> Thanks!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Jenkins build became unstable: beam_PostCommit_Java_ValidatesRunner_Dataflow #3568

2017-07-12 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: beam_PostCommit_Java_MavenInstall #4361

2017-07-12 Thread Apache Jenkins Server
See 




Jenkins build became unstable: beam_PostCommit_Java_MavenInstall #4360

2017-07-12 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-2572) Implement an S3 filesystem for Python SDK

2017-07-12 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16084981#comment-16084981
 ] 

Dmitry Demeshchuk commented on BEAM-2572:
-

Also, just a random thought. How does this look to you?

{code}
(p
| "Read" >> ReadFromText('s3://some/path')
.with_credentials(AWSCredentials(aws_access_key_id=..., 
aws_secret_access_key=...))
.with_credentials(GCPCredentials(token=...))
)
{code}

> Implement an S3 filesystem for Python SDK
> -
>
> Key: BEAM-2572
> URL: https://issues.apache.org/jira/browse/BEAM-2572
> Project: Beam
>  Issue Type: Task
>  Components: sdk-py
>Reporter: Dmitry Demeshchuk
>Assignee: Ahmet Altay
>Priority: Minor
>
> There are two paths worth exploring, to my understanding:
> 1. Sticking to the HDFS-based approach (like it's done in Java).
> 2. Using boto/boto3 for accessing S3 through its common API endpoints.
> I personally prefer the second approach, for a few reasons:
> 1. In real life, HDFS and S3 have different consistency guarantees, therefore 
> their behaviors may contradict each other in some edge cases (say, we write 
> something to S3, but it's not immediately accessible for reading from another 
> end).
> 2. There are other AWS-based sources and sinks we may want to create in the 
> future: DynamoDB, Kinesis, SQS, etc.
> 3. boto3 already provides somewhat good logic for basic things like 
> reattempting.
> Whatever path we choose, there's another problem related to this: we 
> currently cannot pass any global settings (say, pipeline options, or just an 
> arbitrary kwarg) to a filesystem. Because of that, we'd have to setup the 
> runner nodes to have AWS keys set up in the environment, which is not trivial 
> to achieve and doesn't look too clean either (I'd rather see one single place 
> for configuring the runner options).
> Also, it's worth mentioning that I already have a janky S3 filesystem 
> implementation that only supports DirectRunner at the moment (because of the 
> previous paragraph). I'm perfectly fine finishing it myself, with some 
> guidance from the maintainers.
> Where should I move on from here, and whose input should I be looking for?
> Thanks!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2572) Implement an S3 filesystem for Python SDK

2017-07-12 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16084978#comment-16084978
 ] 

Dmitry Demeshchuk commented on BEAM-2572:
-

I agree that resource access is a filesystem problem, indeed.

Let's discuss each approach, then:

> Give file systems access to the pipeline options.

1. Do we somehow allow using separate credentials for reading and writing? Do 
we allow using separate credentials for different paths (say, different S3 
buckets)?
2. Let's say, for the sake of simplicity, that we are using a single set of 
credentials in the pipeline options for the S3 filesystem. How do we provide 
AWS credentials to other, non-filesystem sources and sinks (Dynamo, Redshift, 
Kinesis, SQS, etc.)? Do we use pipeline options too, for the sake of 
consistency? Or do we provide AWS credentials as parameters to each of those 
PTransforms?

> File systems should be capable of acquiring credentials only by using the 
> environment and pipeline options. (This could range from explicitly getting 
> credentials in the options to, using a KMS)

3. How do we provision the environment on the runner nodes? Sounds like this 
will have to be a runner-specific feature. Besides, some cloud providers (say, 
GCP) would rather require us to provide credentials in a file, not in the 
environment.
4. If we use a KMS or similar, should we introduce a notion of secrets storage 
in Beam? While I think that's an option, it doesn't seem to me drastically 
different from just using pipeline options with a few extra security measures. 
Besides, it will likely increase the infrastructural complexity of the setup.

(Please tell me anytime if we should move this discussion to dev@)

> Implement an S3 filesystem for Python SDK
> -
>
> Key: BEAM-2572
> URL: https://issues.apache.org/jira/browse/BEAM-2572
> Project: Beam
>  Issue Type: Task
>  Components: sdk-py
>Reporter: Dmitry Demeshchuk
>Assignee: Ahmet Altay
>Priority: Minor
>
> There are two paths worth exploring, to my understanding:
> 1. Sticking to the HDFS-based approach (like it's done in Java).
> 2. Using boto/boto3 for accessing S3 through its common API endpoints.
> I personally prefer the second approach, for a few reasons:
> 1. In real life, HDFS and S3 have different consistency guarantees, therefore 
> their behaviors may contradict each other in some edge cases (say, we write 
> something to S3, but it's not immediately accessible for reading from another 
> end).
> 2. There are other AWS-based sources and sinks we may want to create in the 
> future: DynamoDB, Kinesis, SQS, etc.
> 3. boto3 already provides somewhat good logic for basic things like 
> reattempting.
> Whatever path we choose, there's another problem related to this: we 
> currently cannot pass any global settings (say, pipeline options, or just an 
> arbitrary kwarg) to a filesystem. Because of that, we'd have to setup the 
> runner nodes to have AWS keys set up in the environment, which is not trivial 
> to achieve and doesn't look too clean either (I'd rather see one single place 
> for configuring the runner options).
> Also, it's worth mentioning that I already have a janky S3 filesystem 
> implementation that only supports DirectRunner at the moment (because of the 
> previous paragraph). I'm perfectly fine finishing it myself, with some 
> guidance from the maintainers.
> Where should I move on from here, and whose input should I be looking for?
> Thanks!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #2618

2017-07-12 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Flink #3387

2017-07-12 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-2610) upgrade to version 2.2.0

2017-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16084909#comment-16084909
 ] 

ASF GitHub Bot commented on BEAM-2610:
--

GitHub user XuMingmin opened a pull request:

https://github.com/apache/beam/pull/3553

[BEAM-2610] upgrade to version 2.2.0

As described in task 
[BEAM-2610](https://issues.apache.org/jira/browse/BEAM-2610), this is the first 
PR to do the job. Feel free to merge it if nothing went wrong while creating the 
PR, as any remaining issues are supposed to be fixed in the second PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache/beam master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3553.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3553


commit 87be64e9817da5e5c86a243471021268d6281b33
Author: Jean-Baptiste Onofré 
Date:   2017-05-12T13:21:49Z

[BEAM-975] Improve default connection options, javadoc and style in 
MongoDbIO

commit b400f4a6f06e5d826d35045658c4a383cdd5953b
Author: Jean-Baptiste Onofré 
Date:   2017-06-19T19:24:08Z

[BEAM-975] This closes #3118

commit 6d27282562911179ea3ff19fd7eae54e8b89425d
Author: Ismaël Mejía 
Date:   2017-06-19T14:43:28Z

Make HBaseIO tests faster by only using the core daemons needed by HBase

commit c6c8d9ee6b60fcc5d019c41ddab697477704f6d8
Author: Ismaël Mejía 
Date:   2017-06-19T21:23:01Z

This closes #3392

commit 595ca1ec84134328be5be3c8ae21a5a43a5a7166
Author: Luke Cwik 
Date:   2017-06-19T22:48:06Z

[BEAM-1348] Fix type error introduced into Python SDK because of PR/3268

commit 2304972c5c3b944be0047253687d9cd0004ef21f
Author: Luke Cwik 
Date:   2017-06-19T23:05:25Z

[BEAM-1348] Fix type error introduced into Python SDK because of PR/3268

This closes #3397

commit 1ec59a08a3fab5ac0918d7f1a33b82427957b630
Author: Ismaël Mejía 
Date:   2017-06-05T21:48:38Z

[BEAM-2411] Make the write transform of HBaseIO simpler

commit d42f6333141e85964d009110d8bea85ad4763632
Author: Ismaël Mejía 
Date:   2017-06-20T08:00:15Z

[BEAM-2411] Add HBaseCoderProviderRegistrar for better coder inference

commit eae0d05bd7c088accd927dcfe3e511efbb11c9fd
Author: Ismaël Mejía 
Date:   2017-06-20T09:49:25Z

This closes #3391

commit 6e4357225477d6beb4cb9735255d1759f4fab168
Author: Eugene Kirpichov 
Date:   2017-06-19T18:56:29Z

Retries http code 0 (usually network error)

commit c1a2226c90bed7b7bf68a4cd240c849dc46e55ac
Author: Luke Cwik 
Date:   2017-06-20T15:53:59Z

Retries http code 0 (usually network error)

This closes #3394

commit 5e12e9d75ab78f210b3b024a77c52aaec033218c
Author: jasonkuster 
Date:   2017-06-20T19:05:22Z

Remove notifications from JDK versions test.

commit 0eb4004a8b91760a66585fb486226513686af002
Author: Kenneth Knowles 
Date:   2017-06-20T19:39:34Z

This closes #3403: Remove notifications from JDK versions test.

commit b7ff103f6ee10b07c50ddbd5a49a6a8ce6686087
Author: Eugene Kirpichov 
Date:   2017-06-16T21:27:51Z

Increases backoff in GcsUtil

commit 59598d8f41e65f9a068d7446457395e112dc3bc7
Author: Luke Cwik 
Date:   2017-06-20T20:11:06Z

Increases backoff in GcsUtil

This closes #3381

commit a0523b2dab617d6aee59708a8d8959f42049fce9
Author: Vikas Kedigehalli 
Date:   2017-06-19T18:24:14Z

Fix dataflow runner test to call pipeline.run instead of runner.run

commit f51fdd960cbfbb9ab2b2870606bd0e221d4beceb
Author: chamik...@google.com 
Date:   2017-06-20T20:32:49Z

This closes #3393

commit 08ec0d4dbff330ecd48c806cd764ab5a96835bd9
Author: Robert Bradshaw 
Date:   2017-06-20T18:01:03Z

Port fn_api_runner to be able to use runner protos.

commit e4ef23e16859e31e09e5fe6cf861d6f3db816b22
Author: Robert Bradshaw 
Date:   2017-06-20T20:47:31Z

Closes #3361

commit f69e3b53fafa4b79b21095d4b65edbe7cfeb7d2a
Author: Pei He 
Date:   2017-06-19T22:55:48Z

FlinkRunner: remove the unused ReflectiveOneToOneOverrideFactory.

commit 52794096aa8b4d614423fd787835f5b89b1ea1ac
Author: Pei He 
Date:   2017-06-19T23:10:02Z

Flink runner: refactor the translator into two phases: rewriting and 
translating.

commit 608a9c4590ebd94e53ee1ec7f3ad60bfb4905c11
Author: Pei He 
Date:   2017-06-20T21:12:55Z

This closes #3275

commit 42a2de91adf1387bb8eaf9aa515a24f6f276bf40
Author: Mairbek Khadikov 
Date:   2017-06-14T20:03:36Z

Support ValueProviders in SpannerIO.Write

commit 10e47646dd5f20d4049d670249cae56c51768ae0
Author: Eugene Kirpichov 
Date:   2017-06-20T21:25:56Z

This closes #3358: [BEAM-1542] Support ValueProviders in SpannerIO

commit 69b01a6118702277348d2f625af669225c9ed99e
Author: Reuven Lax 
Date:   2017-05-13T19:53:08Z

Add spilling code to WriteFiles.

commit 698b89e2b5b88403a5c762b039d3ec8c48b25b26

Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #2617

2017-07-12 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-2572) Implement an S3 filesystem for Python SDK

2017-07-12 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16084932#comment-16084932
 ] 

Dmitry Demeshchuk commented on BEAM-2572:
-

So, what do you folks think about the approach from 
https://issues.apache.org/jira/browse/BEAM-2572?focusedCommentId=16083108&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16083108
 ? I kind of dislike that it steps away from the common 
ReadFromFile/WriteToFile pattern, but the alternatives we've discussed so far 
seem even uglier.

> Implement an S3 filesystem for Python SDK
> -
>
> Key: BEAM-2572
> URL: https://issues.apache.org/jira/browse/BEAM-2572
> Project: Beam
>  Issue Type: Task
>  Components: sdk-py
>Reporter: Dmitry Demeshchuk
>Assignee: Ahmet Altay
>Priority: Minor
>
> There are two paths worth exploring, to my understanding:
> 1. Sticking to the HDFS-based approach (like it's done in Java).
> 2. Using boto/boto3 for accessing S3 through its common API endpoints.
> I personally prefer the second approach, for a few reasons:
> 1. In real life, HDFS and S3 have different consistency guarantees, so 
> their behaviors may contradict each other in some edge cases (say, we write 
> something to S3, but it's not immediately accessible for reading from 
> another client).
> 2. There are other AWS-based sources and sinks we may want to create in the 
> future: DynamoDB, Kinesis, SQS, etc.
> 3. boto3 already provides reasonably good logic for basic things like 
> retrying.
> Whatever path we choose, there's another problem related to this: we 
> currently cannot pass any global settings (say, pipeline options, or just an 
> arbitrary kwarg) to a filesystem. Because of that, we'd have to set up the 
> runner nodes to have AWS keys in the environment, which is not trivial 
> to achieve and doesn't look too clean either (I'd rather see a single place 
> for configuring the runner options).
> Also, it's worth mentioning that I already have a janky S3 filesystem 
> implementation that only supports DirectRunner at the moment (because of the 
> previous paragraph). I'm perfectly fine finishing it myself, with some 
> guidance from the maintainers.
> Where should I move on from here, and whose input should I be looking for?
> Thanks!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
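
For readers following this thread: below is a minimal sketch of what a
boto3-backed reader outside the FileSystem API could look like. It is
illustrative only (the class and parameter names are made up, and
credentials are passed to the DoFn explicitly to sidestep the
global-settings problem described above); it is not the implementation
referenced in the linked comment.

import apache_beam as beam

class ReadS3Object(beam.DoFn):
    """Takes (bucket, key) pairs and yields the object's lines."""

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None):
        # Plain strings pickle cleanly, unlike a live boto3 client.
        self._key_id = aws_access_key_id
        self._secret = aws_secret_access_key
        self._client = None

    def process(self, element):
        import boto3  # imported lazily so the DoFn stays picklable
        if self._client is None:
            self._client = boto3.client(
                's3',
                aws_access_key_id=self._key_id,
                aws_secret_access_key=self._secret)
        bucket, key = element
        body = self._client.get_object(Bucket=bucket, Key=key)['Body']
        for line in body.read().splitlines():
            yield line

A pipeline would use it as, e.g.,
p | beam.Create([('my-bucket', 'path/key.txt')]) | beam.ParDo(ReadS3Object(...)),
which is exactly the step away from the ReadFromFile/WriteToFile pattern
discussed above.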


Jenkins build is unstable: beam_PostCommit_Java_ValidatesRunner_Flink #3386

2017-07-12 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-2447) Reintroduce DoFn.ProcessContinuation

2017-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16084903#comment-16084903
 ] 

ASF GitHub Bot commented on BEAM-2447:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3360


> Reintroduce DoFn.ProcessContinuation
> 
>
> Key: BEAM-2447
> URL: https://issues.apache.org/jira/browse/BEAM-2447
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
>
> ProcessContinuation.resume() is useful for tailing files - when we reach 
> the current EOF, we want to voluntarily suspend the process() call rather 
> than wait for the runner to checkpoint us.
> In BEAM-1903, DoFn.ProcessContinuation was removed because there was 
> ambiguity about the semantics of resume(), especially w.r.t. the following 
> situation described in 
> https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit
>  : the runner has taken a checkpoint on the tracker, and then the 
> ProcessElement call returns resume(), signaling that the work is still not 
> done - then there are two checkpoints to deal with.
> Instead, the proper way to refine these semantics is:
> - After checkpoint() on a RestrictionTracker, the tracker MUST fail all 
> subsequent tryClaim() calls, and MUST succeed in checkDone().
> - After a failed tryClaim() call, the ProcessElement method MUST return stop().
> - So ProcessElement can return resume() only *instead* of calling tryClaim().
> - Then, if the runner has already taken a checkpoint but the tracker has 
> returned resume(), we do not need to take a new checkpoint - the one already 
> taken accurately describes the remainder of the work.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
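
To make the refined contract concrete, here is a small self-contained
Python model of the rules above. The names mirror the Java SDK's
RestrictionTracker and OffsetRangeTracker, but this is only an
illustration of the semantics, not the real API.

class OffsetRangeTrackerModel(object):
    def __init__(self, start, end):
        self.start, self.end = start, end
        self.last_claimed = None

    def try_claim(self, pos):
        # After checkpoint() the range is truncated to the claimed work,
        # so every subsequent try_claim() falls outside it and MUST fail.
        if pos >= self.end:
            return False
        self.last_claimed = pos
        return True

    def checkpoint(self):
        # Split off the unclaimed remainder; the primary restriction now
        # covers exactly the claimed work, so check_done() MUST succeed.
        claimed_to = self.start if self.last_claimed is None else self.last_claimed + 1
        residual = (claimed_to, self.end)
        self.end = claimed_to
        return residual

    def check_done(self):
        claimed_to = self.start if self.last_claimed is None else self.last_claimed + 1
        assert claimed_to >= self.end, 'unclaimed work remains'

def process_element(tracker, eof_at, output):
    """Mimics a tailing ProcessElement: returns 'stop' or 'resume'."""
    pos = tracker.start
    while True:
        if pos >= eof_at:
            return 'resume'  # suspend voluntarily *instead of* a tryClaim()
        if not tracker.try_claim(pos):
            return 'stop'    # a failed claim means we MUST stop
        output(pos)
        pos += 1

With this model, a checkpoint taken by the runner simply truncates the
range; whether process_element later returns 'stop' (failed claim) or
'resume' (EOF reached first), the checkpoint already taken describes the
remainder of the work, matching the last bullet above.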


[jira] [Closed] (BEAM-2447) Reintroduce DoFn.ProcessContinuation

2017-07-12 Thread Eugene Kirpichov (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eugene Kirpichov closed BEAM-2447.
--
   Resolution: Fixed
Fix Version/s: 2.2.0

> Reintroduce DoFn.ProcessContinuation
> 
>
> Key: BEAM-2447
> URL: https://issues.apache.org/jira/browse/BEAM-2447
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
> Fix For: 2.2.0
>
>
> ProcessContinuation.resume() is useful for tailing files - when we reach 
> the current EOF, we want to voluntarily suspend the process() call rather 
> than wait for the runner to checkpoint us.
> In BEAM-1903, DoFn.ProcessContinuation was removed because there was 
> ambiguity about the semantics of resume(), especially w.r.t. the following 
> situation described in 
> https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit
>  : the runner has taken a checkpoint on the tracker, and then the 
> ProcessElement call returns resume(), signaling that the work is still not 
> done - then there are two checkpoints to deal with.
> Instead, the proper way to refine these semantics is:
> - After checkpoint() on a RestrictionTracker, the tracker MUST fail all 
> subsequent tryClaim() calls, and MUST succeed in checkDone().
> - After a failed tryClaim() call, the ProcessElement method MUST return stop().
> - So ProcessElement can return resume() only *instead* of calling tryClaim().
> - Then, if the runner has already taken a checkpoint but the tracker has 
> returned resume(), we do not need to take a new checkpoint - the one already 
> taken accurately describes the remainder of the work.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2520) add UDF/UDAF in BeamSql.query/simpleQuery

2017-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16084897#comment-16084897
 ] 

ASF GitHub Bot commented on BEAM-2520:
--

Github user XuMingmin closed the pull request at:

https://github.com/apache/beam/pull/3491


> add UDF/UDAF in BeamSql.query/simpleQuery
> -
>
> Key: BEAM-2520
> URL: https://issues.apache.org/jira/browse/BEAM-2520
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Xu Mingmin
>Assignee: Xu Mingmin
>  Labels: dsl_sql_merge
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3553: [BEAM-2610] upgrade to version 2.2.0

2017-07-12 Thread XuMingmin
GitHub user XuMingmin opened a pull request:

https://github.com/apache/beam/pull/3553

[BEAM-2610] upgrade to version 2.2.0

As described in task 
[BEAM-2610](https://issues.apache.org/jira/browse/BEAM-2610), this is the first 
PR to do the job. Feel free to merge it if nothing went wrong while creating the 
PR, as any issues are supposed to be fixed in the second PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache/beam master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3553.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3553


commit 87be64e9817da5e5c86a243471021268d6281b33
Author: Jean-Baptiste Onofré 
Date:   2017-05-12T13:21:49Z

[BEAM-975] Improve default connection options, javadoc and style in 
MongoDbIO

commit b400f4a6f06e5d826d35045658c4a383cdd5953b
Author: Jean-Baptiste Onofré 
Date:   2017-06-19T19:24:08Z

[BEAM-975] This closes #3118

commit 6d27282562911179ea3ff19fd7eae54e8b89425d
Author: Ismaël Mejía 
Date:   2017-06-19T14:43:28Z

Make HBaseIO tests faster by only using the core daemons needed by HBase

commit c6c8d9ee6b60fcc5d019c41ddab697477704f6d8
Author: Ismaël Mejía 
Date:   2017-06-19T21:23:01Z

This closes #3392

commit 595ca1ec84134328be5be3c8ae21a5a43a5a7166
Author: Luke Cwik 
Date:   2017-06-19T22:48:06Z

[BEAM-1348] Fix type error introduced into Python SDK because of PR/3268

commit 2304972c5c3b944be0047253687d9cd0004ef21f
Author: Luke Cwik 
Date:   2017-06-19T23:05:25Z

[BEAM-1348] Fix type error introduced into Python SDK because of PR/3268

This closes #3397

commit 1ec59a08a3fab5ac0918d7f1a33b82427957b630
Author: Ismaël Mejía 
Date:   2017-06-05T21:48:38Z

[BEAM-2411] Make the write transform of HBaseIO simpler

commit d42f6333141e85964d009110d8bea85ad4763632
Author: Ismaël Mejía 
Date:   2017-06-20T08:00:15Z

[BEAM-2411] Add HBaseCoderProviderRegistrar for better coder inference

commit eae0d05bd7c088accd927dcfe3e511efbb11c9fd
Author: Ismaël Mejía 
Date:   2017-06-20T09:49:25Z

This closes #3391

commit 6e4357225477d6beb4cb9735255d1759f4fab168
Author: Eugene Kirpichov 
Date:   2017-06-19T18:56:29Z

Retries http code 0 (usually network error)

commit c1a2226c90bed7b7bf68a4cd240c849dc46e55ac
Author: Luke Cwik 
Date:   2017-06-20T15:53:59Z

Retries http code 0 (usually network error)

This closes #3394

commit 5e12e9d75ab78f210b3b024a77c52aaec033218c
Author: jasonkuster 
Date:   2017-06-20T19:05:22Z

Remove notifications from JDK versions test.

commit 0eb4004a8b91760a66585fb486226513686af002
Author: Kenneth Knowles 
Date:   2017-06-20T19:39:34Z

This closes #3403: Remove notifications from JDK versions test.

commit b7ff103f6ee10b07c50ddbd5a49a6a8ce6686087
Author: Eugene Kirpichov 
Date:   2017-06-16T21:27:51Z

Increases backoff in GcsUtil

commit 59598d8f41e65f9a068d7446457395e112dc3bc7
Author: Luke Cwik 
Date:   2017-06-20T20:11:06Z

Increases backoff in GcsUtil

This closes #3381

commit a0523b2dab617d6aee59708a8d8959f42049fce9
Author: Vikas Kedigehalli 
Date:   2017-06-19T18:24:14Z

Fix dataflow runner test to call pipeline.run instead of runner.run

commit f51fdd960cbfbb9ab2b2870606bd0e221d4beceb
Author: chamik...@google.com 
Date:   2017-06-20T20:32:49Z

This closes #3393

commit 08ec0d4dbff330ecd48c806cd764ab5a96835bd9
Author: Robert Bradshaw 
Date:   2017-06-20T18:01:03Z

Port fn_api_runner to be able to use runner protos.

commit e4ef23e16859e31e09e5fe6cf861d6f3db816b22
Author: Robert Bradshaw 
Date:   2017-06-20T20:47:31Z

Closes #3361

commit f69e3b53fafa4b79b21095d4b65edbe7cfeb7d2a
Author: Pei He 
Date:   2017-06-19T22:55:48Z

FlinkRunner: remove the unused ReflectiveOneToOneOverrideFactory.

commit 52794096aa8b4d614423fd787835f5b89b1ea1ac
Author: Pei He 
Date:   2017-06-19T23:10:02Z

Flink runner: refactor the translator into two phases: rewriting and 
translating.

commit 608a9c4590ebd94e53ee1ec7f3ad60bfb4905c11
Author: Pei He 
Date:   2017-06-20T21:12:55Z

This closes #3275

commit 42a2de91adf1387bb8eaf9aa515a24f6f276bf40
Author: Mairbek Khadikov 
Date:   2017-06-14T20:03:36Z

Support ValueProviders in SpannerIO.Write

commit 10e47646dd5f20d4049d670249cae56c51768ae0
Author: Eugene Kirpichov 
Date:   2017-06-20T21:25:56Z

This closes #3358: [BEAM-1542] Support ValueProviders in SpannerIO

commit 69b01a6118702277348d2f625af669225c9ed99e
Author: Reuven Lax 
Date:   2017-05-13T19:53:08Z

Add spilling code to WriteFiles.

commit 698b89e2b5b88403a5c762b039d3ec8c48b25b26
Author: Eugene Kirpichov 
Date:   2017-06-20T21:28:39Z

This closes #3161: [BEAM-2302] Add spilling code to WriteFiles.

commit a06c8bfae6fb9e35deeb4adfdd7761889b12be89
Author: Eugene Kirpichov 
Date:   2017-02-02T01:26:

[1/2] beam git commit: [BEAM-2447] Reintroduces DoFn.ProcessContinuation

2017-07-12 Thread jkff
Repository: beam
Updated Branches:
  refs/heads/master 91c7d3d1f -> 66b4a1be0


[BEAM-2447] Reintroduces DoFn.ProcessContinuation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bff4a78
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bff4a78
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bff4a78

Branch: refs/heads/master
Commit: 1bff4a786536ff1a4ffe9904079c7a89058e6b4e
Parents: 91c7d3d
Author: Eugene Kirpichov 
Authored: Tue Jun 13 16:50:35 2017 -0700
Committer: Eugene Kirpichov 
Committed: Wed Jul 12 16:10:07 2017 -0700

--
 .../core/construction/SplittableParDoTest.java  |  10 +-
 ...eBoundedSplittableProcessElementInvoker.java |  35 ++-
 .../core/SplittableParDoViaKeyedWorkItems.java  |   9 +-
 .../core/SplittableProcessElementInvoker.java   |  25 -
 ...ndedSplittableProcessElementInvokerTest.java |  45 +++--
 .../core/SplittableParDoProcessFnTest.java  |  99 --
 .../org/apache/beam/sdk/transforms/DoFn.java|  51 +-
 .../reflect/ByteBuddyDoFnInvokerFactory.java|  19 +++-
 .../sdk/transforms/reflect/DoFnInvoker.java |   4 +-
 .../sdk/transforms/reflect/DoFnSignature.java   |  10 +-
 .../sdk/transforms/reflect/DoFnSignatures.java  |  22 +++-
 .../splittabledofn/OffsetRangeTracker.java  |  10 ++
 .../splittabledofn/RestrictionTracker.java  |  11 +-
 .../beam/sdk/transforms/SplittableDoFnTest.java | 100 ---
 .../transforms/reflect/DoFnInvokersTest.java|  93 +
 .../DoFnSignaturesProcessElementTest.java   |   2 +-
 .../DoFnSignaturesSplittableDoFnTest.java   |  83 +--
 17 files changed, 487 insertions(+), 141 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
--
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index f4c596e..267232c 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.core.construction;
 
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.junit.Assert.assertEquals;
 
 import java.io.Serializable;
@@ -24,8 +25,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
-import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -70,7 +69,6 @@ public class SplittableParDoTest {
 public void checkDone() {}
   }
 
-  @BoundedPerElement
   private static class BoundedFakeFn extends DoFn {
 @ProcessElement
 public void processElement(ProcessContext context, SomeRestrictionTracker 
tracker) {}
@@ -81,10 +79,12 @@ public class SplittableParDoTest {
 }
   }
 
-  @UnboundedPerElement
   private static class UnboundedFakeFn extends DoFn {
 @ProcessElement
-public void processElement(ProcessContext context, SomeRestrictionTracker 
tracker) {}
+public ProcessContinuation processElement(
+ProcessContext context, SomeRestrictionTracker tracker) {
+  return stop();
+}
 
 @GetInitialRestriction
 public SomeRestriction getInitialRestriction(Integer element) {

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 475abf2..0c956d5 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -96,7 +96,7 @@ public class 
OutputAnd

[GitHub] beam pull request #3360: [BEAM-2447] Reintroduces DoFn.ProcessContinuation

2017-07-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3360


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: This closes #3360: [BEAM-2447] Reintroduces DoFn.ProcessContinuation

2017-07-12 Thread jkff
This closes #3360: [BEAM-2447] Reintroduces DoFn.ProcessContinuation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/66b4a1be
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/66b4a1be
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/66b4a1be

Branch: refs/heads/master
Commit: 66b4a1be09b58a761cf49cc18a04eaaff555e376
Parents: 91c7d3d 1bff4a7
Author: Eugene Kirpichov 
Authored: Wed Jul 12 16:16:05 2017 -0700
Committer: Eugene Kirpichov 
Committed: Wed Jul 12 16:16:05 2017 -0700

--
 .../core/construction/SplittableParDoTest.java  |  10 +-
 ...eBoundedSplittableProcessElementInvoker.java |  35 ++-
 .../core/SplittableParDoViaKeyedWorkItems.java  |   9 +-
 .../core/SplittableProcessElementInvoker.java   |  25 -
 ...ndedSplittableProcessElementInvokerTest.java |  45 +++--
 .../core/SplittableParDoProcessFnTest.java  |  99 --
 .../org/apache/beam/sdk/transforms/DoFn.java|  51 +-
 .../reflect/ByteBuddyDoFnInvokerFactory.java|  19 +++-
 .../sdk/transforms/reflect/DoFnInvoker.java |   4 +-
 .../sdk/transforms/reflect/DoFnSignature.java   |  10 +-
 .../sdk/transforms/reflect/DoFnSignatures.java  |  22 +++-
 .../splittabledofn/OffsetRangeTracker.java  |  10 ++
 .../splittabledofn/RestrictionTracker.java  |  11 +-
 .../beam/sdk/transforms/SplittableDoFnTest.java | 100 ---
 .../transforms/reflect/DoFnInvokersTest.java|  93 +
 .../DoFnSignaturesProcessElementTest.java   |   2 +-
 .../DoFnSignaturesSplittableDoFnTest.java   |  83 +--
 17 files changed, 487 insertions(+), 141 deletions(-)
--




[jira] [Created] (BEAM-2610) upgrade to version 2.2.0

2017-07-12 Thread Xu Mingmin (JIRA)
Xu Mingmin created BEAM-2610:


 Summary: upgrade to version 2.2.0
 Key: BEAM-2610
 URL: https://issues.apache.org/jira/browse/BEAM-2610
 Project: Beam
  Issue Type: Task
  Components: dsl-sql
Reporter: Xu Mingmin
Assignee: Xu Mingmin


This task syncs changes from the master branch, which is now using version 
2.2.0-SNAPSHOT. 

As usual, there will be two PRs:
1. a pull request from master to DSL_SQL, merged while ignoring any 
errors;
2. a second PR to finish the change in DSL_SQL and also fix any potential 
issues.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (BEAM-2520) add UDF/UDAF in BeamSql.query/simpleQuery

2017-07-12 Thread Xu Mingmin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xu Mingmin closed BEAM-2520.

   Resolution: Fixed
Fix Version/s: 2.2.0

> add UDF/UDAF in BeamSql.query/simpleQuery
> -
>
> Key: BEAM-2520
> URL: https://issues.apache.org/jira/browse/BEAM-2520
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Xu Mingmin
>Assignee: Xu Mingmin
>  Labels: dsl_sql_merge
> Fix For: 2.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3491: [BEAM-2520] add UDF/UDAF in BeamSql.query/simpleQue...

2017-07-12 Thread XuMingmin
Github user XuMingmin closed the pull request at:

https://github.com/apache/beam/pull/3491


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: [BEAM-2520] This closes #3491

2017-07-12 Thread takidau
[BEAM-2520] This closes #3491


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d89d1ee1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d89d1ee1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d89d1ee1

Branch: refs/heads/DSL_SQL
Commit: d89d1ee1a3085269cdf44ec50e29a95c8f43757b
Parents: 25fea4e 5ca54e9
Author: Tyler Akidau 
Authored: Wed Jul 12 15:57:53 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 15:57:53 2017 -0700

--
 .../java/org/apache/beam/dsls/sql/BeamSql.java  | 114 ++-
 .../org/apache/beam/dsls/sql/BeamSqlEnv.java|   6 +-
 .../beam/dsls/sql/BeamSqlDslUdfUdafTest.java| 137 +++
 3 files changed, 221 insertions(+), 36 deletions(-)
--




[beam-site] branch asf-site updated (f88e604 -> 1d1840e)

2017-07-12 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git.


from f88e604  Prepare repository for deployment.
 add a0207d0  Fix broken links
 add c1fe617  Update to latest version
 add 6aee6f0  Address review comments
 add 2c6b414  Address review comments
 add 350bf24  This closes #271
 new 1d1840e  Prepare repository for deployment.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 content/contribute/ptransform-style-guide/index.html | 3 ++-
 content/documentation/runners/direct/index.html  | 2 +-
 src/contribute/ptransform-style-guide.md | 3 ++-
 src/documentation/runners/direct.md  | 2 +-
 4 files changed, 6 insertions(+), 4 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" '].


[1/2] beam git commit: support UDF/UDAF in BeamSql

2017-07-12 Thread takidau
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 25fea4e1e -> d89d1ee1a


support UDF/UDAF in BeamSql


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5ca54e95
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5ca54e95
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5ca54e95

Branch: refs/heads/DSL_SQL
Commit: 5ca54e956e80f3059a9e67bf9b3d34af08569ff1
Parents: 25fea4e
Author: mingmxu 
Authored: Sun Jul 2 21:24:07 2017 -0700
Committer: Tyler Akidau 
Committed: Wed Jul 12 15:54:03 2017 -0700

--
 .../java/org/apache/beam/dsls/sql/BeamSql.java  | 114 ++-
 .../org/apache/beam/dsls/sql/BeamSqlEnv.java|   6 +-
 .../beam/dsls/sql/BeamSqlDslUdfUdafTest.java| 137 +++
 3 files changed, 221 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/5ca54e95/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
--
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
index a0e7cbc..ec3799c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -17,10 +17,12 @@
  */
 package org.apache.beam.dsls.sql;
 
+import com.google.auto.value.AutoValue;
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.beam.dsls.sql.schema.BeamPCollectionTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -51,7 +53,9 @@ PCollection inputTableB = 
p.apply(TextIO.read().from("/my/input/path
 
 //run a simple query, and register the output as a table in BeamSql;
 String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION";
-PCollection outputTableA = 
inputTableA.apply(BeamSql.simpleQuery(sql1));
+PCollection outputTableA = inputTableA.apply(
+BeamSql.simpleQuery(sql1)
+.withUdf("MY_FUNC", MY_FUNC.class, "FUNC"));
 
 //run a JOIN with one table from TextIO, and one table from another query
 PCollection outputTableB = PCollectionTuple.of(
@@ -60,7 +64,7 @@ PCollection outputTableB = PCollectionTuple.of(
 .apply(BeamSql.query("select * from TABLE_O_A JOIN TABLE_B where ..."));
 
 //output the final result with TextIO
-outputTableB.apply(BeamSql.toTextRow()).apply(TextIO.write().to("/my/output/path"));
+outputTableB.apply(...).apply(TextIO.write().to("/my/output/path"));
 
 p.run().waitUntilFinish();
  * }
@@ -68,7 +72,6 @@ p.run().waitUntilFinish();
  */
 @Experimental
 public class BeamSql {
-
   /**
* Transforms a SQL query into a {@link PTransform} representing an 
equivalent execution plan.
*
@@ -80,9 +83,11 @@ public class BeamSql {
* It is an error to apply a {@link PCollectionTuple} missing any {@code 
table names}
* referenced within the query.
*/
-  public static PTransform> 
query(String sqlQuery) {
-return new QueryTransform(sqlQuery);
-
+  public static QueryTransform query(String sqlQuery) {
+return QueryTransform.builder()
+.setSqlEnv(new BeamSqlEnv())
+.setSqlQuery(sqlQuery)
+.build();
   }
 
   /**
@@ -93,42 +98,62 @@ public class BeamSql {
*
* Make sure to query it from a static table name PCOLLECTION.
*/
-  public static PTransform, PCollection>
-  simpleQuery(String sqlQuery) throws Exception {
-return new SimpleQueryTransform(sqlQuery);
+  public static SimpleQueryTransform simpleQuery(String sqlQuery) throws 
Exception {
+return SimpleQueryTransform.builder()
+.setSqlEnv(new BeamSqlEnv())
+.setSqlQuery(sqlQuery)
+.build();
   }
 
   /**
* A {@link PTransform} representing an execution plan for a SQL query.
*/
-  private static class QueryTransform extends
+  @AutoValue
+  public abstract static class QueryTransform extends
   PTransform> {
-private transient BeamSqlEnv sqlEnv;
-private String sqlQuery;
+abstract BeamSqlEnv getSqlEnv();
+abstract String getSqlQuery();
 
-public QueryTransform(String sqlQuery) {
-  this.sqlQuery = sqlQuery;
-  sqlEnv = new BeamSqlEnv();
+static Builder builder() {
+  return new AutoValue_BeamSql_QueryTransform.Builder();
 }
 
-public QueryTransform(String sqlQuery, BeamSqlEnv sqlEnv) {
-  this.sqlQuery = sqlQuery;
-  this.sqlEnv = sqlEnv;
+@AutoValue.Builder
+abstract static class Builder {
+  abstract Builder setSqlQuery(String sqlQuery);
+  abstract Builder setSqlEnv(BeamSqlEnv sqlEnv);
+  abstract Quer

[beam-site] 01/01: Prepare repository for deployment.

2017-07-12 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 1d1840e5c3e38e4c1de472272483c3047dfad1be
Author: Mergebot 
AuthorDate: Wed Jul 12 22:58:20 2017 +

Prepare repository for deployment.
---
 content/contribute/ptransform-style-guide/index.html | 3 ++-
 content/documentation/runners/direct/index.html  | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/content/contribute/ptransform-style-guide/index.html 
b/content/contribute/ptransform-style-guide/index.html
index 1a83e41..56381bb 100644
--- a/content/contribute/ptransform-style-guide/index.html
+++ b/content/contribute/ptransform-style-guide/index.html
@@ -395,7 +395,8 @@ E.g. when expanding a filepattern into files, log what the 
filepattern was and h
 
 
   Generally, follow the rules of http://semver.org/";>semantic 
versioning.
-  If the API of the transform is not yet stable, mark it @Experimental.
+  If the API of the transform is not yet stable, annotate it as @Experimental (Java) or @experimental (https://beam.apache.org/documentation/sdks/pydoc/2.0.0/apache_beam.utils.html#module-apache_beam.utils.annotations";>Python).
+  If the API is deprecated, annotate it as @Deprecated (Java) or @deprecated (https://beam.apache.org/documentation/sdks/pydoc/2.0.0/apache_beam.utils.html#module-apache_beam.utils.annotations";>Python).
   Pay attention to the stability and versioning of third-party classes 
exposed by the transform’s API: if they are unstable or improperly versioned 
(do not obey http://semver.org/";>semantic versioning), it is 
better to wrap them in your own classes.
 
 
diff --git a/content/documentation/runners/direct/index.html 
b/content/documentation/runners/direct/index.html
index c82a43e..7f39a99 100644
--- a/content/documentation/runners/direct/index.html
+++ b/content/documentation/runners/direct/index.html
@@ -165,7 +165,7 @@
   The Apache 
Beam WordCount Example contains an example of logging and testing a 
pipeline with PAssert.
 
   
-  You can use https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L206";>assert_that
 to test your pipeline. The Python https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_debugging.py";>WordCount
 Debugging Example contains an example of logging and testing with https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L206";>assert_
 [...]
+  You can use https://github.com/apache/beam/blob/master/sdks/python/apache_beam/testing/util.py#L76";>assert_that
 to test your pipeline. The Python https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_debugging.py";>WordCount
 Debugging Example contains an example of logging and testing with 
assert_that.
 
 
 Direct Runner prerequisites and 
setup

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[beam-site] 05/05: This closes #271

2017-07-12 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 350bf242a0dc40c20680945df9f86464a4b5410a
Merge: f88e604 2c6b414
Author: Mergebot 
AuthorDate: Wed Jul 12 22:56:06 2017 +

This closes #271

 src/contribute/ptransform-style-guide.md | 3 ++-
 src/documentation/runners/direct.md  | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[beam-site] 01/05: Fix broken links

2017-07-12 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit a0207d0f86cb05cb23fbe2f1a5821beb144bc8fd
Author: Maria Garcia Herrero 
AuthorDate: Tue Jul 11 16:22:40 2017 -0700

Fix broken links
---
 src/contribute/ptransform-style-guide.md | 3 ++-
 src/documentation/runners/direct.md  | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/contribute/ptransform-style-guide.md 
b/src/contribute/ptransform-style-guide.md
index 34f7c45..71b9860 100644
--- a/src/contribute/ptransform-style-guide.md
+++ b/src/contribute/ptransform-style-guide.md
@@ -172,7 +172,8 @@ Data processing is tricky, full of corner cases, and 
difficult to debug, because
 Do:
 
 * Generally, follow the rules of [semantic versioning](http://semver.org/).
-* If the API of the transform is not yet stable, mark it `@Experimental`.
+* If the API of the transform is not yet stable, annotate it as 
`@Experimental` (Java) or `@experimental` 
([python](https://beam.apache.org/documentation/sdks/pydoc/0.6.0/apache_beam.utils.html#module-apache_beam.utils.annotations)).
+* If the API is deprecated, annotate it as `@Deprecated` (Java) or `@deprecated` 
([python](https://beam.apache.org/documentation/sdks/pydoc/0.6.0/apache_beam.utils.html#module-apache_beam.utils.annotations)).
 * Pay attention to the stability and versioning of third-party classes exposed 
by the transform's API: if they are unstable or improperly versioned (do not 
obey [semantic versioning](http://semver.org/)), it is better to wrap them in 
your own classes.
 
 Do not:
diff --git a/src/documentation/runners/direct.md 
b/src/documentation/runners/direct.md
index 5083ec7..b11976e 100644
--- a/src/documentation/runners/direct.md
+++ b/src/documentation/runners/direct.md
@@ -30,7 +30,7 @@ Here are some resources with information about how to test 
your pipelines.
   The Apache 
Beam WordCount Example contains an example of logging and testing a 
pipeline with PAssert.
 
   
-  You can use https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L206";>assert_that
 to test your pipeline. The Python https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_debugging.py";>WordCount
 Debugging Example contains an example of logging and testing with https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L206";>assert_
 [...]
+  You can use https://github.com/apache/beam/blob/master/sdks/python/apache_beam/testing/util.py#L76";>assert_that
 to test your pipeline. The Python https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_debugging.py";>WordCount
 Debugging Example contains an example of logging and testing with 
assert_that.
 
 
 ## Direct Runner prerequisites and setup

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .

