[jira] (BEAM-1345) Mark @Experimental pieces of the SDK that have been added but are not ready to be frozen

2017-01-29 Thread JIRA
Jean-Baptiste Onofré commented on BEAM-1345

Re: Mark @Experimental pieces of the SDK that have been added but are not ready to be frozen

I created this Jira related to IO: https://issues.apache.org/jira/browse/BEAM-911

This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)



[jira] (BEAM-1345) Mark @Experimental pieces of the SDK that have been added but are not ready to be frozen

2017-01-29 Thread Kenneth Knowles (JIRA)
Kenneth Knowles created an issue

Beam / BEAM-1345
Mark @Experimental pieces of the SDK that have been added but are not ready to be frozen

Issue Type: Improvement
Assignee: Unassigned
Components: sdk-java-core, sdk-java-extensions, sdk-java-gcp
Created: 30/Jan/17 04:16
Labels: backward-incompatible
Priority: Major
Reporter: Kenneth Knowles

A blanket JIRA to ensure that, before a stable release, we mark those pieces that would be unwise to freeze yet and consider how best to communicate this to users, who may just autocomplete those features in their IDE anyhow. (Conversely, put infrastructure in place to enforce freezing of the rest.)
Not technically "backwards incompatible" with pre-Beam Dataflow SDKs, but certainly needs to be on the burndown for the first stable release.
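
As a rough illustration of the idea in this issue, marking API surface that is not ready to be frozen and checking for the marker mechanically could look like the sketch below. Everything in it is an assumption made for illustration: the `Experimental` annotation is defined locally (it is not the SDK's own annotation, whose shape is not shown in this thread), and `ExampleIO` and `ExperimentalAudit` are hypothetical names.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical marker annotation, defined here only for illustration. */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface Experimental {
  String value() default "";
}

/** Hypothetical SDK class with one stable and one experimental method. */
class ExampleIO {
  public String read() {
    return "stable";
  }

  @Experimental("API may change before the first stable release")
  public String readWithSplitting() {
    return "experimental";
  }
}

/** Hypothetical audit helper: lists the methods that carry the marker. */
final class ExperimentalAudit {
  static List<String> experimentalMethods(Class<?> type) {
    List<String> names = new ArrayList<>();
    for (Method m : type.getDeclaredMethods()) {
      // RUNTIME retention makes the marker visible to reflection.
      if (m.isAnnotationPresent(Experimental.class)) {
        names.add(m.getName());
      }
    }
    return names;
  }

  public static void main(String[] args) {
    System.out.println(experimentalMethods(ExampleIO.class));
  }
}
```

Because the annotation is `@Documented`, it also shows up in generated Javadoc, which is one way to reach users who discover methods through IDE autocompletion.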

Jenkins build is back to normal : beam_PostCommit_Python_Verify #1112

2017-01-29 Thread Apache Jenkins Server
See 



Build failed in Jenkins: beam_PostCommit_Python_Verify #1111

2017-01-29 Thread Apache Jenkins Server
See 

--
[...truncated 8135 lines...]
  "properties": {
"display_data": [], 
"output_info": [
  {
"encoding": {
  "@type": "kind:windowed_value", 
  "component_encodings": [
{
  "@type": "kind:pair", 
  "component_encodings": [
{
  "@type": 
"FastPrimitivesCoder$eNprYEpOLEhMzkiNT0pNzNVLzk9JLSqGUlxuicUlAUWZuZklmWWpxc4gQa5CBs3GQsbaQqZQ/vi0xJycpMTk7Hiw+kJmPEYFZCZn56RCjWABGsFaW8iWVJykBwDlGS3/",
 
  "component_encodings": [
{
  "@type": 
"FastPrimitivesCoder$eNprYEpOLEhMzkiNT0pNzNVLzk9JLSqGUlxuicUlAUWZuZklmWWpxc4gQa5CBs3GQsbaQqZQ/vi0xJycpMTk7Hiw+kJmPEYFZCZn56RCjWABGsFaW8iWVJykBwDlGS3/",
 
  "component_encodings": []
}, 
{
  "@type": 
"FastPrimitivesCoder$eNprYEpOLEhMzkiNT0pNzNVLzk9JLSqGUlxuicUlAUWZuZklmWWpxc4gQa5CBs3GQsbaQqZQ/vi0xJycpMTk7Hiw+kJmPEYFZCZn56RCjWABGsFaW8iWVJykBwDlGS3/",
 
  "component_encodings": []
}
  ], 
  "is_pair_like": true
}, 
{
  "@type": 
"FastPrimitivesCoder$eNprYEpOLEhMzkiNT0pNzNVLzk9JLSqGUlxuicUlAUWZuZklmWWpxc4gQa5CBs3GQsbaQqZQ/vi0xJycpMTk7Hiw+kJmPEYFZCZn56RCjWABGsFaW8iWVJykBwDlGS3/",
 
  "component_encodings": [
{
  "@type": 
"FastPrimitivesCoder$eNprYEpOLEhMzkiNT0pNzNVLzk9JLSqGUlxuicUlAUWZuZklmWWpxc4gQa5CBs3GQsbaQqZQ/vi0xJycpMTk7Hiw+kJmPEYFZCZn56RCjWABGsFaW8iWVJykBwDlGS3/",
 
  "component_encodings": []
}, 
{
  "@type": 
"FastPrimitivesCoder$eNprYEpOLEhMzkiNT0pNzNVLzk9JLSqGUlxuicUlAUWZuZklmWWpxc4gQa5CBs3GQsbaQqZQ/vi0xJycpMTk7Hiw+kJmPEYFZCZn56RCjWABGsFaW8iWVJykBwDlGS3/",
 
  "component_encodings": []
}
  ], 
  "is_pair_like": true
}
  ], 
  "is_pair_like": true
}, 
{
  "@type": "kind:global_window"
}
  ], 
  "is_wrapper": true
}, 
"output_name": "out", 
"user_name": "assert:even/Group.out"
  }
], 
"parallel_input": {
  "@type": "OutputReference", 
  "output_name": "out", 
  "step_name": "s14"
}, 
"serialized_fn": "", 
"user_name": "assert:even/Group"
  }
}, 
{
  "kind": "ParallelDo", 
  "name": "s16", 
  "properties": {
"display_data": [
  {
"key": "fn", 
"label": "Transform Function", 
"namespace": "apache_beam.transforms.core.CallableWrapperDoFn", 
"type": "STRING", 
"value": ""
  }, 
  {
"key": "fn", 
"label": "Transform Function", 
"namespace": "apache_beam.transforms.core.ParDo", 
"shortValue": "CallableWrapperDoFn", 
"type": "STRING", 
"value": "apache_beam.transforms.core.CallableWrapperDoFn"
  }
], 
"non_parallel_inputs": {}, 
"output_info": [
  {
"encoding": {
  "@type": "kind:windowed_value", 
  "component_encodings": [
{
  "@type": 
"FastPrimitivesCoder$eNprYEpOLEhMzkiNT0pNzNVLzk9JLSqGUlxuicUlAUWZuZklmWWpxc4gQa5CBs3GQsbaQqZQ/vi0xJycpMTk7Hiw+kJmPEYFZCZn56RCjWABGsFaW8iWVJykBwDlGS3/",
 
  "component_encodings": [
{
  "@type": 
"FastPrimitivesCoder$eNprYEpOLEhMzkiNT0pNzNVLzk9JLSqGUlxuicUlAUWZuZklmWWpxc4gQa5CBs3GQsbaQqZQ/vi0xJycpMTk7Hiw+kJmPEYFZCZn56RCjWABGsFaW8iWVJykBwDlGS3/",
 
  "component_encodings": []
}, 
{
  "@type": 
"FastPrimitivesCoder$eNprYEpOLEhMzkiNT0pNzNVLzk9JLSqGUlxuicUlAUWZuZklmWWpxc4gQa5CBs3GQsbaQqZQ/vi0xJycpMTk7Hiw+kJmPEYFZCZn56RCjWABGsFaW8iWVJykBwDlGS3/",
 
  "component_encodings": []
}
  ], 
  "is_pair_like": true
}, 
{
  "@type": "kind:global_window"
}
  ], 
  "is_wrapper": true
}, 
"output_name": "out", 
"user_name": "assert:even/UnKey.out"
  }
], 
"parallel_input": {
  "@type": "OutputReference", 
  "output_name": "out", 
  "step_name": "s15"
 

[jira] (BEAM-797) A PipelineVisitor that creates a Spark-native pipeline.

2017-01-29 Thread JIRA
Ismaël Mejía commented on BEAM-797

Re: A PipelineVisitor that creates a Spark-native pipeline.

Hi, I am freeing this one. I worked on it a bit, but not much beyond our early discussion, so I am reassigning it to Aviem.



[jira] (BEAM-797) A PipelineVisitor that creates a Spark-native pipeline.

2017-01-29 Thread JIRA
Ismaël Mejía assigned an issue to Aviem Zur

Beam / BEAM-797
A PipelineVisitor that creates a Spark-native pipeline.

Change By: Ismaël Mejía
Assignee: Ismaël Mejía -> Aviem Zur



[jira] (BEAM-797) A PipelineVisitor that creates a Spark-native pipeline.

2017-01-29 Thread ASF GitHub Bot (JIRA)
ASF GitHub Bot commented on BEAM-797

Re: A PipelineVisitor that creates a Spark-native pipeline.

GitHub user aviemzur opened a pull request:
 https://github.com/apache/beam/pull/1868
 BEAM-797 A PipelineVisitor that creates a Spark-native pipeline.
 Be sure to do all of the following to help us incorporate your contribution quickly and easily:

[ ] Make sure the PR title is formatted like: `BEAM- Description of pull request`
[ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes).
[ ] Replace `` in the title with the actual Jira issue number, if there is one.
[ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---
You can merge this pull request into a Git repository by running:
 $ git pull https://github.com/aviemzur/incubator-beam spark-debug-native-pipeline
Alternatively you can review and apply these changes as the patch at:
 https://github.com/apache/beam/pull/1868.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
 This closes #1868

commit 71c762d1d3a6ac5725f3f8b470a8908dfd1bc5b2
Author: Aviem Zur
Date: 2017-01-29T19:06:14Z
 BEAM-797 A PipelineVisitor that creates a Spark-native pipeline.

[GitHub] beam pull request #1868: [BEAM-797] A PipelineVisitor that creates a Spark-n...

2017-01-29 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/1868

[BEAM-797] A PipelineVisitor that creates a Spark-native pipeline.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/incubator-beam spark-debug-native-pipeline

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1868.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1868


commit 71c762d1d3a6ac5725f3f8b470a8908dfd1bc5b2
Author: Aviem Zur 
Date:   2017-01-29T19:06:14Z

[BEAM-797] A PipelineVisitor that creates a Spark-native pipeline.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #1861: Merge master into python-sdk

2017-01-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/1861




[28/50] beam git commit: This closes #1847

2017-01-29 Thread dhalperi
This closes #1847


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1c6e6674
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1c6e6674
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1c6e6674

Branch: refs/heads/python-sdk
Commit: 1c6e667414788fe99f583fac39d458a4984ae162
Parents: 6413299 fee029f
Author: Dan Halperin 
Authored: Wed Jan 25 17:47:08 2017 -0800
Committer: Dan Halperin 
Committed: Wed Jan 25 17:47:08 2017 -0800

--
 runners/google-cloud-dataflow-java/pom.xml  |   5 -
 .../beam/runners/dataflow/util/GcsStager.java   |  18 +-
 .../beam/runners/dataflow/util/PackageUtil.java | 349 +++
 .../runners/dataflow/util/PackageUtilTest.java  |  42 +--
 .../org/apache/beam/sdk/options/GcsOptions.java |   4 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  12 -
 6 files changed, 149 insertions(+), 281 deletions(-)
--




[43/50] beam git commit: [maven-release-plugin] prepare branch release-0.5.0

2017-01-29 Thread dhalperi
[maven-release-plugin] prepare branch release-0.5.0


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/da2dff90
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/da2dff90
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/da2dff90

Branch: refs/heads/python-sdk
Commit: da2dff90cb10e5881496ffd4efb368ba84544174
Parents: 47304d1
Author: Jean-Baptiste Onofré 
Authored: Fri Jan 27 18:27:06 2017 +0100
Committer: Jean-Baptiste Onofré 
Committed: Fri Jan 27 18:27:06 2017 +0100

--
 pom.xml | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/da2dff90/pom.xml
--
diff --git a/pom.xml b/pom.xml
index a96275c..2281f67 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,6 +48,7 @@
     <connection>scm:git:https://git-wip-us.apache.org/repos/asf/beam.git</connection>
     <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/beam.git</developerConnection>
     <url>https://git-wip-us.apache.org/repos/asf?p=beam.git;a=summary</url>
+    <tag>release-0.5.0</tag>
   </scm>
 
   



[12/50] beam git commit: [BEAM-59] Beam FileSystem.setDefaultConfig: remove scheme from the signature.

2017-01-29 Thread dhalperi
[BEAM-59] Beam FileSystem.setDefaultConfig: remove scheme from the signature.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4cdd8771
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4cdd8771
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4cdd8771

Branch: refs/heads/python-sdk
Commit: 4cdd87718c3d0719b7c0e421b9cbaf4eb902672e
Parents: 1148be6
Author: Pei He 
Authored: Mon Jan 23 18:08:44 2017 -0800
Committer: Dan Halperin 
Committed: Tue Jan 24 15:54:53 2017 -0800

--
 .../org/apache/beam/sdk/io/FileSystems.java | 32 +++
 .../org/apache/beam/sdk/io/FileSystemsTest.java | 33 +++-
 2 files changed, 15 insertions(+), 50 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/4cdd8771/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index d086ec6..e19c1e4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.io;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -53,6 +53,8 @@ public class FileSystems {
   private static final Map<String, FileSystemRegistrar> SCHEME_TO_REGISTRAR =
       new ConcurrentHashMap<>();
 
+  private static PipelineOptions defaultConfig;
+
   private static final Map<String, PipelineOptions> SCHEME_TO_DEFAULT_CONFIG =
       new ConcurrentHashMap<>();
 
@@ -78,27 +80,12 @@ public class FileSystems {
   }
 
   /**
-   * Sets the default configuration to be used with a {@link FileSystemRegistrar} for the provided
-   * {@code scheme}.
+   * Sets the default configuration in workers.
    *
-   * Syntax: scheme = alpha *( alpha | digit | "+" | "-" | "." )
-   * Upper case letters are treated as the same as lower case letters.
+   * It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
    */
-  public static void setDefaultConfig(String scheme, PipelineOptions options) {
-    String lowerCaseScheme = checkNotNull(scheme, "scheme").toLowerCase();
-    checkArgument(
-        URI_SCHEME_PATTERN.matcher(lowerCaseScheme).matches(),
-        String.format("Scheme: [%s] doesn't match URI syntax: %s",
-            lowerCaseScheme, URI_SCHEME_PATTERN.pattern()));
-    checkArgument(
-        SCHEME_TO_REGISTRAR.containsKey(lowerCaseScheme),
-        String.format("No FileSystemRegistrar found for scheme: [%s].", lowerCaseScheme));
-    SCHEME_TO_DEFAULT_CONFIG.put(lowerCaseScheme, checkNotNull(options, "options"));
-  }
-
-  @VisibleForTesting
-  static PipelineOptions getDefaultConfig(String scheme) {
-    return SCHEME_TO_DEFAULT_CONFIG.get(scheme.toLowerCase());
+  public static void setDefaultConfigInWorkers(PipelineOptions options) {
+    defaultConfig = checkNotNull(options, "options");
   }
 
   /**
@@ -106,9 +93,12 @@ public class FileSystems {
    */
   @VisibleForTesting
   static FileSystem getFileSystemInternal(URI uri) {
+    checkState(
+        defaultConfig != null,
+        "Expect the runner have called setDefaultConfigInWorkers().");
     String lowerCaseScheme = (uri.getScheme() != null
         ? uri.getScheme().toLowerCase() : LocalFileSystemRegistrar.LOCAL_FILE_SCHEME);
-    return getRegistrarInternal(lowerCaseScheme).fromOptions(getDefaultConfig(lowerCaseScheme));
+    return getRegistrarInternal(lowerCaseScheme).fromOptions(defaultConfig);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/4cdd8771/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
--
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
index 9b41b98..113a562 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.io;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Sets;
@@ -26,6 +24,7 @@ import java.net.URI;
 import 
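
The shape of the FileSystems change above (one default configuration for all schemes, with a fail-fast check when it has not been set) can be sketched in isolation as follows. This is only a sketch under stated assumptions: `SketchOptions` is a hypothetical stand-in for `PipelineOptions`, `FileSystemsSketch` and `describeFileSystem` are made-up names, and the `"file"` fallback scheme is an assumption rather than the value of `LocalFileSystemRegistrar.LOCAL_FILE_SCHEME`.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Objects;

/** Hypothetical stand-in for PipelineOptions; not the Beam type. */
final class SketchOptions {
  final String tempLocation;

  SketchOptions(String tempLocation) {
    this.tempLocation = tempLocation;
  }
}

/** Sketch: one process-wide default config shared by every scheme. */
final class FileSystemsSketch {
  private static SketchOptions defaultConfig;

  /** Intended to be called once per worker before any lookup. */
  static void setDefaultConfigInWorkers(SketchOptions options) {
    defaultConfig = Objects.requireNonNull(options, "options");
  }

  /** Resolves the scheme and fails fast if no config has been set. */
  static String describeFileSystem(URI uri) {
    if (defaultConfig == null) {
      throw new IllegalStateException(
          "setDefaultConfigInWorkers() must be called before use");
    }
    // Schemes are case-insensitive; a missing scheme falls back to "file"
    // (an assumed default for this sketch).
    String scheme =
        uri.getScheme() != null ? uri.getScheme().toLowerCase(Locale.ROOT) : "file";
    return scheme + " configured with " + defaultConfig.tempLocation;
  }
}
```

Dropping the per-scheme map removes the need to validate the scheme when the config is set, at the cost of a single mutable static that must be initialized before the first lookup.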

[41/50] beam git commit: This closes #1830

2017-01-29 Thread dhalperi
This closes #1830


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/47304d1f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/47304d1f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/47304d1f

Branch: refs/heads/python-sdk
Commit: 47304d1fc75d3a7751883638efdaf9f9d8b40a25
Parents: 83f8c46 e01ce86
Author: Dan Halperin 
Authored: Thu Jan 26 22:52:12 2017 -0800
Committer: Dan Halperin 
Committed: Thu Jan 26 22:52:12 2017 -0800

--
 .../apache/beam/sdk/transforms/ToString.java| 168 ---
 .../java/org/apache/beam/sdk/io/WriteTest.java  |   2 +-
 .../beam/sdk/transforms/ToStringTest.java   |  86 --
 3 files changed, 226 insertions(+), 30 deletions(-)
--




[17/50] beam git commit: Fix Flink RunnableOnService tests

2017-01-29 Thread dhalperi
Fix Flink RunnableOnService tests

* Check that a Multi-Output map contains the Tag, not the TaggedValue

* Return Inputs from getInputs

  Don't return outputs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a361b65d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a361b65d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a361b65d

Branch: refs/heads/python-sdk
Commit: a361b65d6aa56d70769403d884abf48d1e1141a4
Parents: 7402d76
Author: Thomas Groh 
Authored: Tue Jan 24 17:41:07 2017 -0800
Committer: Dan Halperin 
Committed: Wed Jan 25 09:03:23 2017 -0800

--
 .../runners/flink/translation/FlinkBatchTransformTranslators.java  | 2 +-
 .../flink/translation/FlinkStreamingTranslationContext.java| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/a361b65d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 654b464..f7f1878 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -580,7 +580,7 @@ class FlinkBatchTransformTranslators {
   outputMap.put(transform.getMainOutputTag(), 0);
   int count = 1;
   for (TaggedPValue taggedValue : outputs) {
-if (!outputMap.containsKey(taggedValue)) {
+if (!outputMap.containsKey(taggedValue.getTag())) {
   outputMap.put(taggedValue.getTag(), count++);
 }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a361b65d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
index 6db252e..7932f68 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
@@ -107,7 +107,7 @@ public class FlinkStreamingTranslationContext {
   }
 
   public  List getInputs(PTransform transform) {
-return currentTransform.getOutputs();
+return currentTransform.getInputs();
   }
 
   @SuppressWarnings("unchecked")
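
The first fix in this commit (checking the map for the tag rather than for the tagged value) addresses a bug class that compiles cleanly because `Map.containsKey` accepts any `Object`. A minimal sketch of the corrected lookup, using a hypothetical `TaggedValue` class in place of the Beam types:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class OutputIndexSketch {
  /** Hypothetical stand-in for a tagged output value. */
  static final class TaggedValue {
    final String tag;
    final Object value;

    TaggedValue(String tag, Object value) {
      this.tag = tag;
      this.value = value;
    }
  }

  /** Gives the main tag index 0 and every other distinct tag the next index. */
  static Map<String, Integer> indexOutputs(String mainTag, List<TaggedValue> outputs) {
    Map<String, Integer> outputMap = new LinkedHashMap<>();
    outputMap.put(mainTag, 0);
    int count = 1;
    for (TaggedValue tagged : outputs) {
      // Look up by tag, which is the key type. Passing `tagged` itself would
      // compile (containsKey takes Object) but never match, so the main tag
      // would be overwritten with a non-zero index.
      if (!outputMap.containsKey(tagged.tag)) {
        outputMap.put(tagged.tag, count++);
      }
    }
    return outputMap;
  }
}
```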



[36/50] beam git commit: Add prefix and suffix to WindowedWordCountIT output location

2017-01-29 Thread dhalperi
Add prefix and suffix to WindowedWordCountIT output location


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4d0225e8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4d0225e8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4d0225e8

Branch: refs/heads/python-sdk
Commit: 4d0225e8e29047dc7b4e0f5cea2414eaef4b038c
Parents: 9637724
Author: Kenneth Knowles 
Authored: Thu Jan 26 13:42:58 2017 -0800
Committer: Kenneth Knowles 
Committed: Thu Jan 26 13:47:08 2017 -0800

--
 .../apache/beam/examples/WindowedWordCountIT.java   | 16 +++-
 1 file changed, 11 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/4d0225e8/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
--
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index e4570ac..703f836 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -28,6 +28,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.examples.common.WriteWindowedFilesDoFn;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -47,8 +48,10 @@ import org.hamcrest.TypeSafeMatcher;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
@@ -58,6 +61,8 @@ import org.slf4j.LoggerFactory;
 @RunWith(JUnit4.class)
 public class WindowedWordCountIT {
 
+  @Rule public TestName testName = new TestName();
+
   private static final String DEFAULT_INPUT =
   "gs://apache-beam-samples/shakespeare/winterstale-personae";
   static final int MAX_READ_RETRIES = 4;
@@ -100,7 +105,9 @@ public class WindowedWordCountIT {
 options.setOutput(
 IOChannelUtils.resolve(
 options.getTempRoot(),
-String.format("WindowedWordCountIT-%tF-% expectedWordCounts = new TreeMap<>();
@@ -144,8 +150,8 @@ public class WindowedWordCountIT {
 
   for (String word : words) {
 if (!word.isEmpty()) {
-  expectedWordCounts.put(word,
-  MoreObjects.firstNonNull(expectedWordCounts.get(word), 0L) + 1L);
+  expectedWordCounts.put(
+  word, MoreObjects.firstNonNull(expectedWordCounts.get(word), 0L) + 1L);
 }
   }
 }
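
The format string this commit changes did not survive in the archived diff, so the exact output naming is unknown. As a hedged sketch of the general technique the commit title and new imports suggest (a fixed prefix, the test name, and a random suffix so concurrent runs do not collide), with a made-up format and hypothetical names throughout:

```java
import java.time.LocalDate;
import java.util.Locale;
import java.util.concurrent.ThreadLocalRandom;

final class OutputLocationSketch {
  /** Deterministic core: joins prefix, test name, date, and suffix. */
  static String uniqueOutputName(
      String prefix, String testName, LocalDate date, int randomSuffix) {
    // The "%s-%s-%s-%d" layout is an assumption, not the format in the commit.
    return String.format(Locale.ROOT, "%s-%s-%s-%d", prefix, testName, date, randomSuffix);
  }

  /** Convenience overload using today's date and a random non-negative suffix. */
  static String uniqueOutputName(String prefix, String testName) {
    int suffix = ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE);
    return uniqueOutputName(prefix, testName, LocalDate.now(), suffix);
  }
}
```

Keeping the random and time-dependent parts in a thin overload leaves the formatting itself easy to test.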



[20/50] beam git commit: This closes #1843

2017-01-29 Thread dhalperi
This closes #1843


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/979c9376
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/979c9376
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/979c9376

Branch: refs/heads/python-sdk
Commit: 979c9376f820577bad43c18cc1a7ee86fab9d942
Parents: bf9d454 e95335f
Author: Dan Halperin 
Authored: Wed Jan 25 10:40:16 2017 -0800
Committer: Dan Halperin 
Committed: Wed Jan 25 10:40:16 2017 -0800

--
 runners/google-cloud-dataflow-java/pom.xml | 4 ++--
 .../org/apache/beam/runners/dataflow/dataflow.properties   | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)
--




[24/50] beam git commit: This closes #1846

2017-01-29 Thread dhalperi
This closes #1846


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95beda69
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95beda69
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95beda69

Branch: refs/heads/python-sdk
Commit: 95beda69bff7dfe519422fd19916c7a851dadf55
Parents: c525783 f05c5d3
Author: Dan Halperin 
Authored: Wed Jan 25 12:13:37 2017 -0800
Committer: Dan Halperin 
Committed: Wed Jan 25 12:13:37 2017 -0800

--
 examples/pom.xml | 14 +-
 runners/pom.xml  | 14 +-
 sdks/pom.xml | 13 +++--
 3 files changed, 25 insertions(+), 16 deletions(-)
--




[49/50] beam git commit: Update pom.xml for sdks/python.

2017-01-29 Thread dhalperi
Update pom.xml for sdks/python.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1b8679c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1b8679c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1b8679c

Branch: refs/heads/python-sdk
Commit: f1b8679c4af283d1e751043e2e765b7f295af0b2
Parents: c2859a5
Author: Ahmet Altay 
Authored: Fri Jan 27 17:04:21 2017 -0800
Committer: Ahmet Altay 
Committed: Fri Jan 27 17:04:21 2017 -0800

--
 sdks/python/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/f1b8679c/sdks/python/pom.xml
--
diff --git a/sdks/python/pom.xml b/sdks/python/pom.xml
index cc90969..615ddc5 100644
--- a/sdks/python/pom.xml
+++ b/sdks/python/pom.xml
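@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-parent</artifactId>
-    <version>0.5.0-incubating-SNAPSHOT</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>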
 



[08/50] beam git commit: [BEAM-1071] Allow for BigQueryIO to write tables with CREATE_NEVER disposition

2017-01-29 Thread dhalperi
[BEAM-1071] Allow for BigQueryIO to write tables with CREATE_NEVER disposition


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc369522
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc369522
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc369522

Branch: refs/heads/python-sdk
Commit: dc369522d1cfa46ae9058919d93229de05db2b6a
Parents: 11c3cd7
Author: Sam McVeety 
Authored: Mon Dec 12 18:47:20 2016 -0800
Committer: Dan Halperin 
Committed: Tue Jan 24 14:41:39 2017 -0800

--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java| 51 ++--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 36 ++
 2 files changed, 71 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/dc369522/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
--
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index aff199a..fa49f55 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1925,10 +1925,17 @@ public class BigQueryIO {
 
         if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
           // We will use BigQuery's streaming write API -- validate supported dispositions.
-          checkArgument(
-              createDisposition != CreateDisposition.CREATE_NEVER,
-              "CreateDisposition.CREATE_NEVER is not supported for an unbounded PCollection or when"
-              + " using a tablespec function.");
+          if (tableRefFunction != null) {
+            checkArgument(
+                createDisposition != CreateDisposition.CREATE_NEVER,
+                "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
+                + " function.");
+          }
+          if (jsonSchema == null) {
+            checkArgument(
+                createDisposition == CreateDisposition.CREATE_NEVER,
+                "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
+          }
 
           checkArgument(
               writeDisposition != WriteDisposition.WRITE_TRUNCATE,
@@ -1965,7 +1972,9 @@ public class BigQueryIO {
         if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
           return input.apply(
               new StreamWithDeDup(getTable(), tableRefFunction,
-                  NestedValueProvider.of(jsonSchema, new JsonSchemaToTableSchema()), bqServices));
+                  jsonSchema == null ? null : NestedValueProvider.of(
+                      jsonSchema, new JsonSchemaToTableSchema()),
+                  createDisposition, bqServices));
         }
 
         ValueProvider table = getTableWithDefaultProject(options);
@@ -2608,16 +2617,19 @@ public class BigQueryIO {
* Implementation of DoFn to perform streaming BigQuery write.
*/
   @SystemDoFnInternal
-  private static class StreamingWriteFn
+  @VisibleForTesting
+  static class StreamingWriteFn
   extends DoFn, Void> {
 /** TableSchema in JSON. Use String to make the class Serializable. */
-private final ValueProvider jsonTableSchema;
+@Nullable private final ValueProvider jsonTableSchema;
 
 private final BigQueryServices bqServices;
 
 /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
 private transient Map tableRows;
 
+private final Write.CreateDisposition createDisposition;
+
 /** The list of unique ids for each BigQuery table row. */
 private transient Map uniqueIdsForTableRows;
 
@@ -2631,9 +2643,12 @@ public class BigQueryIO {
 createAggregator("ByteCount", Sum.ofLongs());
 
 /** Constructor. */
-StreamingWriteFn(ValueProvider schema, BigQueryServices bqServices) {
-  this.jsonTableSchema =
+StreamingWriteFn(@Nullable ValueProvider schema,
+Write.CreateDisposition createDisposition,
+BigQueryServices bqServices) {
+  this.jsonTableSchema = schema == null ? null :
   NestedValueProvider.of(schema, new TableSchemaToJsonSchema());
+  this.createDisposition = createDisposition;
   this.bqServices = checkNotNull(bqServices, "bqServices");
 }
 
@@ -2689,7 +2704,8 @@ public class BigQueryIO {
 public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
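
The two streaming-path checks introduced in the first hunk of this diff can be restated as a small standalone method. This is a sketch under stated assumptions, not the BigQueryIO code: the enum is redeclared locally, `hasTableRefFunction` replaces the `tableRefFunction != null` test, and `IllegalArgumentException` is thrown directly instead of through `checkArgument`.

```java
final class DispositionCheckSketch {
  /** Local redeclaration for illustration; not the BigQueryIO enum. */
  enum CreateDisposition { CREATE_IF_NEEDED, CREATE_NEVER }

  /** Validates the create disposition for a streaming write. */
  static void validateStreamingWrite(
      CreateDisposition createDisposition, boolean hasTableRefFunction, String jsonSchema) {
    // A per-element table function cannot be combined with CREATE_NEVER.
    if (hasTableRefFunction && createDisposition == CreateDisposition.CREATE_NEVER) {
      throw new IllegalArgumentException(
          "CREATE_NEVER is not supported when using a tablespec function.");
    }
    // Without a schema no table can be created, so CREATE_NEVER is required.
    if (jsonSchema == null && createDisposition != CreateDisposition.CREATE_NEVER) {
      throw new IllegalArgumentException(
          "CREATE_NEVER must be used if jsonSchema is null.");
    }
  }
}
```

Taken together, the two checks mean that a write using a tablespec function must supply a schema, since it cannot use CREATE_NEVER.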
 

[15/50] beam git commit: This closes #1838

2017-01-29 Thread dhalperi
This closes #1838


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7402d760
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7402d760
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7402d760

Branch: refs/heads/python-sdk
Commit: 7402d760004f8e7f79ca122c5fd26ec4f35dbdbe
Parents: e77de7c f9d1d68
Author: Thomas Groh 
Authored: Tue Jan 24 18:00:43 2017 -0800
Committer: Thomas Groh 
Committed: Tue Jan 24 18:00:43 2017 -0800

--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java| 40 +--
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  9 ++---
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 23 ---
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 41 
 .../sdk/io/gcp/bigquery/BigQueryUtilTest.java   |  3 +-
 5 files changed, 40 insertions(+), 76 deletions(-)
--




[02/50] beam git commit: [BEAM-1258] demote retrying loggings to info level.

2017-01-29 Thread dhalperi
[BEAM-1258] demote retrying loggings to info level.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3afdc5c0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3afdc5c0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3afdc5c0

Branch: refs/heads/python-sdk
Commit: 3afdc5c0ef37e48b1750f70e54cd64f5063da83b
Parents: 2a23e8b
Author: Pei He 
Authored: Tue Jan 24 11:09:09 2017 -0800
Committer: Dan Halperin 
Committed: Tue Jan 24 12:25:22 2017 -0800

--
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java   | 16 
 1 file changed, 8 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/3afdc5c0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
--
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index c9edf7c..c524ce4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -220,11 +220,11 @@ class BigQueryServicesImpl implements BigQueryServices {
 return; // SUCCEEDED
   }
   // ignore and retry
-  LOG.warn("Ignore the error and retry inserting the job.", e);
+  LOG.info("Ignore the error and retry inserting the job.", e);
   lastException = e;
 } catch (IOException e) {
   // ignore and retry
-  LOG.warn("Ignore the error and retry inserting the job.", e);
+  LOG.info("Ignore the error and retry inserting the job.", e);
   lastException = e;
 }
   } while (nextBackOff(sleeper, backoff));
@@ -261,7 +261,7 @@ class BigQueryServicesImpl implements BigQueryServices {
   // The job is not DONE, wait longer and retry.
 } catch (IOException e) {
   // ignore and retry
-  LOG.warn("Ignore the error and retry polling job status.", e);
+  LOG.info("Ignore the error and retry polling job status.", e);
 }
   } while (nextBackOff(sleeper, backoff));
   LOG.warn("Unable to poll job status: {}, aborting after reached max .", jobRef.getJobId());
@@ -316,12 +316,12 @@ class BigQueryServicesImpl implements BigQueryServices {
 LOG.info("No BigQuery job with job id {} found.", jobId);
 return null;
   }
-  LOG.warn(
+  LOG.info(
   "Ignoring the error encountered while trying to query the BigQuery job {}",
   jobId, e);
   lastException = e;
 } catch (IOException e) {
-  LOG.warn(
+  LOG.info(
   "Ignoring the error encountered while trying to query the BigQuery job {}",
   jobId, e);
   lastException = e;
@@ -618,10 +618,10 @@ class BigQueryServicesImpl implements BigQueryServices {
 return; // SUCCEEDED
   }
   // ignore and retry
-  LOG.warn("Ignore the error and retry creating the dataset.", e);
+  LOG.info("Ignore the error and retry creating the dataset.", e);
   lastException = e;
 } catch (IOException e) {
-  LOG.warn("Ignore the error and retry creating the dataset.", e);
+  LOG.info("Ignore the error and retry creating the dataset.", e);
   lastException = e;
 }
   } while (nextBackOff(sleeper, backoff));
@@ -891,7 +891,7 @@ class BigQueryServicesImpl implements BigQueryServices {
 if (!shouldRetry.apply(e)) {
   break;
 }
-LOG.warn("Ignore the error and retry the request.", e);
+LOG.info("Ignore the error and retry the request.", e);
   }
 } while (nextBackOff(sleeper, backoff));
 throw new IOException(
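
The diff above lowers the per-attempt retry messages from warn to info. A minimal sketch of that logging policy follows, assuming a hypothetical RetryLoggingSketch helper and IoAction interface that are not part of BigQueryServicesImpl. It uses java.util.logging instead of the logger in the original class, leaves out the backoff sleep between attempts, and adds a final warning of its own for illustration.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper for illustration only; not part of BigQueryServicesImpl. */
public final class RetryLoggingSketch {
  private static final Logger LOG = Logger.getLogger(RetryLoggingSketch.class.getName());

  private RetryLoggingSketch() {}

  /** An action that may fail with an IOException (hypothetical interface). */
  public interface IoAction<T> {
    T run() throws IOException;
  }

  /**
   * Runs the action up to maxAttempts times. A failure that will be retried is
   * logged at INFO; only the final give-up is logged at WARNING. Backoff
   * between attempts is omitted to keep the sketch short.
   */
  public static <T> T callWithRetries(IoAction<T> action, int maxAttempts) throws IOException {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    IOException lastException = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.run();
      } catch (IOException e) {
        lastException = e;
        if (attempt < maxAttempts) {
          // Expected and recoverable, so not worth a warning.
          LOG.log(Level.INFO, "Ignore the error and retry the request.", e);
        }
      }
    }
    LOG.log(Level.WARNING, "Giving up after " + maxAttempts + " attempts.", lastException);
    throw new IOException(
        "Unable to complete the request after " + maxAttempts + " attempts", lastException);
  }
}
```

With this split, a job that succeeds on a later attempt produces no warnings, and a warning appears only when the caller actually sees a failure.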



[11/50] beam git commit: This closes #1834

2017-01-29 Thread dhalperi
This closes #1834


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1148be6b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1148be6b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1148be6b

Branch: refs/heads/python-sdk
Commit: 1148be6bb17eae70c2753d33aebbac9f7943dd03
Parents: f2389ab bffe80d
Author: Dan Halperin 
Authored: Tue Jan 24 15:51:19 2017 -0800
Committer: Dan Halperin 
Committed: Tue Jan 24 15:51:19 2017 -0800

--
 runners/google-cloud-dataflow-java/pom.xml  |  3 ++-
 .../sdk/testing/UsesUnboundedPCollections.java  | 23 
 .../org/apache/beam/sdk/io/PubsubIOTest.java|  4 ++--
 3 files changed, 27 insertions(+), 3 deletions(-)
--




[40/50] beam git commit: Refactored existing code. Added iterable and KV. Changed from element to of.

2017-01-29 Thread dhalperi
Refactored existing code. Added iterable and KV. Changed from element to of.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e01ce864
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e01ce864
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e01ce864

Branch: refs/heads/python-sdk
Commit: e01ce864edf551afefe861041541bb2a05340a08
Parents: 83f8c46
Author: Jesse Anderson 
Authored: Tue Jan 24 08:37:33 2017 -0800
Committer: Dan Halperin 
Committed: Thu Jan 26 22:52:09 2017 -0800

--
 .../apache/beam/sdk/transforms/ToString.java| 168 ---
 .../java/org/apache/beam/sdk/io/WriteTest.java  |   2 +-
 .../beam/sdk/transforms/ToStringTest.java   |  86 --
 3 files changed, 226 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/e01ce864/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
index ef49267..d5c9784 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
@@ -18,51 +18,181 @@
 
 package org.apache.beam.sdk.transforms;
 
+import java.util.Iterator;
+
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
- * {@link PTransform PTransforms} for converting a {@link PCollection PCollectionT} to a
- * {@link PCollection PCollectionString}.
- *
- * Example of use:
- *  {@code
- * PCollection longs = ...;
- * PCollection strings = longs.apply(ToString.element());
- * } 
- *
+ * {@link PTransform PTransforms} for converting a {@link PCollection PCollection?},
+ * {@link PCollection PCollectionKV?,?}, or
+ * {@link PCollection PCollectionIterable?}
+ * to a {@link PCollection PCollectionString}.
  *
  * Note: For any custom string conversion and formatting, we recommend applying your own
  * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
  */
 public final class ToString {
+  private ToString() {
+// do not instantiate
+  }
 
   /**
* Returns a {@code PTransform} which transforms each
* element of the input {@link PCollection} to a {@link String} using the
* {@link Object#toString} method.
*/
-  public static PTransform element() {
-return new Default();
+  public static PTransform of() {
+return new SimpleToString();
   }
 
-  private ToString() {
+  /**
+   * Returns a {@code PTransform} which transforms each
+   * element of the input {@link PCollection} to a {@link String} by using the
+   * {@link Object#toString} on the key followed by a "," followed by the {@link Object#toString}
+   * of the value.
+   */
+  public static PTransform, PCollection> kv() {
+return kv(",");
+  }
+
+  /**
+   * Returns a {@code PTransform} which transforms each
+   * element of the input {@link PCollection} to a {@link String} by using the
+   * {@link Object#toString} on the key followed by the specified delimeter followed by the
+   * {@link Object#toString} of the value.
+   * @param delimiter The delimiter to put between the key and value
+   */
+  public static PTransform,
+  PCollection> kv(String delimiter) {
+return new KVToString(delimiter);
+  }
+
+  /**
+   * Returns a {@code PTransform} which
+   * transforms each item in the iterable of the input {@link PCollection} to a {@link String}
+   * using the {@link Object#toString} method followed by a "," until
+   * the last element in the iterable. There is no trailing delimiter.
+   */
+  public static PTransform, PCollection> iterable() {
+return iterable(",");
+  }
+
+  /**
+   * Returns a {@code PTransform} which
+   * transforms each item in the iterable of the input {@link PCollection} to a {@link String}
+   * using the {@link Object#toString} method followed by the specified delimiter until
+   * the last element in the iterable. There is no trailing delimiter.
+   * @param delimiter The delimiter to put between the items in the iterable.
+   */
+  public static PTransform,
+  PCollection> iterable(String delimiter) {
+return new IterablesToString(delimiter);
   }
 
   /**
* A {@link PTransform} that converts a {@code PCollection} to a {@code PCollection}
* using the {@link  
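
The Javadoc in the diff above describes two formatting rules: a KV becomes key, delimiter, value, and an iterable is joined with the delimiter with no trailing delimiter. The sketch below shows only that string formatting in plain Java, without the Beam SDK. ToStringSketch and its method signatures are hypothetical, and Map.Entry stands in for KV as an assumption.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical stand-in that mirrors the formatting described in the Javadoc. */
public final class ToStringSketch {
  private ToStringSketch() {
    // do not instantiate
  }

  /** Formats a key/value pair as key.toString() + delimiter + value.toString(). */
  public static String kv(Map.Entry<?, ?> entry, String delimiter) {
    return entry.getKey().toString() + delimiter + entry.getValue().toString();
  }

  /** Joins the toString() of each item with the delimiter; no trailing delimiter. */
  public static String iterable(Iterable<?> items, String delimiter) {
    StringBuilder builder = new StringBuilder();
    Iterator<?> iterator = items.iterator();
    while (iterator.hasNext()) {
      builder.append(iterator.next().toString());
      if (iterator.hasNext()) {
        // Only emit the delimiter when another item follows.
        builder.append(delimiter);
      }
    }
    return builder.toString();
  }

  public static void main(String[] args) {
    System.out.println(kv(new AbstractMap.SimpleEntry<>("a", 1), ",")); // prints a,1
    System.out.println(iterable(Arrays.asList(1, 2, 3), ","));         // prints 1,2,3
  }
}
```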

[07/50] beam git commit: This closes #1833: Removes ReduceFnExecutor interface

2017-01-29 Thread dhalperi
This closes #1833: Removes ReduceFnExecutor interface


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/11c3cd70
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/11c3cd70
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/11c3cd70

Branch: refs/heads/python-sdk
Commit: 11c3cd70b784650e8b60a5660449cfafdba84bbf
Parents: b333487 8989473
Author: Kenneth Knowles 
Authored: Tue Jan 24 13:48:23 2017 -0800
Committer: Kenneth Knowles 
Committed: Tue Jan 24 13:48:23 2017 -0800

--
 .../apache/beam/runners/core/DoFnRunner.java| 20 
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  5 +
 .../beam/runners/direct/ParDoEvaluator.java |  2 --
 .../runners/spark/translation/DoFnFunction.java |  2 --
 .../spark/translation/MultiDoFnFunction.java|  2 --
 5 files changed, 1 insertion(+), 30 deletions(-)
--




[45/50] beam git commit: Update Beam version in the Maven archetypes

2017-01-29 Thread dhalperi
Update Beam version in the Maven archetypes


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c118156
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c118156
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c118156

Branch: refs/heads/python-sdk
Commit: 9c1181563d89e604b899e5e945d5975359f42543
Parents: 4a29131
Author: Jean-Baptiste Onofré 
Authored: Fri Jan 27 18:34:24 2017 +0100
Committer: Jean-Baptiste Onofré 
Committed: Fri Jan 27 18:34:24 2017 +0100

--
 .../examples-java8/src/main/resources/archetype-resources/pom.xml  | 2 +-
 .../examples/src/main/resources/archetype-resources/pom.xml| 2 +-
 .../starter/src/main/resources/archetype-resources/pom.xml | 2 +-
 .../starter/src/test/resources/projects/basic/reference/pom.xml| 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/9c118156/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
--
diff --git a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
index 05cb797..55211ed 100644
--- a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
@@ -27,7 +27,7 @@
   jar
 
   
-0.5.0-SNAPSHOT
+0.6.0-SNAPSHOT
   
 
   

http://git-wip-us.apache.org/repos/asf/beam/blob/9c118156/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
--
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
index 74f08bf..654973c 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
@@ -27,7 +27,7 @@
   jar
 
   
-0.5.0-SNAPSHOT
+0.6.0-SNAPSHOT
   
 
   

http://git-wip-us.apache.org/repos/asf/beam/blob/9c118156/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
--
diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
index e7f1185..5d2a408 100644
--- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
@@ -25,7 +25,7 @@
   ${version}
 
   
-0.5.0-SNAPSHOT
+0.6.0-SNAPSHOT
   
 
   

http://git-wip-us.apache.org/repos/asf/beam/blob/9c118156/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
--
diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
index 871d194..1c666eb 100644
--- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
+++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
@@ -25,7 +25,7 @@
   0.1
 
   
-0.5.0-SNAPSHOT
+0.6.0-SNAPSHOT
   
 
   



[22/50] beam git commit: This closes #1184

2017-01-29 Thread dhalperi
This closes #1184


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c5257837
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c5257837
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c5257837

Branch: refs/heads/python-sdk
Commit: c525783704e0cc47845df8cdec1715e1f1c74008
Parents: 979c937 3ecf7e7
Author: Dan Halperin 
Authored: Wed Jan 25 11:03:05 2017 -0800
Committer: Dan Halperin 
Committed: Wed Jan 25 11:03:05 2017 -0800

--
 runners/google-cloud-dataflow-java/pom.xml  |   5 +
 .../beam/runners/dataflow/util/GcsStager.java   |  18 +-
 .../beam/runners/dataflow/util/PackageUtil.java | 349 ---
 .../runners/dataflow/util/PackageUtilTest.java  |  42 ++-
 .../org/apache/beam/sdk/options/GcsOptions.java |   4 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  12 +
 6 files changed, 281 insertions(+), 149 deletions(-)
--




[33/50] beam git commit: This closes #1853

2017-01-29 Thread dhalperi
This closes #1853


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/717b415f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/717b415f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/717b415f

Branch: refs/heads/python-sdk
Commit: 717b415f1a6024f1630d922cbd357c894452af40
Parents: b4726d0 e591d8b
Author: Dan Halperin 
Authored: Thu Jan 26 09:38:05 2017 -0800
Committer: Dan Halperin 
Committed: Thu Jan 26 09:38:05 2017 -0800

--
 runners/google-cloud-dataflow-java/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[50/50] beam git commit: Closes #1861

2017-01-29 Thread dhalperi
Closes #1861


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/27cf68ee
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/27cf68ee
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/27cf68ee

Branch: refs/heads/python-sdk
Commit: 27cf68ee72bd58475c170712f7afe20102601606
Parents: 1bc6859 f1b8679
Author: Dan Halperin 
Authored: Sun Jan 29 08:21:18 2017 -0800
Committer: Dan Halperin 
Committed: Sun Jan 29 08:21:18 2017 -0800

--
 .jenkins/common_job_properties.groovy   |9 +-
 ...job_beam_PostCommit_Java_MavenInstall.groovy |2 +-
 .../job_beam_PreCommit_Java_MavenInstall.groovy |2 +-
 .../job_beam_Release_NightlySnapshot.groovy |2 +-
 .jenkins/job_seed.groovy|2 +-
 .travis/README.md   |2 +-
 DISCLAIMER  |   10 -
 NOTICE  |4 +-
 README.md   |   46 +-
 examples/java/README.md |   16 +-
 examples/java/pom.xml   |   21 +-
 .../beam/examples/DebuggingWordCount.java   |4 +-
 .../org/apache/beam/examples/WordCount.java |6 +-
 .../beam/examples/complete/AutoComplete.java|2 +-
 .../org/apache/beam/examples/complete/README.md |   14 +-
 .../apache/beam/examples/complete/TfIdf.java|2 +-
 .../examples/complete/TopWikipediaSessions.java |2 +-
 .../examples/complete/TrafficMaxLaneFlow.java   |2 +-
 .../beam/examples/complete/TrafficRoutes.java   |2 +-
 .../examples/cookbook/BigQueryTornadoes.java|2 +-
 .../cookbook/CombinePerKeyExamples.java |2 +-
 .../org/apache/beam/examples/cookbook/README.md |   14 +-
 .../beam/examples/cookbook/TriggerExample.java  |4 +-
 .../beam/examples/WindowedWordCountIT.java  |   16 +-
 examples/java8/pom.xml  |2 +-
 .../beam/examples/complete/game/GameStats.java  |7 +-
 .../examples/complete/game/LeaderBoard.java |5 +-
 .../beam/examples/complete/game/UserScore.java  |2 +-
 examples/pom.xml|   16 +-
 pom.xml |   41 +-
 runners/apex/README.md  |4 +-
 runners/apex/pom.xml|3 +-
 .../beam/runners/apex/ApexPipelineOptions.java  |7 +-
 .../apache/beam/runners/apex/ApexRunner.java|   43 +-
 .../beam/runners/apex/ApexYarnLauncher.java |   23 +-
 .../translation/CreateValuesTranslator.java |   18 +-
 .../FlattenPCollectionTranslator.java   |   28 +-
 .../apex/translation/GroupByKeyTranslator.java  |2 +-
 .../translation/ParDoBoundMultiTranslator.java  |   27 +-
 .../apex/translation/ParDoBoundTranslator.java  |4 +-
 .../apex/translation/TranslationContext.java|   27 +-
 .../apex/translation/WindowBoundTranslator.java |8 +-
 .../operators/ApexGroupByKeyOperator.java   |4 +-
 .../operators/ApexParDoOperator.java|6 +-
 .../ApexReadUnboundedInputOperator.java |   17 +-
 .../beam/runners/apex/ApexRunnerTest.java   |   75 ++
 .../beam/runners/apex/ApexYarnLauncherTest.java |9 +-
 .../runners/apex/examples/WordCountTest.java|2 +-
 .../translation/ParDoBoundTranslatorTest.java   |6 +-
 .../translation/ReadUnboundTranslatorTest.java  |8 +-
 .../utils/ApexStateInternalsTest.java   |2 +-
 .../test/resources/beam-runners-apex.properties |   20 +
 runners/core-java/pom.xml   |2 +-
 .../beam/runners/core/AssignWindowsDoFn.java|3 +-
 .../apache/beam/runners/core/DoFnAdapters.java  |  343 ++
 .../apache/beam/runners/core/DoFnRunner.java|   21 -
 .../apache/beam/runners/core/DoFnRunners.java   |  138 +--
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   10 +-
 .../runners/core/GroupAlsoByWindowsDoFn.java|5 +-
 .../beam/runners/core/KeyedWorkItemCoder.java   |4 +-
 .../core/LateDataDroppingDoFnRunner.java|1 -
 .../apache/beam/runners/core/NonEmptyPanes.java |2 +-
 .../org/apache/beam/runners/core/OldDoFn.java   |  472 
 .../runners/core/PerKeyCombineFnRunner.java |   70 --
 .../runners/core/PerKeyCombineFnRunners.java|  101 --
 .../beam/runners/core/SimpleDoFnRunner.java |   63 -
 .../beam/runners/core/SimpleOldDoFnRunner.java  |7 +-
 .../beam/runners/core/SplittableParDo.java  |7 -
 .../core/UnboundedReadFromBoundedSource.java|   14 +-
 .../AfterDelayFromFirstElementStateMachine.java |2 +-
 .../core/triggers/AfterPaneStateMachine.java|2 +-
 .../core/DoFnDelegatingAggregatorTest.java  |  144 +++
 .../core/GroupAlsoByWindowsProperties.java  |2 +-
 .../runners/core/KeyedWorkItemCoderTest.java|6 +
 

[46/50] beam git commit: This closes #1859

2017-01-29 Thread dhalperi
This closes #1859


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b21bdf47
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b21bdf47
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b21bdf47

Branch: refs/heads/python-sdk
Commit: b21bdf4755363191209e05f96ca8044731a346ed
Parents: 4a29131 9c11815
Author: Jean-Baptiste Onofré 
Authored: Fri Jan 27 21:18:45 2017 +0100
Committer: Jean-Baptiste Onofré 
Committed: Fri Jan 27 21:18:45 2017 +0100

--
 .../examples-java8/src/main/resources/archetype-resources/pom.xml  | 2 +-
 .../examples/src/main/resources/archetype-resources/pom.xml| 2 +-
 .../starter/src/main/resources/archetype-resources/pom.xml | 2 +-
 .../starter/src/test/resources/projects/basic/reference/pom.xml| 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)
--




[23/50] beam git commit: [BEAM-246] re-enable Checkstyle by default

2017-01-29 Thread dhalperi
[BEAM-246] re-enable Checkstyle by default

This adds 50%+ overhead to a clean build (with testing disabled), but
per dev@ discussion is a huge usability win for contributors and
committers alike.

https://lists.apache.org/thread.html/CAA8k_FKafuon8GEA3CXwR2MZh2kAXEFZQK=bgx5tk2fzjeb...@mail.gmail.com


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f05c5d32
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f05c5d32
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f05c5d32

Branch: refs/heads/python-sdk
Commit: f05c5d32cb5dbee6de4247a803d7b7c7fbe52173
Parents: c525783
Author: Dan Halperin 
Authored: Tue Jan 24 13:52:06 2017 -0800
Committer: Dan Halperin 
Committed: Wed Jan 25 12:13:33 2017 -0800

--
 examples/pom.xml | 14 +-
 runners/pom.xml  | 14 +-
 sdks/pom.xml | 13 +++--
 3 files changed, 25 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/f05c5d32/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index adfbaa9..4294c2d 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -51,11 +51,6 @@
   
 
   
-org.apache.maven.plugins
-maven-checkstyle-plugin
-  
-
-  
 org.codehaus.mojo
 findbugs-maven-plugin
   
@@ -64,4 +59,13 @@
 
   
 
+  
+
+  
+org.apache.maven.plugins
+maven-checkstyle-plugin
+  
+
+  
+
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f05c5d32/runners/pom.xml
--
diff --git a/runners/pom.xml b/runners/pom.xml
index fb84164..ceaedfe 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -47,11 +47,6 @@
   
 
   
-org.apache.maven.plugins
-maven-checkstyle-plugin
-  
-
-  
 org.codehaus.mojo
 findbugs-maven-plugin
   
@@ -99,4 +94,13 @@
   
 
   
+
+  
+
+  
+org.apache.maven.plugins
+maven-checkstyle-plugin
+  
+
+  
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f05c5d32/sdks/pom.xml
--
diff --git a/sdks/pom.xml b/sdks/pom.xml
index bfdfcd9..2682728 100644
--- a/sdks/pom.xml
+++ b/sdks/pom.xml
@@ -42,11 +42,6 @@
   
 
   
-org.apache.maven.plugins
-maven-checkstyle-plugin
-  
-
-  
 org.codehaus.mojo
 findbugs-maven-plugin
   
@@ -58,7 +53,6 @@
   
 
   
-
 
 
   org.apache.maven.plugins
@@ -74,6 +68,13 @@
 
   
 
+
+
+  
+org.apache.maven.plugins
+maven-checkstyle-plugin
+  
+
   
 
 



[29/50] beam git commit: Recommit "DataflowRunner: parallelize staging of files"

2017-01-29 Thread dhalperi
Recommit "DataflowRunner: parallelize staging of files"

Revert "This closes #1847"

This reverts commit 1c6e667414788fe99f583fac39d458a4984ae162, reversing
changes made to 6413299a20be57de849684479134479fa1acee2d.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/23e2b913
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/23e2b913
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/23e2b913

Branch: refs/heads/python-sdk
Commit: 23e2b913946acb2690fbac2d751a5672d80121aa
Parents: 1c6e667
Author: Dan Halperin 
Authored: Wed Jan 25 21:04:20 2017 -0800
Committer: Dan Halperin 
Committed: Wed Jan 25 21:04:27 2017 -0800

--
 runners/google-cloud-dataflow-java/pom.xml  |   5 +
 .../beam/runners/dataflow/util/GcsStager.java   |  18 +-
 .../beam/runners/dataflow/util/PackageUtil.java | 349 ---
 .../runners/dataflow/util/PackageUtilTest.java  |  42 ++-
 .../org/apache/beam/sdk/options/GcsOptions.java |   4 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  12 +
 6 files changed, 281 insertions(+), 149 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/23e2b913/runners/google-cloud-dataflow-java/pom.xml
--
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index eea5502..9858b3d 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -203,6 +203,11 @@
 
 
 
+  com.google.apis
+  google-api-services-storage
+
+
+
   com.google.auth
   google-auth-library-credentials
 

http://git-wip-us.apache.org/repos/asf/beam/blob/23e2b913/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
--
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index 6ca4c3f..53822e3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -17,13 +17,19 @@
  */
 package org.apache.beam.runners.dataflow.util;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.api.services.storage.Storage;
 import java.util.List;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
+import org.apache.beam.sdk.util.Transport;
 
 /**
  * Utility class for staging files to GCS.
@@ -35,6 +41,7 @@ public class GcsStager implements Stager {
 this.options = options;
   }
 
+  @SuppressWarnings("unused")  // used via reflection
   public static GcsStager fromOptions(PipelineOptions options) {
 return new GcsStager(options.as(DataflowPipelineOptions.class));
   }
@@ -48,7 +55,16 @@ public class GcsStager implements Stager {
 if (windmillBinary != null) {
   filesToStage.add("windmill_main=" + windmillBinary);
 }
+int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024);
+checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0");
+uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024);
+Storage.Builder storageBuilder = Transport.newStorageClient(options);
+GcsUtil util = GcsUtilFactory.create(
+storageBuilder.build(),
+storageBuilder.getHttpRequestInitializer(),
+options.getExecutorService(),
+uploadSizeBytes);
 return PackageUtil.stageClasspathElements(
-options.getFilesToStage(), options.getStagingLocation());
+options.getFilesToStage(), options.getStagingLocation(), util);
   }
 }
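
The GcsStager change above picks the staging upload buffer size in three steps: default to 1 MiB when the option is unset, reject non-positive values, then cap the result at 1 MiB. A minimal sketch of that arithmetic in plain Java follows. UploadBufferSizeSketch, resolve and MAX_UPLOAD_BUFFER_BYTES are hypothetical names, and a null check and an explicit exception replace the Guava firstNonNull and checkArgument calls used in the diff.

```java
/** Hypothetical helper mirroring the buffer-size selection in the GcsStager diff. */
public final class UploadBufferSizeSketch {
  /** 1 MiB, used both as the default and as the upper bound. */
  static final int MAX_UPLOAD_BUFFER_BYTES = 1024 * 1024;

  private UploadBufferSizeSketch() {}

  /**
   * Returns the buffer size to use for staging uploads.
   *
   * @param configuredBytes the user-configured size, or null if unset
   * @throws IllegalArgumentException if the configured size is not positive
   */
  public static int resolve(Integer configuredBytes) {
    int size = (configuredBytes != null) ? configuredBytes : MAX_UPLOAD_BUFFER_BYTES;
    if (size <= 0) {
      throw new IllegalArgumentException("gcsUploadBufferSizeBytes must be > 0");
    }
    // Staging never uses more than 1 MiB per upload, even if configured higher.
    return Math.min(size, MAX_UPLOAD_BUFFER_BYTES);
  }
}
```

Under these rules, resolve(null) and resolve(4 * 1024 * 1024) both yield 1048576, while resolve(512) yields 512.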

http://git-wip-us.apache.org/repos/asf/beam/blob/23e2b913/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
--
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
 

[42/50] beam git commit: BEAM-980 Support configuration of Apex DAG through properties file.

2017-01-29 Thread dhalperi
BEAM-980 Support configuration of Apex DAG through properties file.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/31c63cb8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/31c63cb8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/31c63cb8

Branch: refs/heads/python-sdk
Commit: 31c63cb8c14ea71ed45376d19b4fd9f285d80763
Parents: 1c6e667
Author: Thomas Weise 
Authored: Wed Jan 25 22:22:36 2017 -0800
Committer: Thomas Weise 
Committed: Thu Jan 26 22:54:00 2017 -0800

--
 .../beam/runners/apex/ApexPipelineOptions.java  |  7 +-
 .../apache/beam/runners/apex/ApexRunner.java| 43 ---
 .../beam/runners/apex/ApexYarnLauncher.java | 23 +-
 .../beam/runners/apex/ApexRunnerTest.java   | 75 
 .../beam/runners/apex/ApexYarnLauncherTest.java |  9 ++-
 .../test/resources/beam-runners-apex.properties | 20 ++
 6 files changed, 161 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
index 54fdf76..f37e874 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
@@ -56,5 +56,10 @@ public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializab
   @Default.Long(0)
   long getRunMillis();
 
-}
+  @Description("configuration properties file for the Apex engine")
+  void setConfigFile(String name);
+
+  @Default.String("classpath:/beam-runners-apex.properties")
+  String getConfigFile();
 
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index f12ebef..e220e6c 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -22,10 +22,16 @@ import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.google.common.base.Throwables;
+
+import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.apex.api.EmbeddedAppLauncher;
 import org.apache.apex.api.Launcher;
@@ -64,6 +70,7 @@ import org.apache.hadoop.conf.Configuration;
 public class ApexRunner extends PipelineRunner {
 
   private final ApexPipelineOptions options;
+  public static final String CLASSPATH_SCHEME = "classpath";
 
   /**
* TODO: this isn't thread safe and may cause issues when tests run in parallel
@@ -126,6 +133,31 @@ public class ApexRunner extends PipelineRunner {
   }
 };
 
+Properties configProperties = new Properties();
+try {
+  if (options.getConfigFile() != null) {
+URI configURL = new URI(options.getConfigFile());
+if (CLASSPATH_SCHEME.equals(configURL.getScheme())) {
+  InputStream is = this.getClass().getResourceAsStream(configURL.getPath());
+  if (is != null) {
+configProperties.load(is);
+is.close();
+  }
+} else {
+  if (!configURL.isAbsolute()) {
+// resolve as local file name
+File f = new File(options.getConfigFile());
+configURL = f.toURI();
+  }
+  try (InputStream is = configURL.toURL().openStream()) {
+configProperties.load(is);
+  }
+}
+  }
+} catch (IOException | URISyntaxException ex) {
+  throw new RuntimeException("Error loading properties", ex);
+}
+
 if (options.isEmbeddedExecution()) {
   Launcher launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
   Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
@@ -135,6 +167,7 @@ public class ApexRunner extends PipelineRunner {
 launchAttributes.put(EmbeddedAppLauncher.HEARTBEAT_MONITORING, false);
   }
   Configuration conf = new Configuration(false);
+  
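
The ApexRunner hunk above resolves the configured file name in one of two ways: a "classpath:" URI is read as a classpath resource, and anything else is opened as a URL, with scheme-less names treated as local files. The following is a minimal standalone sketch of that resolution, assuming a hypothetical ConfigFileSketch class outside the runner. Unlike the diff, it closes the classpath stream with try-with-resources.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Properties;

/** Hypothetical helper mirroring the config-file resolution in the ApexRunner diff. */
public final class ConfigFileSketch {
  public static final String CLASSPATH_SCHEME = "classpath";

  private ConfigFileSketch() {}

  /** Loads properties from a classpath resource, a URL, or a local file name. */
  public static Properties load(String configFile) {
    Properties properties = new Properties();
    if (configFile == null) {
      return properties;
    }
    try {
      URI configUri = new URI(configFile);
      if (CLASSPATH_SCHEME.equals(configUri.getScheme())) {
        // e.g. "classpath:/some.properties"; a missing resource yields empty properties.
        try (InputStream is = ConfigFileSketch.class.getResourceAsStream(configUri.getPath())) {
          if (is != null) {
            properties.load(is);
          }
        }
      } else {
        if (!configUri.isAbsolute()) {
          // No scheme given: resolve as a local file name.
          configUri = new File(configFile).toURI();
        }
        try (InputStream is = configUri.toURL().openStream()) {
          properties.load(is);
        }
      }
    } catch (IOException | URISyntaxException ex) {
      throw new RuntimeException("Error loading properties", ex);
    }
    return properties;
  }
}
```

One consequence of parsing the name as a URI first is that local paths containing characters that are illegal in a URI, such as spaces, fail with the "Error loading properties" exception instead of being opened as files.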

[37/50] beam git commit: This closes #1856

2017-01-29 Thread dhalperi
This closes #1856


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2cbc08b5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2cbc08b5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2cbc08b5

Branch: refs/heads/python-sdk
Commit: 2cbc08b5870036c52a94bb1f1f1d081d387e4ae0
Parents: 9637724 4d0225e
Author: Dan Halperin 
Authored: Thu Jan 26 14:42:04 2017 -0800
Committer: Dan Halperin 
Committed: Thu Jan 26 14:42:04 2017 -0800

--
 .../apache/beam/examples/WindowedWordCountIT.java   | 16 +++-
 1 file changed, 11 insertions(+), 5 deletions(-)
--




[26/50] beam git commit: [BEAM-708] Using AutoValue in BoundedReadFromUnboundedSource

2017-01-29 Thread dhalperi
[BEAM-708] Using AutoValue in BoundedReadFromUnboundedSource

This closes #1794


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6413299a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6413299a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6413299a

Branch: refs/heads/python-sdk
Commit: 6413299a20be57de849684479134479fa1acee2d
Parents: 95beda6 a67ff91
Author: Luke Cwik 
Authored: Wed Jan 25 14:22:56 2017 -0800
Committer: Luke Cwik 
Committed: Wed Jan 25 14:22:56 2017 -0800

--
 .../sdk/io/BoundedReadFromUnboundedSource.java  | 69 +---
 1 file changed, 44 insertions(+), 25 deletions(-)
--




[04/50] beam git commit: [BEAM-1258] Improve logging in BigQueryIO.verifyTableEmpty().

2017-01-29 Thread dhalperi
[BEAM-1258] Improve logging in BigQueryIO.verifyTableEmpty().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5b6dd91d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5b6dd91d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5b6dd91d

Branch: refs/heads/python-sdk
Commit: 5b6dd91d27ce73fa66db4d445b0ceb88f09971d8
Parents: cb6e0a8
Author: Pei He 
Authored: Mon Jan 23 14:52:30 2017 -0800
Committer: Dan Halperin 
Committed: Tue Jan 24 12:25:22 2017 -0800

--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java| 32 +++-
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  2 ++
 2 files changed, 19 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/5b6dd91d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
--
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 701374d..aff199a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1863,25 +1863,27 @@ public class BigQueryIO {
 writeDisposition, validate, testServices);
   }
 
-  private static void verifyTableEmpty(
+  private static void verifyTableNotExistOrEmpty(
   DatasetService datasetService,
-  TableReference table) {
+  TableReference tableRef) {
 try {
-  boolean isEmpty = datasetService.isTableEmpty(
-  table.getProjectId(), table.getDatasetId(), table.getTableId());
-  if (!isEmpty) {
-throw new IllegalArgumentException(
-"BigQuery table is not empty: " + BigQueryIO.toTableSpec(table));
+  if (datasetService.getTable(
+  tableRef.getProjectId(),
+  tableRef.getDatasetId(),
+  tableRef.getTableId()) != null) {
+checkState(
+datasetService.isTableEmpty(
+tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
+"BigQuery table is not empty: %s.",
+BigQueryIO.toTableSpec(tableRef));
   }
 } catch (IOException | InterruptedException e) {
-  ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-  if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) {
-// Nothing to do. If the table does not exist, it is considered empty.
-  } else {
-throw new RuntimeException(
-"unable to confirm BigQuery table emptiness for table "
-+ BigQueryIO.toTableSpec(table), e);
+  if (e instanceof InterruptedException) {
+Thread.currentThread().interrupt();
   }
+  throw new RuntimeException(
+  "unable to confirm BigQuery table emptiness for table "
+  + BigQueryIO.toTableSpec(tableRef), e);
 }
   }
 
@@ -1917,7 +1919,7 @@ public class BigQueryIO {
 verifyTablePresence(datasetService, table);
   }
   if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
-verifyTableEmpty(datasetService, table);
+verifyTableNotExistOrEmpty(datasetService, table);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5b6dd91d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
--
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 7173996..32cf46d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -136,6 +136,8 @@ interface BigQueryServices extends Serializable {
 
 /**
  * Returns true if the table is empty.
+ *
+ * @throws IOException if the table is not found.
  */
 boolean isTableEmpty(String projectId, String datasetId, String tableId)
 throws IOException, InterruptedException;



[09/50] beam git commit: This closes #1590

2017-01-29 Thread dhalperi
This closes #1590


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f2389ab7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f2389ab7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f2389ab7

Branch: refs/heads/python-sdk
Commit: f2389ab7ba1d562d23420d7e2ecd638524439dc6
Parents: 11c3cd7 dc36952
Author: Dan Halperin 
Authored: Tue Jan 24 14:41:55 2017 -0800
Committer: Dan Halperin 
Committed: Tue Jan 24 14:41:55 2017 -0800

--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java| 51 ++--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 36 ++
 2 files changed, 71 insertions(+), 16 deletions(-)
--




[jira] (BEAM-1344) Uniform metrics step name semantics

2017-01-29 Thread Aviem Zur (JIRA)
Aviem Zur created an issue

Beam / BEAM-1344
Uniform metrics step name semantics

Issue Type: Improvement
Assignee: Davor Bonaci
Components: sdk-java-core
Created: 29/Jan/17 15:14
Priority: Major
Reporter: Aviem Zur

Agree on and implement uniform semantics for metrics step names that all runners would adhere to. 
The current discussion seems to point toward a string containing the pipeline graph path to the step's transform, something along the lines of: "PBegin/SomeInputTransform/SomeParDo/...MyTransform.#Running_number_for_collisions". 
Also agree on and implement semantics for querying metrics. The current discussion seems to point toward substring or regex matching of step names against a given input string. 
Original dev list discussion 
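
The naming and querying ideas described above might be sketched roughly as follows. This is only an illustration under stated assumptions: the class `StepNameSketch` and its methods are hypothetical and not part of the Beam API, the ".#n" suffix format is one reading of "#Running_number_for_collisions", and leaving the first occurrence of a path unnumbered is a guess, since the issue does not settle either point.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; not part of the Beam SDK. */
class StepNameSketch {
  // Number of times each transform path has been registered so far.
  private final Map<String, Integer> seen = new HashMap<>();

  /**
   * Returns the path unchanged on first use. Later registrations of the
   * same path get a ".#n" suffix (assumed format) so step names stay unique.
   */
  String register(String transformPath) {
    int count = seen.merge(transformPath, 1, Integer::sum);
    return count == 1 ? transformPath : transformPath + ".#" + (count - 1);
  }

  /** Substring-style query: true if the step name contains the query text. */
  static boolean matchesSubstring(String stepName, String query) {
    return stepName.contains(query);
  }

  /** Regex-style query: true if the pattern matches anywhere in the step name. */
  static boolean matchesRegex(String stepName, String regex) {
    return Pattern.compile(regex).matcher(stepName).find();
  }
}
```

Under these assumptions, registering the same path twice yields the bare path and then the path with ".#1" appended, and a query such as "ParDo/MyTransform" matches both by substring.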