[1/2] beam git commit: Add README.md to KafkaIO

2017-08-29 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master e33cc24a5 -> f816ad879


Add README.md to KafkaIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf1b0a5e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf1b0a5e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf1b0a5e

Branch: refs/heads/master
Commit: bf1b0a5e9e95abf0521e081ba575c789f46ba499
Parents: e33cc24
Author: Joey Baruch 
Authored: Tue Aug 29 16:38:24 2017 +0300
Committer: Aviem Zur 
Committed: Tue Aug 29 18:33:57 2017 +0300

--
 sdks/java/io/kafka/README.md | 36 
 1 file changed, 36 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/bf1b0a5e/sdks/java/io/kafka/README.md
--
diff --git a/sdks/java/io/kafka/README.md b/sdks/java/io/kafka/README.md
new file mode 100644
index 000..07d00a1
--- /dev/null
+++ b/sdks/java/io/kafka/README.md
@@ -0,0 +1,36 @@
+
+
+KafkaIO contains I/O transforms which allow you to read/write messages from/to 
[Apache Kafka](http://kafka.apache.org/).
+
+## Dependencies
+
+To use KafkaIO you must first add a dependency on `beam-sdks-java-io-kafka`
+
+```maven
+
+org.apache.beam
+beam-sdks-java-io-kafka
+...
+
+```
+
+## Documentation
+
+- 
[KafkaIO.java](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java)



[2/2] beam git commit: This closes #3780

2017-08-29 Thread aviemzur
This closes #3780


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f816ad87
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f816ad87
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f816ad87

Branch: refs/heads/master
Commit: f816ad87923610bd3fe507a298ac53ea49b4174a
Parents: e33cc24 bf1b0a5
Author: Aviem Zur 
Authored: Tue Aug 29 18:34:33 2017 +0300
Committer: Aviem Zur 
Committed: Tue Aug 29 18:34:33 2017 +0300

--
 sdks/java/io/kafka/README.md | 36 
 1 file changed, 36 insertions(+)
--




[beam-site] branch asf-site updated (1b92d41 -> 3c2d094)

2017-08-13 Thread aviemzur
This is an automated email from the ASF dual-hosted git repository.

aviemzur pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git.


from 1b92d41  This closes #279: Expands the section on Coders in Style Guide
 new 97b1ec9  Update PMC
 new 6835c8a  Regenerate website
 new 3c2d094  This closes #287

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 content/contribute/team/index.html | 2 +-
 src/_beam_team/team.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" '].


[beam-site] 02/03: Regenerate website

2017-08-13 Thread aviemzur
This is an automated email from the ASF dual-hosted git repository.

aviemzur pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 6835c8a4f015f0cebaf55a09730e9f3938273f0f
Author: Aviem Zur 
AuthorDate: Sun Aug 13 15:32:38 2017 +0300

Regenerate website
---
 content/contribute/team/index.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/content/contribute/team/index.html 
b/content/contribute/team/index.html
index 76ec22e..ea48ea3 100644
--- a/content/contribute/team/index.html
+++ b/content/contribute/team/index.html
@@ -379,7 +379,7 @@
   aviemzur
   aviemzur [at] apache [dot] org
   PayPal
-  committer
+  committer, PMC
   +2
 
   

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[beam-site] 03/03: This closes #287

2017-08-13 Thread aviemzur
This is an automated email from the ASF dual-hosted git repository.

aviemzur pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 3c2d09447750cee276a1e1f197b53b7ecb40af8c
Merge: 1b92d41 6835c8a
Author: Aviem Zur 
AuthorDate: Sun Aug 13 15:32:46 2017 +0300

This closes #287

 content/contribute/team/index.html | 2 +-
 src/_beam_team/team.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[beam-site] 01/03: Update PMC

2017-08-13 Thread aviemzur
This is an automated email from the ASF dual-hosted git repository.

aviemzur pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 97b1ec9cbdbb5bd71cde3ec9170f5dd9236e4f78
Author: Aviem Zur 
AuthorDate: Fri Aug 11 00:51:00 2017 +0300

Update PMC
---
 src/_beam_team/team.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/_beam_team/team.md b/src/_beam_team/team.md
index fd3feb2..8083d22 100644
--- a/src/_beam_team/team.md
+++ b/src/_beam_team/team.md
@@ -150,6 +150,6 @@ members:
 apache_id: aviemzur
 email: aviemzur [at] apache [dot] org
 organization: PayPal
-roles: committer
+roles: committer, PMC
 time_zone: "+2"
 ---

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[1/2] beam git commit: [BEAM-2314] Add ValidatesRunner test for merging custom windows

2017-07-24 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master f54072a1b -> 0064fb37a


[BEAM-2314] Add ValidatesRunner test for merging custom windows


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dfa983ce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dfa983ce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dfa983ce

Branch: refs/heads/master
Commit: dfa983ce4adb85d211497460254b6a95944ce869
Parents: f54072a
Author: Etienne Chauchot 
Authored: Mon May 29 12:05:51 2017 +0200
Committer: Aviem Zur 
Committed: Mon Jul 24 14:33:00 2017 +0300

--
 runners/spark/pom.xml   |   3 +-
 .../sdk/testing/UsesCustomWindowMerging.java|  23 +++
 .../sdk/transforms/windowing/WindowTest.java| 184 +++
 3 files changed, 209 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/dfa983ce/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 7f70204..35e933b 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -77,7 +77,8 @@
   
 org.apache.beam.sdk.testing.UsesSplittableParDo,
 org.apache.beam.sdk.testing.UsesCommittedMetrics,
-org.apache.beam.sdk.testing.UsesTestStream
+org.apache.beam.sdk.testing.UsesTestStream,
+org.apache.beam.sdk.testing.UsesCustomWindowMerging
   
   none
   1

http://git-wip-us.apache.org/repos/asf/beam/blob/dfa983ce/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java
new file mode 100644
index 000..fc40e02
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for validation tests which utilize custom window merging.
+ */
+public interface UsesCustomWindowMerging {}

http://git-wip-us.apache.org/repos/asf/beam/blob/dfa983ce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
--
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 65af7a1..5b6d046 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -31,19 +31,30 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesCustomWindowMerging;
 import org.apache.bea

[2/2] beam git commit: This closes #3286

2017-07-24 Thread aviemzur
This closes #3286


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0064fb37
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0064fb37
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0064fb37

Branch: refs/heads/master
Commit: 0064fb37ad13a10fc510e567d21873403a42340a
Parents: f54072a dfa983c
Author: Aviem Zur 
Authored: Mon Jul 24 18:22:42 2017 +0300
Committer: Aviem Zur 
Committed: Mon Jul 24 18:22:42 2017 +0300

--
 runners/spark/pom.xml   |   3 +-
 .../sdk/testing/UsesCustomWindowMerging.java|  23 +++
 .../sdk/transforms/windowing/WindowTest.java| 184 +++
 3 files changed, 209 insertions(+), 1 deletion(-)
--




[3/3] beam git commit: This closes #3343

2017-06-22 Thread aviemzur
This closes #3343


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e2ee5955
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e2ee5955
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e2ee5955

Branch: refs/heads/master
Commit: e2ee59557befd729e24e62a4991a76bab64f5755
Parents: b3099bb 22dbb50
Author: Aviem Zur 
Authored: Thu Jun 22 15:33:26 2017 +0300
Committer: Aviem Zur 
Committed: Thu Jun 22 15:33:26 2017 +0300

--
 runners/spark/pom.xml   |  42 +++---
 .../apache/beam/runners/spark/SparkRunner.java  |   2 +-
 .../beam/runners/spark/TestSparkRunner.java |   2 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java |   6 +-
 .../spark/stateful/SparkTimerInternals.java |  18 ++-
 .../spark/util/GlobalWatermarkHolder.java   | 127 ++-
 .../spark/GlobalWatermarkHolderTest.java|  18 +--
 7 files changed, 141 insertions(+), 74 deletions(-)
--




[2/3] beam git commit: [BEAM-2359] Fix watermark broadcasting to executors in Spark runner

2017-06-22 Thread aviemzur
[BEAM-2359] Fix watermark broadcasting to executors in Spark runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/20820fa5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/20820fa5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/20820fa5

Branch: refs/heads/master
Commit: 20820fa5477ffcdd4a9ef2e9340353ed3c5691a9
Parents: b3099bb
Author: Aviem Zur 
Authored: Mon Jun 12 17:04:00 2017 +0300
Committer: Aviem Zur 
Committed: Thu Jun 22 14:51:02 2017 +0300

--
 .../apache/beam/runners/spark/SparkRunner.java  |   2 +-
 .../beam/runners/spark/TestSparkRunner.java |   2 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java |   6 +-
 .../spark/stateful/SparkTimerInternals.java |  18 ++-
 .../spark/util/GlobalWatermarkHolder.java   | 127 ++-
 .../spark/GlobalWatermarkHolderTest.java|  18 +--
 6 files changed, 120 insertions(+), 53 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/20820fa5/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index d008718..595521f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -171,7 +171,7 @@ public final class SparkRunner extends 
PipelineRunner {
   }
 
   // register Watermarks listener to broadcast the advanced WMs.
-  jssc.addStreamingListener(new JavaStreamingListenerWrapper(new 
WatermarksListener(jssc)));
+  jssc.addStreamingListener(new JavaStreamingListenerWrapper(new 
WatermarksListener()));
 
   // The reason we call initAccumulators here even though it is called in
   // SparkRunnerStreamingContextFactory is because the factory is not 
called when resuming

http://git-wip-us.apache.org/repos/asf/beam/blob/20820fa5/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index eccee57..a13a3b1 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -169,7 +169,7 @@ public final class TestSparkRunner extends 
PipelineRunner {
 result.waitUntilFinish(Duration.millis(batchDurationMillis));
 do {
   SparkTimerInternals sparkTimerInternals =
-  SparkTimerInternals.global(GlobalWatermarkHolder.get());
+  
SparkTimerInternals.global(GlobalWatermarkHolder.get(batchDurationMillis));
   sparkTimerInternals.advanceWatermark();
   globalWatermark = sparkTimerInternals.currentInputWatermarkTime();
   // let another batch-interval period of execution, just to reason about 
WM propagation.

http://git-wip-us.apache.org/repos/asf/beam/blob/20820fa5/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index be4f3f6..1385e07 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -104,13 +104,15 @@ public class SparkGroupAlsoByWindowViaWindowSet {
 
   public static 
   JavaDStream>>> groupAlsoByWindow(
-  JavaDStream 
inputDStream,
+  final JavaDStream inputDStream,
   final Coder keyCoder,
   final Coder> wvCoder,
   final WindowingStrategy windowingStrategy,
   final SparkRuntimeContext runtimeContext,
   final List sourceIds) {
 
+final long batchDurationMillis =
+
runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class).getBatchIntervalMillis();
 final IterableCoder> itrWvCoder = 
IterableCoder.of(wvCoder);
 final Coder iCoder = ((FullWindowedValueCoder) 
wvCoder).getValueCoder();
 final Coder wCoder =
@@ -239,7 +241,7 @@ public class SparkGroupAlsoByWindowViaWindowSet {
 
   SparkStateInternals stateInternals;
   SparkTi

[1/3] beam git commit: Move Spark runner streaming tests to post commit.

2017-06-22 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master b3099bba2 -> e2ee59557


Move Spark runner streaming tests to post commit.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/22dbb500
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/22dbb500
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/22dbb500

Branch: refs/heads/master
Commit: 22dbb500289675fe95b6d149c8550e09dc26feac
Parents: 20820fa
Author: Aviem Zur 
Authored: Wed Jun 21 17:53:21 2017 +0300
Committer: Aviem Zur 
Committed: Thu Jun 22 14:51:02 2017 +0300

--
 runners/spark/pom.xml | 42 +-
 1 file changed, 21 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/22dbb500/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 0f6b730..ee72dd9 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -103,6 +103,27 @@
   4
 
   
+  
+streaming-tests
+test
+
+  test
+
+
+  
+org.apache.beam.runners.spark.StreamingTest
+  
+  
+
+  [
+  "--runner=TestSparkRunner",
+  "--forceStreaming=true",
+  "--enableSparkMetricSinks=true"
+  ]
+
+  
+
+  
 
   
 
@@ -372,27 +393,6 @@
 
   
 
-
-  streaming-tests
-  test
-  
-test
-  
-  
-
-  org.apache.beam.runners.spark.StreamingTest
-
-
-  
-[
-"--runner=TestSparkRunner",
-"--forceStreaming=true",
-"--enableSparkMetricSinks=true"
-]
-  
-
-  
-
   
 
 



[GitHub] beam pull request #3343: [BEAM-2359] Fix watermark broadcasting to executors...

2017-06-12 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/3343

[BEAM-2359] Fix watermark broadcasting to executors in Spark runner

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`.
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---
R: @amitsela 
CC: @staslev @kobisalant 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam BEAM-2359-watermark-bug-spark

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3343.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3343


commit f1b679e402734f20dcd9645babaec0a3f291e259
Author: Aviem Zur 
Date:   2017-06-12T14:04:00Z

[BEAM-2359] Fix watermark broadcasting to executors in Spark runner




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: This closes #2917

2017-06-03 Thread aviemzur
This closes #2917


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/43c44232
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/43c44232
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/43c44232

Branch: refs/heads/master
Commit: 43c44232dcb23dc8ff38a05197962dbe1a3a5e64
Parents: 9cdae6c 5e5fbed
Author: Aviem Zur 
Authored: Sat Jun 3 17:42:48 2017 +0300
Committer: Aviem Zur 
Committed: Sat Jun 3 17:42:48 2017 +0300

--
 runners/spark/pom.xml   |   2 -
 .../spark/translation/MultiDoFnFunction.java| 104 +--
 .../spark/translation/SparkProcessContext.java  |  23 +++-
 .../spark/translation/TransformTranslator.java  |  84 ---
 .../streaming/StreamingTransformTranslator.java |   3 +-
 5 files changed, 189 insertions(+), 27 deletions(-)
--




[1/2] beam git commit: [BEAM-2175] [BEAM-1115] Support for new State and Timer API in Spark batch mode

2017-06-03 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master 9cdae6caf -> 43c44232d


[BEAM-2175] [BEAM-1115] Support for new State and Timer API in Spark batch mode


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e5fbed7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e5fbed7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e5fbed7

Branch: refs/heads/master
Commit: 5e5fbed70af5d6ff827266d3db89cd5d8d51f544
Parents: 9cdae6c
Author: JingsongLi 
Authored: Wed May 10 19:49:04 2017 +0800
Committer: Aviem Zur 
Committed: Sat Jun 3 16:49:59 2017 +0300

--
 runners/spark/pom.xml   |   2 -
 .../spark/translation/MultiDoFnFunction.java| 104 +--
 .../spark/translation/SparkProcessContext.java  |  23 +++-
 .../spark/translation/TransformTranslator.java  |  84 ---
 .../streaming/StreamingTransformTranslator.java |   3 +-
 5 files changed, 189 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/5e5fbed7/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 697f67a..ddb4aca 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -77,8 +77,6 @@
 org.apache.beam.runners.spark.UsesCheckpointRecovery
   
   
-org.apache.beam.sdk.testing.UsesStatefulParDo,
-org.apache.beam.sdk.testing.UsesTimersInParDo,
 org.apache.beam.sdk.testing.UsesSplittableParDo,
 org.apache.beam.sdk.testing.UsesCommittedMetrics,
 org.apache.beam.sdk.testing.UsesTestStream

http://git-wip-us.apache.org/repos/asf/beam/blob/5e5fbed7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 3274912..23d5b32 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -22,16 +22,24 @@ import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Multimap;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StepContext;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
@@ -60,6 +68,7 @@ public class MultiDoFnFunction
   private final List> additionalOutputTags;
   private final Map, KV, 
SideInputBroadcast>> sideInputs;
   private final WindowingStrategy windowingStrategy;
+  private final boolean stateful;
 
   /**
* @param aggAccum   The Spark {@link Accumulator} that backs the Beam 
Aggregators.
@@ -70,6 +79,7 @@ public class MultiDoFnFunction
* @param additionalOutputTags Additional {@link TupleTag output tags}.
* @param sideInputsSide inputs used in this {@link DoFn}.
* @param windowingStrategy Input {@link WindowingStrategy}.
+   * @param stateful  Stateful {@link DoFn}.
*/
   public MultiDoFnFunction(
   Accumulator aggAccum,
@@ -80,7 +90,8 @@ public class MultiDoFnFunction
   TupleTag mainOutputTag,
   List> additionalOutputTags,
   Map, KV, SideInputBroadcast>> 
sideInputs,
-  WindowingStrategy windowingStrategy) {
+  WindowingStrategy windowingStrategy,
+  boolean stateful) {
 this.aggAccum = aggAccum;
 this.metricsAccum = metricsAccum;
 this.stepName = stepName;
@@ -90,6 +101,7 @@ public class MultiDoFnFunction
 this.additionalOutputTags =

[GitHub] beam pull request #3126: [BEAM-2279] Add HDFS support to Spark runner profil...

2017-05-12 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/3126

[BEAM-2279] Add HDFS support to Spark runner profiles in archetypes and 
examples

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`.
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam 
add-hdfs-support-to-spark-profiles

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3126.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3126


commit 29138f4987c22fd26bd84142bcc98ccd0f52bc63
Author: Aviem Zur 
Date:   2017-05-12T19:18:02Z

[BEAM-2279] Add HDFS support to Spark runner profiles in archetypes and 
examples




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #3115: [BEAM-2277] Fix URI_SCHEME_PATTERN in FileSystems

2017-05-12 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/3115

[BEAM-2277] Fix URI_SCHEME_PATTERN in FileSystems

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`.
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam 
fix-uri-scheme-pattern-in-filesystems

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3115.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3115


commit ca05ed560888a2f2a86442b89e23fb7f49e1acfd
Author: Aviem Zur 
Date:   2017-05-12T13:02:54Z

[BEAM-2277] Fix URI_SCHEME_PATTERN in FileSystems




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2938: Cherry-pick pull request #2649 into release-2.0.0 b...

2017-05-11 Thread aviemzur
Github user aviemzur closed the pull request at:

https://github.com/apache/beam/pull/2938


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2938: Cherry-pick pull request #2649 into release-2.0.0 b...

2017-05-05 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2938

Cherry-pick pull request #2649 into release-2.0.0 branch

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`.
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam cherry-pick-2649

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2938.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2938


commit bd5e347f57648ed28b2ae58b263a8c0825320f04
Author: Aviem Zur 
Date:   2017-05-05T20:13:24Z

Cherry-pick pull request #2649 into release-2.0.0 branch




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[3/4] beam git commit: [BEAM-1672] Make MetricsContainers accumulable.

2017-05-05 Thread aviemzur
[BEAM-1672] Make MetricsContainers accumulable.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/46c2f935
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/46c2f935
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/46c2f935

Branch: refs/heads/master
Commit: 46c2f935a99350e18e5d50f1a996996760ebc2e3
Parents: db0ec99
Author: Aviem Zur 
Authored: Fri May 5 23:13:24 2017 +0300
Committer: Aviem Zur 
Committed: Sat May 6 08:27:49 2017 +0300

--
 .../apache/beam/runners/core/LateDataUtils.java |   2 +-
 .../apache/beam/sdk/metrics/CounterCell.java|  27 +-
 .../org/apache/beam/sdk/metrics/DirtyState.java |   3 +-
 .../beam/sdk/metrics/DistributionCell.java  |  16 +-
 .../org/apache/beam/sdk/metrics/GaugeCell.java  |  20 +-
 .../org/apache/beam/sdk/metrics/MetricCell.java |  14 +-
 .../org/apache/beam/sdk/metrics/Metrics.java|   2 +-
 .../beam/sdk/metrics/MetricsContainer.java  |  29 +-
 .../sdk/metrics/MetricsContainerStepMap.java| 487 +++
 .../org/apache/beam/sdk/metrics/MetricsMap.java |   5 +-
 .../beam/sdk/metrics/CounterCellTest.java   |   6 +-
 .../metrics/MetricsContainerStepMapTest.java| 258 ++
 .../beam/sdk/metrics/MetricsContainerTest.java  |  14 +-
 13 files changed, 846 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
index c45387b..f7c0d31 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
@@ -71,7 +71,7 @@ public class LateDataUtils {
 .isBefore(timerInternals.currentInputWatermarkTime());
 if (expired) {
   // The element is too late for this window.
-  droppedDueToLateness.inc();
+  droppedDueToLateness.update(1L);
   WindowTracing.debug(
   "GroupAlsoByWindow: Dropping element at {} for key: {}; "
   + "window: {} since it is too far behind 
inputWatermark: {}",

http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
index 7ab5ebc..4b8548f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
  * indirection.
  */
 @Experimental(Kind.METRICS)
-public class CounterCell implements MetricCell {
+public class CounterCell implements MetricCell {
 
   private final DirtyState dirty = new DirtyState();
   private final AtomicLong value = new AtomicLong();
@@ -41,13 +41,26 @@ public class CounterCell implements MetricCell {
*/
   CounterCell() {}
 
-  /** Increment the counter by the given amount. */
-  private void add(long n) {
+  /**
+   * Increment the counter by the given amount.
+   * @param n value to increment by. Can be negative to decrement.
+   */
+  public void update(long n) {
 value.addAndGet(n);
 dirty.afterModification();
   }
 
   @Override
+  public void update(Long n) {
+throw new UnsupportedOperationException("CounterCell.update(Long n) should 
not be used"
++ " as it performs unnecessary boxing/unboxing. Use 
CounterCell.update(long n) instead.");
+  }
+
+  @Override public void update(MetricCell other) {
+update((long) other.getCumulative());
+  }
+
+  @Override
   public DirtyState getDirty() {
 return dirty;
   }
@@ -56,12 +69,4 @@ public class CounterCell implements MetricCell {
   public Long getCumulative() {
 return value.get();
   }
-
-  public void inc() {
-add(1);
-  }
-
-  public void inc(long n) {
-add(n);
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
index 6706be8..4e0c15c 100644
--- a/sdks/java/core/src/main/java/or

[1/4] beam git commit: [BEAM-1672] Use Accumulable MetricsContainers in Flink runner.

2017-05-05 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master db0ec9991 -> 019d3002b


[BEAM-1672] Use Accumulable MetricsContainers in Flink runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8c2da9ad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8c2da9ad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8c2da9ad

Branch: refs/heads/master
Commit: 8c2da9ad1b8c195757f97feccdbcabcad735c407
Parents: 009cd6e
Author: Aviem Zur 
Authored: Fri May 5 23:14:01 2017 +0300
Committer: Aviem Zur 
Committed: Sat May 6 08:27:49 2017 +0300

--
 .../beam/runners/flink/FlinkRunnerResult.java   |   8 +-
 .../metrics/DoFnRunnerWithMetricsUpdate.java|  12 +-
 .../flink/metrics/FlinkMetricContainer.java | 273 ++-
 .../flink/metrics/FlinkMetricResults.java   | 146 --
 .../flink/metrics/MetricsAccumulator.java   |  60 
 .../flink/metrics/ReaderInvocationUtil.java |   7 +-
 .../translation/wrappers/SourceInputFormat.java |   8 +-
 .../streaming/io/BoundedSourceWrapper.java  |   8 +-
 .../streaming/io/UnboundedSourceWrapper.java|   9 +-
 9 files changed, 174 insertions(+), 357 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
--
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index 90dc79b..038895a 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -17,12 +17,15 @@
  */
 package org.apache.beam.runners.flink;
 
+import static 
org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
-import org.apache.beam.runners.flink.metrics.FlinkMetricResults;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.joda.time.Duration;
 
 /**
@@ -72,6 +75,7 @@ public class FlinkRunnerResult implements PipelineResult {
 
   @Override
   public MetricResults metrics() {
-return new FlinkMetricResults(accumulators);
+return asAttemptedOnlyMetricResults(
+(MetricsContainerStepMap) 
accumulators.get(FlinkMetricContainer.ACCUMULATOR_NAME));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
--
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
index dae91fe..40191d2 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
@@ -34,6 +34,7 @@ import org.joda.time.Instant;
  */
 public class DoFnRunnerWithMetricsUpdate implements 
DoFnRunner {
 
+  private final String stepName;
   private final FlinkMetricContainer container;
   private final DoFnRunner delegate;
 
@@ -41,14 +42,15 @@ public class DoFnRunnerWithMetricsUpdate 
implements DoFnRunner<
   String stepName,
   DoFnRunner delegate,
   RuntimeContext runtimeContext) {
+this.stepName = stepName;
 this.delegate = delegate;
-container = new FlinkMetricContainer(stepName, runtimeContext);
+container = new FlinkMetricContainer(runtimeContext);
   }
 
   @Override
   public void startBundle() {
 try (Closeable ignored =
- 
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+ 
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName)))
 {
   delegate.startBundle();
 } catch (IOException e) {
   throw new RuntimeException(e);
@@ -58,7 +60,7 @@ public class DoFnRunnerWithMetricsUpdate 
implements DoFnRunner<
   @Override
   public void processElement(final WindowedValue elem) {
 try (Closeable ignored =
- 
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+ 
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName)))
 {
   delegate.processElement(elem);
 } catch (IOException e) {
   throw new 

[4/4] beam git commit: This closes #2649

2017-05-05 Thread aviemzur
This closes #2649


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/019d3002
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/019d3002
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/019d3002

Branch: refs/heads/master
Commit: 019d3002b0e2a7db9c5c2e84a0a95fad60f16422
Parents: db0ec99 8c2da9a
Author: Aviem Zur 
Authored: Sat May 6 08:45:29 2017 +0300
Committer: Aviem Zur 
Committed: Sat May 6 08:45:29 2017 +0300

--
 .../apache/beam/runners/core/LateDataUtils.java |   2 +-
 .../beam/runners/flink/FlinkRunnerResult.java   |   8 +-
 .../metrics/DoFnRunnerWithMetricsUpdate.java|  12 +-
 .../flink/metrics/FlinkMetricContainer.java | 273 +++
 .../flink/metrics/FlinkMetricResults.java   | 146 --
 .../flink/metrics/MetricsAccumulator.java   |  60 +++
 .../flink/metrics/ReaderInvocationUtil.java |   7 +-
 .../translation/wrappers/SourceInputFormat.java |   8 +-
 .../streaming/io/BoundedSourceWrapper.java  |   8 +-
 .../streaming/io/UnboundedSourceWrapper.java|   9 +-
 .../beam/runners/spark/SparkPipelineResult.java |   8 +-
 .../apache/beam/runners/spark/io/SourceRDD.java |   4 +-
 .../runners/spark/io/SparkUnboundedSource.java  |  19 +-
 .../spark/metrics/MetricsAccumulator.java   |  20 +-
 .../spark/metrics/MetricsAccumulatorParam.java  |  20 +-
 .../runners/spark/metrics/SparkBeamMetric.java  |  11 +-
 .../spark/metrics/SparkBeamMetricSource.java|   2 +-
 .../spark/metrics/SparkMetricResults.java   | 172 ---
 .../spark/metrics/SparkMetricsContainer.java| 174 ---
 .../SparkGroupAlsoByWindowViaWindowSet.java |   4 +-
 .../spark/stateful/StateSpecFunctions.java  |   8 +-
 .../translation/DoFnRunnerWithMetrics.java  |   6 +-
 .../spark/translation/MultiDoFnFunction.java|   6 +-
 .../spark/translation/TransformTranslator.java  |   4 +-
 .../streaming/StreamingTransformTranslator.java |   4 +-
 .../apache/beam/sdk/metrics/CounterCell.java|  27 +-
 .../org/apache/beam/sdk/metrics/DirtyState.java |   3 +-
 .../beam/sdk/metrics/DistributionCell.java  |  16 +-
 .../org/apache/beam/sdk/metrics/GaugeCell.java  |  20 +-
 .../org/apache/beam/sdk/metrics/MetricCell.java |  14 +-
 .../org/apache/beam/sdk/metrics/Metrics.java|   2 +-
 .../beam/sdk/metrics/MetricsContainer.java  |  29 +-
 .../sdk/metrics/MetricsContainerStepMap.java| 487 +++
 .../org/apache/beam/sdk/metrics/MetricsMap.java |   5 +-
 .../beam/sdk/metrics/CounterCellTest.java   |   6 +-
 .../metrics/MetricsContainerStepMapTest.java| 258 ++
 .../beam/sdk/metrics/MetricsContainerTest.java  |  14 +-
 37 files changed, 1086 insertions(+), 790 deletions(-)
--




[2/4] beam git commit: [BEAM-1672] Use Accumulable MetricsContainers in Spark runner.

2017-05-05 Thread aviemzur
[BEAM-1672] Use Accumulable MetricsContainers in Spark runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/009cd6e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/009cd6e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/009cd6e2

Branch: refs/heads/master
Commit: 009cd6e289c77fd3bc997e2bcffaf49d72012c8e
Parents: 46c2f93
Author: Aviem Zur 
Authored: Fri May 5 23:13:51 2017 +0300
Committer: Aviem Zur 
Committed: Sat May 6 08:27:49 2017 +0300

--
 .../beam/runners/spark/SparkPipelineResult.java |   8 +-
 .../apache/beam/runners/spark/io/SourceRDD.java |   4 +-
 .../runners/spark/io/SparkUnboundedSource.java  |  19 +-
 .../spark/metrics/MetricsAccumulator.java   |  20 ++-
 .../spark/metrics/MetricsAccumulatorParam.java  |  20 ++-
 .../runners/spark/metrics/SparkBeamMetric.java  |  11 +-
 .../spark/metrics/SparkBeamMetricSource.java|   2 +-
 .../spark/metrics/SparkMetricResults.java   | 172 --
 .../spark/metrics/SparkMetricsContainer.java| 174 ---
 .../SparkGroupAlsoByWindowViaWindowSet.java |   4 +-
 .../spark/stateful/StateSpecFunctions.java  |   8 +-
 .../translation/DoFnRunnerWithMetrics.java  |   6 +-
 .../spark/translation/MultiDoFnFunction.java|   6 +-
 .../spark/translation/TransformTranslator.java  |   4 +-
 .../streaming/StreamingTransformTranslator.java |   4 +-
 15 files changed, 66 insertions(+), 396 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/009cd6e2/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index 3e94a45..3986e33 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -18,13 +18,15 @@
 
 package org.apache.beam.runners.spark;
 
+import static 
org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import org.apache.beam.runners.spark.metrics.SparkMetricResults;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -41,7 +43,6 @@ public abstract class SparkPipelineResult implements 
PipelineResult {
   protected final Future pipelineExecution;
   protected JavaSparkContext javaSparkContext;
   protected PipelineResult.State state;
-  private final SparkMetricResults metricResults = new SparkMetricResults();
 
   SparkPipelineResult(final Future pipelineExecution, final 
JavaSparkContext javaSparkContext) {
 this.pipelineExecution = pipelineExecution;
@@ -106,7 +107,8 @@ public abstract class SparkPipelineResult implements 
PipelineResult {
 
   @Override
   public MetricResults metrics() {
-return metricResults;
+return asAttemptedOnlyMetricResults(
+MetricsAccumulator.getInstance().value());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/009cd6e2/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index e294359..71a19e7 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -26,12 +26,12 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
-import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.Accumulator;
@@ -65,7 +65,7 @@ public class SourceRDD {
 private final SparkRuntimeC

[6/6] beam git commit: This closes #2729

2017-05-04 Thread aviemzur
This closes #2729


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b73918b5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b73918b5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b73918b5

Branch: refs/heads/master
Commit: b73918b55ab06e5a47ef9dc33ae3dbaebaed330a
Parents: 48c8ed1 8d91a97
Author: Aviem Zur 
Authored: Thu May 4 21:10:14 2017 +0300
Committer: Aviem Zur 
Committed: Thu May 4 21:10:14 2017 +0300

--
 .../beam/runners/core/SideInputHandler.java |  10 +-
 .../apache/beam/runners/flink/FlinkRunner.java  |   3 +
 .../FlinkStreamingTransformTranslators.java |  26 +
 .../wrappers/streaming/DoFnOperator.java|  27 -
 .../streaming/state/FlinkStateInternals.java|   2 +
 runners/spark/pom.xml   |  47 +++-
 .../runners/spark/SparkRunnerRegistrar.java |   4 +-
 .../beam/runners/spark/TestSparkRunner.java |  47 
 .../apache/beam/runners/spark/CacheTest.java|  12 +-
 .../beam/runners/spark/ForceStreamingTest.java  |  18 +--
 .../apache/beam/runners/spark/PipelineRule.java | 109 ---
 .../runners/spark/ProvidedSparkContextTest.java |  10 +-
 .../runners/spark/SparkRunnerDebuggerTest.java  |  15 +--
 .../runners/spark/SparkRunnerRegistrarTest.java |   2 +-
 .../beam/runners/spark/StreamingTest.java   |  23 
 .../metrics/sink/SparkMetricsSinkTest.java  |  12 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |  10 +-
 .../beam/runners/spark/io/NumShardsTest.java|   6 +-
 .../spark/translation/StorageLevelTest.java |  31 +-
 .../translation/streaming/CreateStreamTest.java |  53 -
 .../ResumeFromCheckpointStreamingTest.java  |  62 +++
 .../streaming/StreamingSourceMetricsTest.java   |  14 +--
 .../main/java/org/apache/beam/sdk/Pipeline.java |   2 +-
 .../beam/sdk/metrics/MetricsEnvironment.java|   5 +
 .../apache/beam/sdk/testing/TestPipeline.java   |  61 ++-
 25 files changed, 330 insertions(+), 281 deletions(-)
--




[2/6] beam git commit: [BEAM-1726] Fix Flatten with input copies in Flink Streaming Runner

2017-05-04 Thread aviemzur
[BEAM-1726] Fix Flatten with input copies in Flink Streaming Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e2bb180
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e2bb180
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e2bb180

Branch: refs/heads/master
Commit: 0e2bb1808350cbebf771d0971deb06787732800d
Parents: 7c44935
Author: Aljoscha Krettek 
Authored: Sun Mar 19 07:49:08 2017 +0100
Committer: Aviem Zur 
Committed: Thu May 4 20:48:56 2017 +0300

--
 .../FlinkStreamingTransformTranslators.java | 26 
 1 file changed, 26 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/0e2bb180/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
--
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index c024493..7339c01 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -966,10 +966,36 @@ class FlinkStreamingTransformTranslators {
 
   } else {
 DataStream result = null;
+
+// Determine DataStreams that we use as input several times. For 
those, we need to uniquify
+// input streams because Flink seems to swallow watermarks when we 
have a union of one and
+// the same stream.
+Map, Integer> duplicates = new HashMap<>();
+for (PValue input : allInputs.values()) {
+  DataStream current = context.getInputDataStream(input);
+  Integer oldValue = duplicates.put(current, 1);
+  if (oldValue != null) {
+duplicates.put(current, oldValue + 1);
+  }
+}
+
 for (PValue input : allInputs.values()) {
   DataStream current = context.getInputDataStream(input);
+
+  final Integer timesRequired = duplicates.get(current);
+  if (timesRequired > 1) {
+current = current.flatMap(new FlatMapFunction() {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public void flatMap(T t, Collector collector) throws 
Exception {
+collector.collect(t);
+  }
+});
+  }
   result = (result == null) ? current : result.union(current);
 }
+
 context.setOutputDataStream(context.getOutput(transform), result);
   }
 }



[3/6] beam git commit: [BEAM-1763] Verify PAssert execution in runners which support metrics.

2017-05-04 Thread aviemzur
[BEAM-1763] Verify PAssert execution in runners which support metrics.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95ade45e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95ade45e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95ade45e

Branch: refs/heads/master
Commit: 95ade45eced4787eb67a9d4d13dae48ffb176919
Parents: 48c8ed1
Author: Aviem Zur 
Authored: Tue May 2 19:00:29 2017 +0300
Committer: Aviem Zur 
Committed: Thu May 4 20:48:56 2017 +0300

--
 .../apache/beam/runners/flink/FlinkRunner.java  |  3 ++
 .../beam/runners/spark/TestSparkRunner.java | 47 
 .../ResumeFromCheckpointStreamingTest.java  | 12 +++--
 .../beam/sdk/metrics/MetricsEnvironment.java|  5 +++
 .../apache/beam/sdk/testing/TestPipeline.java   | 46 ---
 5 files changed, 57 insertions(+), 56 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
--
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 181ffda..a5972ef 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -31,6 +31,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -103,6 +104,8 @@ public class FlinkRunner extends 
PipelineRunner {
   public PipelineResult run(Pipeline pipeline) {
 logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
 
+MetricsEnvironment.setMetricsSupported(true);
+
 LOG.info("Executing pipeline using FlinkRunner.");
 
 FlinkPipelineExecutionEnvironment env = new 
FlinkPipelineExecutionEnvironment(options);

http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 10e98b8..1e67813 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -40,15 +40,11 @@ import 
org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
-import org.apache.beam.sdk.metrics.MetricResult;
-import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -116,8 +112,6 @@ public final class TestSparkRunner extends 
PipelineRunner {
 }
 SparkPipelineResult result = null;
 
-int expectedNumberOfAssertions = PAssert.countAsserts(pipeline);
-
 // clear state of Aggregators, Metrics and Watermarks if exists.
 AggregatorsAccumulator.clear();
 MetricsAccumulator.clear();
@@ -137,47 +131,6 @@ public final class TestSparkRunner extends 
PipelineRunner {
 String.format("Finish state %s is not allowed.", finishState),
 finishState,
 isOneOf(PipelineResult.State.STOPPED, PipelineResult.State.DONE));
-
-// validate assertion succeeded (at least once).
-long successAssertions = 0;
-Iterable> counterResults = 
result.metrics().queryMetrics(
-MetricsFilter.builder()
-.addNameFilter(MetricNameFilter.named(PAssert.class, 
PAssert.SUCCESS_COUNTER))
-.build()).counters();
-for (MetricResult counter : counterResults) {
-  if (counter.attempted().longValue() > 0) {
-successAssertions++;
-  }
-}
-Integer expectedA

[4/6] beam git commit: [BEAM-1726] Fix empty side inputs in Flink Streaming Runner

2017-05-04 Thread aviemzur
[BEAM-1726] Fix empty side inputs in Flink Streaming Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/040d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/040d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/040d

Branch: refs/heads/master
Commit: 040d935c67f5cd48f2ffe2721a07fe6e0a50
Parents: 95ade45
Author: Aljoscha Krettek 
Authored: Sat Mar 18 12:16:06 2017 +0100
Committer: Aviem Zur 
Committed: Thu May 4 20:48:56 2017 +0300

--
 .../beam/runners/core/SideInputHandler.java | 10 
 .../wrappers/streaming/DoFnOperator.java| 27 +++-
 2 files changed, 31 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/040d/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 5c67148..b29f9d0 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -161,11 +162,6 @@ public class SideInputHandler implements 
ReadyCheckingSideInputReader {
   @Override
   public  T get(PCollectionView sideInput, BoundedWindow window) {
 
-if (!isReady(sideInput, window)) {
-  throw new IllegalStateException(
-  "Side input " + sideInput + " is not ready for window " + window);
-}
-
 @SuppressWarnings("unchecked")
 Coder windowCoder =
 (Coder) sideInput
@@ -181,6 +177,10 @@ public class SideInputHandler implements 
ReadyCheckingSideInputReader {
 
 Iterable> elements = state.read();
 
+if (elements == null) {
+  elements = Collections.emptyList();
+}
+
 return sideInput.getViewFn().apply(elements);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/040d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
--
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index c624036..16bf5d2 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -463,7 +463,32 @@ public class DoFnOperator
 
   @Override
   public void processWatermark2(Watermark mark) throws Exception {
-// ignore watermarks from the side-input input
+if (mark.getTimestamp() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+  // this means we will never see any more side input
+  pushbackDoFnRunner.startBundle();
+
+  BagState> pushedBack =
+  pushbackStateInternals.state(StateNamespaces.global(), 
pushedBackTag);
+
+  Iterable> pushedBackContents = pushedBack.read();
+  if (pushedBackContents != null) {
+for (WindowedValue elem : pushedBackContents) {
+
+  // we need to set the correct key in case the operator is
+  // a (keyed) window operator
+  setKeyContextElement1(new StreamRecord<>(elem));
+
+  doFnRunner.processElement(elem);
+}
+  }
+
+  setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+
+  pushbackDoFnRunner.finishBundle();
+
+  // maybe output a new watermark
+  processWatermark1(new Watermark(currentInputWatermark));
+}
   }
 
   @Override



[5/6] beam git commit: [BEAM-1726] Fix RuntimeException throwing in FlinkStateInternals

2017-05-04 Thread aviemzur
[BEAM-1726] Fix RuntimeException throwing in FlinkStateInternals


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c44935e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c44935e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c44935e

Branch: refs/heads/master
Commit: 7c44935e1c47cce2ecfe842e37c2cf89f48d8583
Parents: 040
Author: Aviem Zur 
Authored: Sat Mar 18 15:21:45 2017 +0200
Committer: Aviem Zur 
Committed: Thu May 4 20:48:56 2017 +0300

--
 .../translation/wrappers/streaming/state/FlinkStateInternals.java  | 2 ++
 1 file changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/7c44935e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
--
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index c033be6..cea6e0f 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -592,6 +592,8 @@ public class FlinkStateInternals implements 
StateInternals {
 }
 current = combineFn.addInput(current, value);
 state.update(current);
+  } catch (RuntimeException re) {
+throw re;
   } catch (Exception e) {
 throw new RuntimeException("Error adding to state." , e);
   }



[1/6] beam git commit: [BEAM-1763] Replace usage of PipelineRule with TestPipeline in Spark runner tests

2017-05-04 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master 48c8ed176 -> b73918b55


[BEAM-1763] Replace usage of PipelineRule with TestPipeline in Spark runner 
tests


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8d91a97b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8d91a97b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8d91a97b

Branch: refs/heads/master
Commit: 8d91a97b77fbda74c577d2cdbd507395834e147c
Parents: 0e2bb18
Author: Aviem Zur 
Authored: Wed May 3 21:06:00 2017 +0300
Committer: Aviem Zur 
Committed: Thu May 4 20:48:56 2017 +0300

--
 runners/spark/pom.xml   |  47 +++-
 .../runners/spark/SparkRunnerRegistrar.java |   4 +-
 .../apache/beam/runners/spark/CacheTest.java|  12 +-
 .../beam/runners/spark/ForceStreamingTest.java  |  18 +--
 .../apache/beam/runners/spark/PipelineRule.java | 109 ---
 .../runners/spark/ProvidedSparkContextTest.java |  10 +-
 .../runners/spark/SparkRunnerDebuggerTest.java  |  15 +--
 .../runners/spark/SparkRunnerRegistrarTest.java |   2 +-
 .../beam/runners/spark/StreamingTest.java   |  23 
 .../metrics/sink/SparkMetricsSinkTest.java  |  12 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |  10 +-
 .../beam/runners/spark/io/NumShardsTest.java|   6 +-
 .../spark/translation/StorageLevelTest.java |  31 +-
 .../translation/streaming/CreateStreamTest.java |  53 -
 .../ResumeFromCheckpointStreamingTest.java  |  50 ++---
 .../streaming/StreamingSourceMetricsTest.java   |  14 +--
 .../main/java/org/apache/beam/sdk/Pipeline.java |   2 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |  21 +++-
 18 files changed, 217 insertions(+), 222 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 38d250e..f7200d6 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -333,9 +333,6 @@
   org.apache.maven.plugins
   maven-surefire-plugin
   
-
-  org.apache.beam.runners.spark.UsesCheckpointRecovery
-
 1
 false
 
@@ -344,6 +341,50 @@
   
false
 
   
+  
+
+  default-test
+  
+test
+  
+  
+
+  org.apache.beam.runners.spark.UsesCheckpointRecovery,
+  org.apache.beam.runners.spark.StreamingTest
+
+
+  
+[
+"--runner=TestSparkRunner",
+"--streaming=false",
+"--enableSparkMetricSinks=true"
+]
+  
+
+  
+
+
+  streaming-tests
+  test
+  
+test
+  
+  
+
+  org.apache.beam.runners.spark.StreamingTest
+
+
+  
+[
+"--runner=TestSparkRunner",
+"--forceStreaming=true",
+"--enableSparkMetricSinks=true"
+]
+  
+
+  
+
+  
 
 
   org.codehaus.mojo

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index bedfda4..bf926dc 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -54,7 +54,9 @@ public final class SparkRunnerRegistrar {
   public static class Options implements PipelineOptionsRegistrar {
 @Override
 public Iterable> getPipelineOptions() {
-  return ImmutableList.>of(SparkPipelineOptions.class);
+  return ImmutableList.>of(
+  SparkPipelineOptions.class,
+  TestSparkPipelineOptions.class);
 }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
--
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTes

[GitHub] beam pull request #2824: [BEAM-2139] Disable SplittableDoFn ValidatesRunner ...

2017-05-02 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2824

[BEAM-2139] Disable SplittableDoFn ValidatesRunner tests for Streaming 
Flink Runner

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam 
disable-sdf-test-in-flink-streaming-flink-runner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2824.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2824


commit 44302d2ccfd8bd1ed54f6da7fc43db5e61798380
Author: Aviem Zur 
Date:   2017-05-02T15:01:54Z

[BEAM-2139] Disable SplittableDoFn ValidatesRunner tests for Streaming 
Flink Runner




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] beam git commit: [BEAM-2129] Fix flaky KafkaIOTest#testUnboundedSourceMetrics

2017-05-01 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master 1197bef19 -> b414f8de9


[BEAM-2129] Fix flaky KafkaIOTest#testUnboundedSourceMetrics

Gauge results are flaky on Jenkins, instead of asserting on value
assert on the gauge's existence instead.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5dac56f7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5dac56f7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5dac56f7

Branch: refs/heads/master
Commit: 5dac56f793c4851bca78dc6f4b4a70d34a016448
Parents: 1197bef
Author: Aviem Zur 
Authored: Mon May 1 07:53:39 2017 +0300
Committer: Aviem Zur 
Committed: Mon May 1 10:50:23 2017 +0300

--
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 41 +---
 1 file changed, 26 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/5dac56f7/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
--
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 591c099..ccbd3d6 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
-import static 
org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
@@ -99,6 +98,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Utils;
 import org.hamcrest.collection.IsIterableContainingInAnyOrder;
+import org.hamcrest.collection.IsIterableWithSize;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
@@ -627,7 +627,6 @@ public class KafkaIOTest {
 MetricsFilter.builder().build());
 
 Iterable> counters = metrics.counters();
-Iterable> gauges = metrics.gauges();
 
 assertThat(counters, hasItem(
 MetricMatchers.attemptedMetricsResult(
@@ -657,19 +656,31 @@ public class KafkaIOTest {
 readStep,
 12000L)));
 
-assertThat(gauges, hasItem(
-attemptedMetricsResult(
-backlogElementsOfSplit.namespace(),
-backlogElementsOfSplit.name(),
-readStep,
-GaugeResult.create(0L, Instant.now();
-
-assertThat(gauges, hasItem(
-attemptedMetricsResult(
-backlogBytesOfSplit.namespace(),
-backlogBytesOfSplit.name(),
-readStep,
-GaugeResult.create(0L, Instant.now();
+MetricQueryResults backlogElementsMetrics =
+result.metrics().queryMetrics(
+MetricsFilter.builder()
+.addNameFilter(
+MetricNameFilter.named(
+backlogElementsOfSplit.namespace(),
+backlogElementsOfSplit.name()))
+.build());
+
+// since gauge values may be inconsistent in some environments assert only 
on their existence.
+assertThat(backlogElementsMetrics.gauges(),
+IsIterableWithSize.>iterableWithSize(1));
+
+MetricQueryResults backlogBytesMetrics =
+result.metrics().queryMetrics(
+MetricsFilter.builder()
+.addNameFilter(
+MetricNameFilter.named(
+backlogBytesOfSplit.namespace(),
+backlogBytesOfSplit.name()))
+.build());
+
+// since gauge values may be inconsistent in some environments assert only 
on their existence.
+assertThat(backlogBytesMetrics.gauges(),
+IsIterableWithSize.>iterableWithSize(1));
   }
 
   @Test



[2/2] beam git commit: This closes #2797

2017-05-01 Thread aviemzur
This closes #2797


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b414f8de
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b414f8de
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b414f8de

Branch: refs/heads/master
Commit: b414f8de90cd89ac76cd9cbe43b2d2d0049faa71
Parents: 1197bef 5dac56f
Author: Aviem Zur 
Authored: Mon May 1 11:25:25 2017 +0300
Committer: Aviem Zur 
Committed: Mon May 1 11:25:25 2017 +0300

--
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 41 +---
 1 file changed, 26 insertions(+), 15 deletions(-)
--




[GitHub] beam pull request #2797: [BEAM-2129] Fix flaky KafkaIOTest#testUnboundedSour...

2017-04-30 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2797

[BEAM-2129] Fix flaky KafkaIOTest#testUnboundedSourceMetrics

Gauge results are flaky on Jenkins, instead of asserting on value
assert on the gauge's existence instead.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam fix-flaky-kafkaio-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2797.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2797


commit f922bd46ef6899c775e36eacfdb11068cc011209
Author: Aviem Zur 
Date:   2017-05-01T04:53:39Z

[BEAM-2129] Fix flaky KafkaIOTest#testUnboundedSourceMetrics

Gauge results are flaky on Jenkins, instead of asserting on value
assert on the gauge's existence instead.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: This closes #2344

2017-04-29 Thread aviemzur
This closes #2344


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/47821ad6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/47821ad6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/47821ad6

Branch: refs/heads/master
Commit: 47821ad695f67977c775f62b6f8791ca109a7d0b
Parents: 81474ae 930c27f
Author: Aviem Zur 
Authored: Sat Apr 29 18:16:17 2017 +0300
Committer: Aviem Zur 
Committed: Sat Apr 29 18:16:17 2017 +0300

--
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  65 +-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 130 +++
 2 files changed, 194 insertions(+), 1 deletion(-)
--




[1/2] beam git commit: [BEAM-1398] KafkaIO metrics.

2017-04-29 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master 81474aeaf -> 47821ad69


[BEAM-1398] KafkaIO metrics.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/930c27f5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/930c27f5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/930c27f5

Branch: refs/heads/master
Commit: 930c27f55fc980702089fe58fdb0edded96a2ac6
Parents: 81474ae
Author: Aviem Zur 
Authored: Tue Mar 28 07:29:53 2017 +0300
Committer: Aviem Zur 
Committed: Sat Apr 29 18:08:19 2017 +0300

--
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  65 +-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 130 +++
 2 files changed, 194 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/930c27f5/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
--
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 47d8281..211f1a4 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -69,6 +69,10 @@ import 
org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
 import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaDeserializer;
 import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaSerializer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.SinkMetrics;
+import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -950,6 +954,13 @@ public class KafkaIO {
 private Deserializer keyDeserializerInstance = null;
 private Deserializer valueDeserializerInstance = null;
 
+private final Counter elementsRead = SourceMetrics.elementsRead();
+private final Counter bytesRead = SourceMetrics.bytesRead();
+private final Counter elementsReadBySplit;
+private final Counter bytesReadBySplit;
+private final Gauge backlogBytesOfSplit;
+private final Gauge backlogElementsOfSplit;
+
 private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
 private static final Duration NEW_RECORDS_POLL_TIMEOUT = 
Duration.millis(10);
 
@@ -1023,10 +1034,18 @@ public class KafkaIO {
 
   synchronized long approxBacklogInBytes() {
 // Note that is an an estimate of uncompressed backlog.
+long backlogMessageCount = backlogMessageCount();
+if (backlogMessageCount == UnboundedReader.BACKLOG_UNKNOWN) {
+  return UnboundedReader.BACKLOG_UNKNOWN;
+}
+return (long) (backlogMessageCount * avgRecordSize);
+  }
+
+  synchronized long backlogMessageCount() {
 if (latestOffset < 0 || nextOffset < 0) {
   return UnboundedReader.BACKLOG_UNKNOWN;
 }
-return Math.max(0, (long) ((latestOffset - nextOffset) * 
avgRecordSize));
+return Math.max(0, (latestOffset - nextOffset));
   }
 }
 
@@ -1065,6 +1084,13 @@ public class KafkaIO {
   partitionStates.get(i).nextOffset = ckptMark.getNextOffset();
 }
   }
+
+  String splitId = String.valueOf(source.id);
+
+  elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId);
+  bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
+  backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
+  backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId);
 }
 
 private void consumerPollLoop() {
@@ -1194,6 +1220,9 @@ public class KafkaIO {
 if (curBatch.hasNext()) {
   PartitionState pState = curBatch.next();
 
+  elementsRead.inc();
+  elementsReadBySplit.inc();
+
   if (!pState.recordIter.hasNext()) { // -- (c)
 pState.recordIter = Collections.emptyIterator(); // drop ref
 curBatch.remove();
@@ -1241,6 +1270,8 @@ public class KafkaIO {
   int recordSize = (rawRecord.key() == null ? 0 : 
rawRecord.key().length)
   + (rawRecord.value() == null ? 0 : rawRecord.value().length);
   pState.recordConsumed(offset, recordSize);
+  bytesRead.inc(recordSize);
+  bytesReadBySplit.inc(recordSize);
   return true;
 
 } else { // -- (b)
@@ -1278,6 +1309,19 @@ public class KafkaIO {
   LOG.debug("{}:  backlog {}", this, getSplitBacklogBytes());
 }
 
+private void reportBacklog() {
+ 

[2/2] beam git commit: This closes #2730

2017-04-29 Thread aviemzur
This closes #2730


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/81474aea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/81474aea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/81474aea

Branch: refs/heads/master
Commit: 81474aeafeb4dbc16a48b62114dc6f348eb5f426
Parents: bac0633 5911773
Author: Aviem Zur 
Authored: Sat Apr 29 17:58:10 2017 +0300
Committer: Aviem Zur 
Committed: Sat Apr 29 17:58:10 2017 +0300

--
 .../metrics/sink/SparkMetricsSinkTest.java  | 86 
 1 file changed, 86 insertions(+)
--




[1/2] beam git commit: [BEAM-2057] Add a test for metrics reporting in Spark runner.

2017-04-29 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master bac06331e -> 81474aeaf


[BEAM-2057] Add a test for metrics reporting in Spark runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/59117737
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/59117737
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/59117737

Branch: refs/heads/master
Commit: 59117737619ba90345761ae0aefcf361eabf3772
Parents: bac0633
Author: Holden Karau 
Authored: Wed Apr 26 22:22:49 2017 -0700
Committer: Aviem Zur 
Committed: Sat Apr 29 17:57:42 2017 +0300

--
 .../metrics/sink/SparkMetricsSinkTest.java  | 86 
 1 file changed, 86 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/59117737/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
--
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
new file mode 100644
index 000..b0ad972
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics.sink;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.runners.spark.PipelineRule;
+import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+
+
+/**
+ * A test that verifies Beam metrics are reported to Spark's metrics sink.
+ */
+public class SparkMetricsSinkTest {
+
+  @Rule
+  public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
+
+  @Rule
+  public final PipelineRule pipelineRule = PipelineRule.batch();
+
+  private Pipeline createSparkPipeline() {
+pipelineRule.getOptions().setEnableSparkMetricSinks(true);
+return pipelineRule.createPipeline();
+  }
+
+  private void runPipeline() {
+final List words =
+Arrays.asList("hi there", "hi", "hi sue bob", "hi sue", "", "bob hi");
+
+final Set expectedCounts =
+ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+
+final Pipeline pipeline = createSparkPipeline();
+
+final PCollection output =
+pipeline
+.apply(Create.of(words).withCoder(StringUtf8Coder.of()))
+.apply(new WordCount.CountWords())
+.apply(MapElements.via(new WordCount.FormatAsTextFn()));
+
+PAssert.that(output).containsInAnyOrder(expectedCounts);
+
+pipeline.run();
+  }
+
+  @Test
+  public void testNamedMetric() throws Exception {
+assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));
+
+runPipeline();
+
+assertThat(InMemoryMetrics.valueOf("emptyLines"), is(1d));
+  }
+}



[GitHub] beam pull request #2781: Fix broken build

2017-04-29 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2781

Fix broken build

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam fix-broken-build

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2781.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2781


commit 2384e46be0f49ffa9e5c94ec929b344a46a72af6
Author: Aviem Zur 
Date:   2017-04-29T11:08:06Z

Fix broken build




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2746: [BEAM-2029] NullPointerException when using multi o...

2017-04-27 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2746

[BEAM-2029] NullPointerException when using multi output ParDo in Spark 
runner in streaming mode.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam 
npe-in-multiple-output-pardo-spark-streaming

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2746.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2746






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2729: [BEAM-1763] Verify PAssert execution in runners whi...

2017-04-26 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2729

[BEAM-1763] Verify PAssert execution in runners which support metrics.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam 
testpipeline-passert-verification

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2729.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2729


commit 6ca854c9cbe3e8239945dcb1b277d89fadf5dfdd
Author: Aviem Zur 
Date:   2017-04-27T05:19:06Z

[BEAM-1763] Verify PAssert execution in runners which support metrics.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] beam git commit: [BEAM-1958] Standard IO Metrics in Java SDK

2017-04-26 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master f3cff3695 -> 30dbaf891


[BEAM-1958] Standard IO Metrics in Java SDK


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/41d52be0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/41d52be0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/41d52be0

Branch: refs/heads/master
Commit: 41d52be0ec64c83a79d97bfd3c27eb104b546991
Parents: f3cff36
Author: Aviem Zur 
Authored: Thu Apr 13 20:27:33 2017 +0300
Committer: Aviem Zur 
Committed: Wed Apr 26 19:53:29 2017 +0300

--
 .../streaming/StreamingSourceMetricsTest.java   |  12 +-
 .../org/apache/beam/sdk/io/CountingSource.java  |   6 +-
 .../apache/beam/sdk/metrics/CounterCell.java|  19 +--
 .../beam/sdk/metrics/DistributionCell.java  |  10 +-
 .../org/apache/beam/sdk/metrics/GaugeCell.java  |   8 +-
 .../org/apache/beam/sdk/metrics/Metric.java |   8 +-
 .../org/apache/beam/sdk/metrics/MetricCell.java |   8 +-
 .../org/apache/beam/sdk/metrics/Metrics.java|  12 ++
 .../beam/sdk/metrics/MetricsContainer.java  |   8 +-
 .../apache/beam/sdk/metrics/SinkMetrics.java|  49 
 .../apache/beam/sdk/metrics/SourceMetrics.java  | 116 +++
 .../apache/beam/sdk/metrics/MetricsTest.java|  19 ++-
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   |   4 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  |   5 +-
 .../io/gcp/pubsub/PubsubUnboundedSource.java|   4 +-
 15 files changed, 228 insertions(+), 60 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
--
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
index 80f7f53..5a4b1b5 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
@@ -29,9 +29,11 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
@@ -41,6 +43,7 @@ import org.junit.Test;
  * Verify metrics support for {@link Source Sources} in streaming pipelines.
  */
 public class StreamingSourceMetricsTest implements Serializable {
+  private static final MetricName ELEMENTS_READ = 
SourceMetrics.elementsRead().getName();
 
   // Force streaming pipeline using pipeline rule.
   @Rule
@@ -65,10 +68,15 @@ public class StreamingSourceMetricsTest implements 
Serializable {
 .metrics()
 .queryMetrics(
 MetricsFilter.builder()
-.addNameFilter(MetricNameFilter.named("io", 
"elementsRead"))
+.addNameFilter(
+MetricNameFilter.named(ELEMENTS_READ.namespace(), 
ELEMENTS_READ.name()))
 .build());
 
 assertThat(metrics.counters(), hasItem(
-attemptedMetricsResult("io", "elementsRead", 
"Read(UnboundedCountingSource)", 1000L)));
+attemptedMetricsResult(
+ELEMENTS_READ.namespace(),
+ELEMENTS_READ.name(),
+"Read(UnboundedCountingSource)",
+1000L)));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index b66a8b2..81082e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.PipelineOptions;

[2/2] beam git commit: This closes #2538

2017-04-26 Thread aviemzur
This closes #2538


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30dbaf89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30dbaf89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30dbaf89

Branch: refs/heads/master
Commit: 30dbaf8913eda2b91c8fc43823f7e5feee17528a
Parents: f3cff36 41d52be
Author: Aviem Zur 
Authored: Wed Apr 26 20:26:24 2017 +0300
Committer: Aviem Zur 
Committed: Wed Apr 26 20:26:24 2017 +0300

--
 .../streaming/StreamingSourceMetricsTest.java   |  12 +-
 .../org/apache/beam/sdk/io/CountingSource.java  |   6 +-
 .../apache/beam/sdk/metrics/CounterCell.java|  19 +--
 .../beam/sdk/metrics/DistributionCell.java  |  10 +-
 .../org/apache/beam/sdk/metrics/GaugeCell.java  |   8 +-
 .../org/apache/beam/sdk/metrics/Metric.java |   8 +-
 .../org/apache/beam/sdk/metrics/MetricCell.java |   8 +-
 .../org/apache/beam/sdk/metrics/Metrics.java|  12 ++
 .../beam/sdk/metrics/MetricsContainer.java  |   8 +-
 .../apache/beam/sdk/metrics/SinkMetrics.java|  49 
 .../apache/beam/sdk/metrics/SourceMetrics.java  | 116 +++
 .../apache/beam/sdk/metrics/MetricsTest.java|  19 ++-
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   |   4 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  |   5 +-
 .../io/gcp/pubsub/PubsubUnboundedSource.java|   4 +-
 15 files changed, 228 insertions(+), 60 deletions(-)
--




[GitHub] beam pull request #2547: [BEAM-1758] Option to disable metrics reporting.

2017-04-25 Thread aviemzur
Github user aviemzur closed the pull request at:

https://github.com/apache/beam/pull/2547


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2522: [BEAM-1672] Extract interface MetricData

2017-04-25 Thread aviemzur
Github user aviemzur closed the pull request at:

https://github.com/apache/beam/pull/2522


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2649: [BEAM-1672] Accumulable MetricsContainers.

2017-04-22 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2649

[BEAM-1672] Accumulable MetricsContainers.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam accumulable-metricscontainer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2649.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2649


commit 9f81af4b1a71af97672956e75298cff115c63fad
Author: Aviem Zur 
Date:   2017-04-22T14:45:35Z

[BEAM-1672] Make MetricsContainers accumulable.

commit 1a208e2dd7128a25cf62ee75fae4078b2af665f9
Author: Aviem Zur 
Date:   2017-04-22T16:00:11Z

[BEAM-1672] AccumulatedMetricsResults

commit f3f9b588eb473705506dc4bbc2020e3628e2e843
Author: Aviem Zur 
Date:   2017-04-22T16:00:51Z

[BEAM-1672] Use Accumulable MetricsContainers in Spark runner.

commit f41466b23ba475e141fde6d3db8c5c2fc14470c9
Author: Aviem Zur 
Date:   2017-04-22T16:01:38Z

[BEAM-1672] Use Accumulable MetricsContainers in Flink runner.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2547: [BEAM-1758] Option to disable metrics reporting.

2017-04-15 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2547

[BEAM-1758] Option to disable metrics reporting.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam metrics-disable-option

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2547.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2547


commit 626c4733a7046e9aaa2e8e4985edf8087993b71a
Author: Aviem Zur 
Date:   2017-04-15T06:59:20Z

[BEAM-1758] Option to disable metrics reporting.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2538: [BEAM-1958] Standard IO Metrics in Java SDK

2017-04-13 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2538

[BEAM-1958] Standard IO Metrics in Java SDK

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam standard-io-metrics-java

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2538.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2538


commit e1b4db663931c363551b262f21c3f20ddb9a211a
Author: Aviem Zur 
Date:   2017-04-13T17:27:33Z

[BEAM-1958] Standard IO Metrics in Java SDK




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: This closes #2514

2017-04-12 Thread aviemzur
This closes #2514


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21a2b96a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21a2b96a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21a2b96a

Branch: refs/heads/master
Commit: 21a2b96a1f617870aa5926d69878a72533ba259e
Parents: dc672f4 fdabd41
Author: Aviem Zur 
Authored: Thu Apr 13 07:36:34 2017 +0300
Committer: Aviem Zur 
Committed: Thu Apr 13 07:36:34 2017 +0300

--
 .../java/org/apache/beam/runners/spark/io/MicrobatchSource.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[1/2] beam git commit: [BEAM-1950] Add missing 'static' keyword to MicrobatchSource#initReaderCache

2017-04-12 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master dc672f420 -> 21a2b96a1


[BEAM-1950] Add missing 'static' keyword to MicrobatchSource#initReaderCache


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdabd41a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdabd41a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdabd41a

Branch: refs/heads/master
Commit: fdabd41ae7bf0faaae979a12bb50720ae2ed24ce
Parents: dc672f4
Author: Aviem Zur 
Authored: Wed Apr 12 20:41:11 2017 +0300
Committer: Aviem Zur 
Committed: Thu Apr 13 07:36:20 2017 +0300

--
 .../java/org/apache/beam/runners/spark/io/MicrobatchSource.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/fdabd41a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 002eb34..847de19 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -141,7 +141,7 @@ public class MicrobatchSource

[GitHub] beam pull request #2522: [BEAM-1672] Extract interface MetricData

2017-04-12 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2522

[BEAM-1672] Extract interface MetricData

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam metricdata-interface

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2522.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2522


commit aa41c2cbe28797686a34eb47c26502953c86b462
Author: Aviem Zur 
Date:   2017-04-09T19:41:12Z

[BEAM-1672] Extract interface MetricData




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2514: [BEAM-1950] Add missing 'static' keyword to Microba...

2017-04-12 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2514

[BEAM-1950] Add missing 'static' keyword to MicrobatchSource#initReaderCache

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam 
fix-microbatchsource-readercache-lock

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2514.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2514


commit 5751c0df58a747acc9c6138dfeef45fcb5dc9ec2
Author: Aviem Zur 
Date:   2017-04-12T17:41:11Z

[BEAM-1950] Add missing 'static' keyword to MicrobatchSource#initReaderCache




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2263: [BEAM-1726] TestFlinkRunner should assert PAssert s...

2017-04-12 Thread aviemzur
Github user aviemzur closed the pull request at:

https://github.com/apache/beam/pull/2263


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam-site pull request #208: Add JSON as a planned IO to built-in IOs page.

2017-04-11 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam-site/pull/208

Add JSON as a planned IO to built-in IOs page.

R: @iemejia  

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam-site add-json-to-io-page

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam-site/pull/208.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #208


commit d4e83a258f5d1ae85e2b1f8c6c98c1ddca70
Author: Zur, Aviem 
Date:   2017-04-12T05:22:09Z

Add JSON as a planned IO to built-in IOs page.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: This closes #2033

2017-04-09 Thread aviemzur
This closes #2033


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/efd785f8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/efd785f8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/efd785f8

Branch: refs/heads/master
Commit: efd785f881d2c231f202c5031d6aeeb042177850
Parents: a0cfccd d958796
Author: Aviem Zur 
Authored: Sun Apr 9 22:47:03 2017 +0300
Committer: Aviem Zur 
Committed: Sun Apr 9 22:47:03 2017 +0300

--
 .../beam/runners/spark/io/MicrobatchSource.java | 113 ---
 .../beam/runners/spark/io/SourceDStream.java|  11 +-
 .../spark/stateful/StateSpecFunctions.java  |   6 +-
 .../ResumeFromCheckpointStreamingTest.java  |  14 ++-
 4 files changed, 118 insertions(+), 26 deletions(-)
--




[1/2] beam git commit: [BEAM-1294] Long running UnboundedSource Readers

2017-04-09 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master a0cfccda4 -> efd785f88


[BEAM-1294] Long running UnboundedSource Readers


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d958796b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d958796b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d958796b

Branch: refs/heads/master
Commit: d958796b525861764318f0c022e4987aa64ac300
Parents: a0cfccd
Author: Aviem Zur 
Authored: Fri Feb 17 12:35:49 2017 +0200
Committer: Aviem Zur 
Committed: Sun Apr 9 22:42:57 2017 +0300

--
 .../beam/runners/spark/io/MicrobatchSource.java | 113 ---
 .../beam/runners/spark/io/SourceDStream.java|  11 +-
 .../spark/stateful/StateSpecFunctions.java  |   6 +-
 .../ResumeFromCheckpointStreamingTest.java  |  14 ++-
 4 files changed, 118 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/d958796b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index ff818a1..002eb34 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -19,11 +19,18 @@
 package org.apache.beam.runners.spark.io;
 
 import com.google.api.client.util.BackOff;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -49,29 +56,34 @@ import org.slf4j.LoggerFactory;
 public class MicrobatchSource
 extends BoundedSource {
   private static final Logger LOG = 
LoggerFactory.getLogger(MicrobatchSource.class);
+  private static volatile Cache, BoundedReader> 
readerCache;
 
   private final UnboundedSource source;
   private final Duration maxReadTime;
   private final int numInitialSplits;
   private final long maxNumRecords;
   private final int sourceId;
+  private final double readerCacheInterval;
 
   // each split of the underlying UnboundedSource is associated with a 
(consistent) id
   // to match it's corresponding CheckpointMark state.
   private final int splitId;
 
-  MicrobatchSource(UnboundedSource source,
-   Duration maxReadTime,
-   int numInitialSplits,
-   long maxNumRecords,
-   int splitId,
-   int sourceId) {
+  MicrobatchSource(
+  UnboundedSource source,
+  Duration maxReadTime,
+  int numInitialSplits,
+  long maxNumRecords,
+  int splitId,
+  int sourceId,
+  double readerCacheInterval) {
 this.source = source;
 this.maxReadTime = maxReadTime;
 this.numInitialSplits = numInitialSplits;
 this.maxNumRecords = maxNumRecords;
 this.splitId = splitId;
 this.sourceId = sourceId;
+this.readerCacheInterval = readerCacheInterval;
   }
 
   /**
@@ -101,7 +113,8 @@ public class MicrobatchSource(splits.get(i), maxReadTime, 1, 
numRecords[i], i, sourceId));
+  result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, 
numRecords[i], i, sourceId,
+  readerCacheInterval));
 }
 return result;
   }
@@ -113,12 +126,30 @@ public class MicrobatchSource createReader(PipelineOptions options) throws 
IOException {
-return createReader(options, null);
+return getOrCreateReader(options, null);
   }
 
-  public BoundedReader createReader(PipelineOptions options, 
CheckpointMarkT checkpointMark)
-  throws IOException {
-return new Reader(source.createReader(options, checkpointMark));
+  @SuppressWarnings("unchecked")
+  public BoundedReader getOrCreateReader(
+  PipelineOptions options,
+  CheckpointMarkT checkpointMark) throws IOException {
+try {
+  initReaderCache((long) readerCacheInterval);
+  return (BoundedReader) readerCache.get(this, new 
ReaderLoader(options, checkpointMark));
+} catch (ExecutionException e) {
+  throw new RuntimeException("Failed to get or create reader", e);
+}
+  }
+
+  private synchronized void initReaderCache(long reade

[1/2] beam git commit: [BEAM-1337] Infer state coders

2017-04-01 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master 03dce6dcc -> e31ca8b0d


[BEAM-1337] Infer state coders


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/42e690e8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/42e690e8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/42e690e8

Branch: refs/heads/master
Commit: 42e690e84a9f05d508f2528b1444b26ce031e080
Parents: 03dce6d
Author: Aviem Zur 
Authored: Wed Mar 1 07:27:57 2017 +0200
Committer: Aviem Zur 
Committed: Sat Apr 1 10:27:14 2017 +0300

--
 .../org/apache/beam/sdk/transforms/ParDo.java   |  62 ++
 .../apache/beam/sdk/util/state/StateSpec.java   |  15 +
 .../apache/beam/sdk/util/state/StateSpecs.java  | 264 -
 .../apache/beam/sdk/transforms/ParDoTest.java   | 578 +++
 4 files changed, 902 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/42e690e8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 664fbc3..3de845b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -22,6 +22,8 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -29,6 +31,7 @@ import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -41,6 +44,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -434,6 +438,59 @@ public class ParDo {
 return DisplayData.item("fn", fn.getClass()).withLabel("Transform 
Function");
   }
 
+  private static void finishSpecifyingStateSpecs(
+  DoFn fn,
+  CoderRegistry coderRegistry,
+  Coder inputCoder) {
+DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+Map stateDeclarations = 
signature.stateDeclarations();
+for (DoFnSignature.StateDeclaration stateDeclaration : 
stateDeclarations.values()) {
+  try {
+StateSpec stateSpec = (StateSpec) 
stateDeclaration.field().get(fn);
+stateSpec.offerCoders(codersForStateSpecTypes(stateDeclaration, 
coderRegistry, inputCoder));
+stateSpec.finishSpecifying();
+  } catch (IllegalAccessException e) {
+throw new RuntimeException(e);
+  }
+}
+  }
+
+  /**
+   * Try to provide coders for as many of the type arguments of given
+   * {@link DoFnSignature.StateDeclaration} as possible.
+   */
+  private static  Coder[] codersForStateSpecTypes(
+  DoFnSignature.StateDeclaration stateDeclaration,
+  CoderRegistry coderRegistry,
+  Coder inputCoder) {
+Type stateType = stateDeclaration.stateType().getType();
+
+if (!(stateType instanceof ParameterizedType)) {
+  // No type arguments means no coders to infer.
+  return new Coder[0];
+}
+
+Type[] typeArguments = ((ParameterizedType) 
stateType).getActualTypeArguments();
+Coder[] coders = new Coder[typeArguments.length];
+
+for (int i = 0; i < typeArguments.length; i++) {
+  Type typeArgument = typeArguments[i];
+  TypeDescriptor typeDescriptor = TypeDescriptor.of(typeArgument);
+  try {
+coders[i] = coderRegistry.getDefaultCoder(typeDescriptor);
+  } catch (CannotProvideCoderException e) {
+try {
+  coders[i] = coderRegistry.getDefaultCoder(
+  typeDescriptor, inputCoder.getEncodedTypeDescriptor(), 
inputCoder);
+} catch (CannotProvideCoderException ignored) {
+  // Since not all type arguments will have a registered coder we 
ignore this exception.
+}
+  }
+}
+
+return coders;
+  }
+
   /**
* Perform common validat

[2/2] beam git commit: This closes #2133

2017-04-01 Thread aviemzur
This closes #2133


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e31ca8b0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e31ca8b0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e31ca8b0

Branch: refs/heads/master
Commit: e31ca8b0d05e47c2588d5db29c92bac49aa410da
Parents: 03dce6d 42e690e
Author: Aviem Zur 
Authored: Sat Apr 1 10:28:25 2017 +0300
Committer: Aviem Zur 
Committed: Sat Apr 1 10:28:25 2017 +0300

--
 .../org/apache/beam/sdk/transforms/ParDo.java   |  62 ++
 .../apache/beam/sdk/util/state/StateSpec.java   |  15 +
 .../apache/beam/sdk/util/state/StateSpecs.java  | 264 -
 .../apache/beam/sdk/transforms/ParDoTest.java   | 578 +++
 4 files changed, 902 insertions(+), 17 deletions(-)
--




[GitHub] beam pull request #2108: [BEAM-463] [BEAM-464] [BEAM-466] BoundedHeapCoder, ...

2017-03-29 Thread aviemzur
Github user aviemzur closed the pull request at:

https://github.com/apache/beam/pull/2108


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: This closes #2345

2017-03-28 Thread aviemzur
This closes #2345


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d35e1b0d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d35e1b0d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d35e1b0d

Branch: refs/heads/master
Commit: d35e1b0d9e47e4eec5c48b5e87ccc9f55955cdf4
Parents: 48fee91 cb05a20
Author: Aviem Zur 
Authored: Tue Mar 28 13:05:22 2017 +0300
Committer: Aviem Zur 
Committed: Tue Mar 28 13:05:22 2017 +0300

--
 .../src/main/java/org/apache/beam/sdk/testing/TestPipeline.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[1/2] beam git commit: Fix typo in test pipeline enforcement message

2017-03-28 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master 48fee91f7 -> d35e1b0d9


Fix typo in test pipeline enforcement message


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cb05a20e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cb05a20e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cb05a20e

Branch: refs/heads/master
Commit: cb05a20e55677b01d1abe0dfda696d1a4f4d1a0c
Parents: 48fee91
Author: Kobi Salant 
Authored: Tue Mar 28 10:25:18 2017 +0300
Committer: ksalant 
Committed: Tue Mar 28 11:40:03 2017 +0300

--
 .../src/main/java/org/apache/beam/sdk/testing/TestPipeline.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/cb05a20e/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 485dd39..6a8335e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -299,7 +299,7 @@ public class TestPipeline extends Pipeline implements 
TestRule {
 checkState(
 enforcement.isPresent(),
 "Is your TestPipeline declaration missing a @Rule annotation? Usage: "
-+ "@Rule public final transient TestPipeline pipeline = 
TestPipeline.Create();");
++ "@Rule public final transient TestPipeline pipeline = 
TestPipeline.create();");
 
 try {
   return super.run();



[GitHub] beam pull request #2344: [BEAM-1398] KafkaIO metrics.

2017-03-27 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2344

[BEAM-1398] KafkaIO metrics.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam kafka-io-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2344.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2344


commit 200ef65c5751327aa058421d035f5f68e3db0ec1
Author: Aviem Zur 
Date:   2017-03-28T04:29:53Z

[BEAM-1398] KafkaIO metrics.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: This closes #2162

2017-03-27 Thread aviemzur
This closes #2162


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/48fee91f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/48fee91f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/48fee91f

Branch: refs/heads/master
Commit: 48fee91f7d720d03da53476e9a237eabcbfc0460
Parents: 85b820c 65b5f00
Author: Aviem Zur 
Authored: Tue Mar 28 06:51:39 2017 +0300
Committer: Aviem Zur 
Committed: Tue Mar 28 06:51:39 2017 +0300

--
 .../beam/runners/spark/TestSparkRunner.java | 14 +++-
 .../apache/beam/runners/spark/io/SourceRDD.java | 51 +-
 .../runners/spark/io/SparkUnboundedSource.java  | 48 +
 .../spark/metrics/SparkMetricsContainer.java| 11 ++-
 .../spark/stateful/StateSpecFunctions.java  | 35 +++---
 .../spark/translation/TransformTranslator.java  |  3 +-
 .../streaming/StreamingTransformTranslator.java |  4 +-
 .../streaming/StreamingSourceMetricsTest.java   | 71 
 .../org/apache/beam/sdk/io/CountingSource.java  |  8 +++
 .../apache/beam/sdk/metrics/MetricsTest.java| 45 +
 10 files changed, 244 insertions(+), 46 deletions(-)
--




[1/2] beam git commit: [BEAM-1397] Introduce IO metrics

2017-03-27 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master 85b820c37 -> 48fee91f7


[BEAM-1397] Introduce IO metrics


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/65b5f001
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/65b5f001
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/65b5f001

Branch: refs/heads/master
Commit: 65b5f001a4e1790206efe3ff2d418018680ea621
Parents: 85b820c
Author: Aviem Zur 
Authored: Tue Mar 28 06:49:59 2017 +0300
Committer: Aviem Zur 
Committed: Tue Mar 28 06:49:59 2017 +0300

--
 .../beam/runners/spark/TestSparkRunner.java | 14 +++-
 .../apache/beam/runners/spark/io/SourceRDD.java | 51 +-
 .../runners/spark/io/SparkUnboundedSource.java  | 48 +
 .../spark/metrics/SparkMetricsContainer.java| 11 ++-
 .../spark/stateful/StateSpecFunctions.java  | 35 +++---
 .../spark/translation/TransformTranslator.java  |  3 +-
 .../streaming/StreamingTransformTranslator.java |  4 +-
 .../streaming/StreamingSourceMetricsTest.java   | 71 
 .../org/apache/beam/sdk/io/CountingSource.java  |  8 +++
 .../apache/beam/sdk/metrics/MetricsTest.java| 45 +
 10 files changed, 244 insertions(+), 46 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/65b5f001/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index e40534f..be9ff2e 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -135,7 +135,12 @@ public final class TestSparkRunner extends 
PipelineRunner {
 isOneOf(PipelineResult.State.STOPPED, PipelineResult.State.DONE));
 
 // validate assertion succeeded (at least once).
-int successAssertions = 
result.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
+int successAssertions = 0;
+try {
+  successAssertions = 
result.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
+} catch (NullPointerException e) {
+  // No assertions registered will cause an NPE here.
+}
 Integer expectedAssertions = 
testSparkPipelineOptions.getExpectedAssertions() != null
 ? testSparkPipelineOptions.getExpectedAssertions() : 
expectedNumberOfAssertions;
 assertThat(
@@ -145,7 +150,12 @@ public final class TestSparkRunner extends 
PipelineRunner {
 successAssertions,
 is(expectedAssertions));
 // validate assertion didn't fail.
-int failedAssertions = 
result.getAggregatorValue(PAssert.FAILURE_COUNTER, Integer.class);
+int failedAssertions = 0;
+try {
+  failedAssertions = 
result.getAggregatorValue(PAssert.FAILURE_COUNTER, Integer.class);
+} catch (NullPointerException e) {
+  // No assertions registered will cause an NPE here.
+}
 assertThat(
 String.format("Found %d failed assertions.", failedAssertions),
 failedAssertions,

http://git-wip-us.apache.org/repos/asf/beam/blob/65b5f001/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index 1a3537f..2f9a827 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -20,15 +20,21 @@ package org.apache.beam.runners.spark.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.Accumulator;
 import org.apache.spark.Dependency;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.InterruptibleItera

[1/2] beam git commit: [BEAM-1792] Use MetricFiltering in Spark runner.

2017-03-27 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master fe441e34b -> 85b820c37


[BEAM-1792] Use MetricFiltering in Spark runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/241ded90
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/241ded90
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/241ded90

Branch: refs/heads/master
Commit: 241ded9022a9214c1d0768b1cb3c7a740a409873
Parents: fe441e3
Author: Pablo 
Authored: Fri Mar 24 10:48:43 2017 -0700
Committer: Aviem Zur 
Committed: Tue Mar 28 05:51:14 2017 +0300

--
 .../spark/metrics/SparkMetricResults.java   | 40 +---
 1 file changed, 2 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/241ded90/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
index c02027a..faf4c52 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
@@ -19,17 +19,15 @@
 package org.apache.beam.runners.spark.metrics;
 
 import com.google.common.base.Function;
-import com.google.common.base.Objects;
 import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
-import java.util.Set;
 import org.apache.beam.sdk.metrics.DistributionData;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.GaugeData;
 import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricFiltering;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
@@ -88,44 +86,10 @@ public class SparkMetricResults extends MetricResults {
   return new Predicate>() {
 @Override
 public boolean apply(MetricUpdate metricResult) {
-  return matches(filter, metricResult.getKey());
+  return MetricFiltering.matches(filter, metricResult.getKey());
 }
   };
 }
-
-private boolean matches(MetricsFilter filter, MetricKey key) {
-  return matchesName(key.metricName(), filter.names())
-  && matchesScope(key.stepName(), filter.steps());
-}
-
-private boolean matchesName(MetricName metricName, Set 
nameFilters) {
-  if (nameFilters.isEmpty()) {
-return true;
-  }
-
-  for (MetricNameFilter nameFilter : nameFilters) {
-if ((nameFilter.getName() == null || 
nameFilter.getName().equals(metricName.name()))
-&& Objects.equal(metricName.namespace(), 
nameFilter.getNamespace())) {
-  return true;
-}
-  }
-
-  return false;
-}
-
-private boolean matchesScope(String actualScope, Set scopes) {
-  if (scopes.isEmpty() || scopes.contains(actualScope)) {
-return true;
-  }
-
-  for (String scope : scopes) {
-if (actualScope.startsWith(scope)) {
-  return true;
-}
-  }
-
-  return false;
-}
   }
 
   private static final Function, 
MetricResult>



[2/2] beam git commit: This closes #2304

2017-03-27 Thread aviemzur
This closes #2304


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/85b820c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/85b820c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/85b820c3

Branch: refs/heads/master
Commit: 85b820c3799ff292873acfd5d456c1f3c4321ae9
Parents: fe441e3 241ded9
Author: Aviem Zur 
Authored: Tue Mar 28 05:55:49 2017 +0300
Committer: Aviem Zur 
Committed: Tue Mar 28 05:55:49 2017 +0300

--
 .../spark/metrics/SparkMetricResults.java   | 40 +---
 1 file changed, 2 insertions(+), 38 deletions(-)
--




[2/2] beam git commit: This closes #2151

2017-03-27 Thread aviemzur
This closes #2151


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b26e10b4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b26e10b4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b26e10b4

Branch: refs/heads/master
Commit: b26e10b44a0b82359dea7b96e0d49dd595fae785
Parents: 026aec8 63e953c
Author: Aviem Zur 
Authored: Mon Mar 27 19:37:52 2017 +0300
Committer: Aviem Zur 
Committed: Mon Mar 27 19:37:52 2017 +0300

--
 .../beam/runners/direct/DirectMetrics.java  | 59 +-
 .../beam/runners/direct/DirectMetricsTest.java  | 42 --
 .../beam/runners/dataflow/DataflowMetrics.java  | 16 +++-
 .../runners/spark/metrics/SparkBeamMetric.java  |  4 +
 .../spark/metrics/SparkMetricResults.java   | 27 +++
 .../spark/metrics/SparkMetricsContainer.java| 20 +
 .../java/org/apache/beam/sdk/metrics/Gauge.java | 32 
 .../org/apache/beam/sdk/metrics/GaugeCell.java  | 60 +++
 .../org/apache/beam/sdk/metrics/GaugeData.java  | 81 
 .../apache/beam/sdk/metrics/GaugeResult.java| 61 +++
 .../beam/sdk/metrics/MetricQueryResults.java|  3 +
 .../apache/beam/sdk/metrics/MetricUpdates.java  | 11 ++-
 .../org/apache/beam/sdk/metrics/Metrics.java| 35 +
 .../beam/sdk/metrics/MetricsContainer.java  | 26 ++-
 .../apache/beam/sdk/metrics/GaugeCellTest.java  | 48 
 .../apache/beam/sdk/metrics/MetricMatchers.java | 12 ++-
 .../apache/beam/sdk/metrics/MetricsTest.java| 37 +
 17 files changed, 539 insertions(+), 35 deletions(-)
--




[1/2] beam git commit: [BEAM-1617] Add Gauge metric type to Java SDK

2017-03-27 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master 026aec856 -> b26e10b44


[BEAM-1617] Add Gauge metric type to Java SDK


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63e953c6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63e953c6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63e953c6

Branch: refs/heads/master
Commit: 63e953c6026192e5e027f0bac183b86992480127
Parents: 026aec8
Author: Aviem Zur 
Authored: Fri Mar 3 14:42:23 2017 +0200
Committer: Aviem Zur 
Committed: Mon Mar 27 19:01:58 2017 +0300

--
 .../beam/runners/direct/DirectMetrics.java  | 59 +-
 .../beam/runners/direct/DirectMetricsTest.java  | 42 --
 .../beam/runners/dataflow/DataflowMetrics.java  | 16 +++-
 .../runners/spark/metrics/SparkBeamMetric.java  |  4 +
 .../spark/metrics/SparkMetricResults.java   | 27 +++
 .../spark/metrics/SparkMetricsContainer.java| 20 +
 .../java/org/apache/beam/sdk/metrics/Gauge.java | 32 
 .../org/apache/beam/sdk/metrics/GaugeCell.java  | 60 +++
 .../org/apache/beam/sdk/metrics/GaugeData.java  | 81 
 .../apache/beam/sdk/metrics/GaugeResult.java| 61 +++
 .../beam/sdk/metrics/MetricQueryResults.java|  3 +
 .../apache/beam/sdk/metrics/MetricUpdates.java  | 11 ++-
 .../org/apache/beam/sdk/metrics/Metrics.java| 35 +
 .../beam/sdk/metrics/MetricsContainer.java  | 26 ++-
 .../apache/beam/sdk/metrics/GaugeCellTest.java  | 48 
 .../apache/beam/sdk/metrics/MetricMatchers.java | 12 ++-
 .../apache/beam/sdk/metrics/MetricsTest.java| 37 +
 17 files changed, 539 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index f04dc21..fb126fb 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -33,6 +33,8 @@ import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.metrics.DistributionData;
 import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeData;
+import org.apache.beam.sdk.metrics.GaugeResult;
 import org.apache.beam.sdk.metrics.MetricFiltering;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
@@ -193,6 +195,28 @@ class DirectMetrics extends MetricResults {
 }
   };
 
+  private static final MetricAggregation GAUGE =
+  new MetricAggregation() {
+@Override
+public GaugeData zero() {
+  return GaugeData.empty();
+}
+
+@Override
+public GaugeData combine(Iterable updates) {
+  GaugeData result = GaugeData.empty();
+  for (GaugeData update : updates) {
+result = result.combine(update);
+  }
+  return result;
+}
+
+@Override
+public GaugeResult extract(GaugeData data) {
+  return data.extractResult();
+}
+  };
+
   /** The current values of counters in memory. */
   private MetricsMap> counters =
   new MetricsMap<>(new MetricsMap.Factory>() {
@@ -210,13 +234,23 @@ class DirectMetrics extends MetricResults {
   return new DirectMetric<>(DISTRIBUTION);
 }
   });
+  private MetricsMap> gauges =
+  new MetricsMap<>(
+  new MetricsMap.Factory>() {
+@Override
+public DirectMetric createInstance(
+MetricKey unusedKey) {
+  return new DirectMetric<>(GAUGE);
+}
+  });
 
   @AutoValue
   abstract static class DirectMetricQueryResults implements MetricQueryResults 
{
 public static MetricQueryResults create(
 Iterable> counters,
-Iterable> distributions) {
-  return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, 
distributions);
+Iterable> distributions,
+Iterable> gauges) {
+  return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, 
distributions, gauges);
 }
   }
 
@@ -248,8 +282,15 @@ class DirectMetrics extends MetricResults {
 : distributions.entries()) {
   maybeExtractResult(filter, distributionResults, distribution);
 }
+ImmutableList.Builder> gaugeResults =
+ImmutableList.builder();
+for (Entry> gauge
+: gauges.ent

[2/2] beam git commit: This closes #2328

2017-03-26 Thread aviemzur
This closes #2328


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9e55a43
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9e55a43
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9e55a43

Branch: refs/heads/master
Commit: c9e55a4360a9fe06d6ed943a222bce524a6b10af
Parents: 348d335 b32f048
Author: Aviem Zur 
Authored: Sun Mar 26 11:23:45 2017 +0300
Committer: Aviem Zur 
Committed: Sun Mar 26 11:23:45 2017 +0300

--
 .../translation/GroupCombineFunctions.java  | 15 ++-
 .../spark/translation/TransformTranslator.java  | 26 
 2 files changed, 25 insertions(+), 16 deletions(-)
--




[1/2] beam git commit: [BEAM-1810] Replace usage of RDD#isEmpty on non-serialized RDDs

2017-03-26 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master 348d33588 -> c9e55a436


[BEAM-1810] Replace usage of RDD#isEmpty on non-serialized RDDs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b32f0482
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b32f0482
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b32f0482

Branch: refs/heads/master
Commit: b32f0482784b9df7ce67226b32febe6e664a45b6
Parents: 348d335
Author: Aviem Zur 
Authored: Sat Mar 25 21:49:06 2017 +0300
Committer: Aviem Zur 
Committed: Sun Mar 26 10:31:40 2017 +0300

--
 .../translation/GroupCombineFunctions.java  | 15 ++-
 .../spark/translation/TransformTranslator.java  | 26 
 2 files changed, 25 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/b32f0482/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index b2a589d..917a9ee 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -18,8 +18,7 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
+import com.google.common.base.Optional;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.util.ByteArray;
 import org.apache.beam.sdk.coders.Coder;
@@ -67,14 +66,12 @@ public class GroupCombineFunctions {
   /**
* Apply a composite {@link org.apache.beam.sdk.transforms.Combine.Globally} 
transformation.
*/
-  public static  Iterable> 
combineGlobally(
+  public static  Optional>> 
combineGlobally(
   JavaRDD> rdd,
   final SparkGlobalCombineFn sparkCombineFn,
   final Coder iCoder,
   final Coder aCoder,
   final WindowingStrategy windowingStrategy) {
-checkArgument(!rdd.isEmpty(), "CombineGlobally computation should be 
skipped for empty RDDs.");
-
 // coders.
 final WindowedValue.FullWindowedValueCoder wviCoder =
 WindowedValue.FullWindowedValueCoder.of(iCoder,
@@ -93,6 +90,11 @@ public class GroupCombineFunctions {
 // AccumT: A
 // InputT: I
 JavaRDD inputRDDBytes = 
rdd.map(CoderHelpers.toByteFunction(wviCoder));
+
+if (inputRDDBytes.isEmpty()) {
+  return Optional.absent();
+}
+
 /*Itr>*/ byte[] accumulatedBytes = inputRDDBytes.aggregate(
 CoderHelpers.toByteArray(sparkCombineFn.zeroValue(), iterAccumCoder),
 new Function2() {
@@ -115,7 +117,8 @@ public class GroupCombineFunctions {
   }
 }
 );
-return CoderHelpers.fromByteArray(accumulatedBytes, iterAccumCoder);
+
+return Optional.of(CoderHelpers.fromByteArray(accumulatedBytes, 
iterAccumCoder));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/b32f0482/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index b4362b0..ffb207a 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -27,6 +27,7 @@ import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceSh
 import static 
org.apache.beam.runners.spark.translation.TranslationUtils.rejectSplittable;
 import static 
org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -259,9 +260,20 @@ public final class TransformTranslator {
 ((BoundedDataset) 
context.borrowDataset(transform)).getRDD();
 
 JavaRDD> outRdd;
-// handle empty input RDD, which will naturally skip the entire 
execution
-// as Spark will not run on empty RDDs.
-if (inRdd.isEmpty()) {
+
+Optional>> maybeAccumulated =
+GroupCombineFunctions.combineGlobally(inRdd, sparkCombineFn, 
iCoder, aCoder,
+windowingStrategy);
+

[GitHub] beam pull request #2328: [BEAM-1810] Avoid RDD#isEmpty in Spark runner Trans...

2017-03-25 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2328

[BEAM-1810] Avoid RDD#isEmpty in Spark runner 
TransformTranslator#combineGlobally

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam 
combine-globally-kryo-serialization-exception

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2328.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2328


commit 7bc632fbcab1d5f53f5ce52e739769ead8cc0d19
Author: Aviem Zur 
Date:   2017-03-25T18:49:06Z

[BEAM-1810] Avoid RDD#isEmpty in Spark runner 
TransformTranslator#combineGlobally




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: This closes #2317

2017-03-25 Thread aviemzur
This closes #2317


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fe236993
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fe236993
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fe236993

Branch: refs/heads/master
Commit: fe2369933a9f1116f22916b34fa13425efdc1a52
Parents: e5f1a64 b436263
Author: Aviem Zur 
Authored: Sat Mar 25 12:17:18 2017 +0300
Committer: Aviem Zur 
Committed: Sat Mar 25 12:17:18 2017 +0300

--
 .../beam/sdk/metrics/MetricNameFilter.java  |  3 +-
 .../beam/sdk/metrics/MetricFilteringTest.java   | 73 
 2 files changed, 74 insertions(+), 2 deletions(-)
--




[1/2] beam git commit: [BEAM-1803] Fixed bug in metrics filtering.

2017-03-25 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master e5f1a6479 -> fe2369933


[BEAM-1803] Fixed bug in metrics filtering.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b436263e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b436263e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b436263e

Branch: refs/heads/master
Commit: b436263e01c32f9e8b648b8422c094ad7c56d2a6
Parents: e5f1a64
Author: Pablo 
Authored: Fri Mar 24 12:53:10 2017 -0700
Committer: Aviem Zur 
Committed: Sat Mar 25 12:15:55 2017 +0300

--
 .../beam/sdk/metrics/MetricNameFilter.java  |  3 +-
 .../beam/sdk/metrics/MetricFilteringTest.java   | 73 
 2 files changed, 74 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/b436263e/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
index a2c3798..489b703 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
@@ -54,7 +54,6 @@ public abstract class MetricNameFilter {
   public static MetricNameFilter named(Class namespace, String name) {
 checkNotNull(namespace, "Must specify a inNamespace");
 checkNotNull(name, "Must specify a name");
-return new AutoValue_MetricNameFilter(namespace.getSimpleName(), name);
+return new AutoValue_MetricNameFilter(namespace.getName(), name);
   }
 }
-

http://git-wip-us.apache.org/repos/asf/beam/blob/b436263e/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
--
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
index 3e6a499..dc2fa0a 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
@@ -40,6 +40,79 @@ public class MetricFilteringTest {
   }
 
   @Test
+  public void testMatchCompositeStepNameFilters() {
+// MetricsFilter with a Class-namespace + name filter + step filter.
+// Successful match.
+assertTrue(MetricFiltering.matches(
+MetricsFilter.builder().addNameFilter(
+MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
+.addStep("myStep").build(),
+MetricKey.create(
+"myBigStep/myStep", MetricName.named(MetricFilteringTest.class, 
"myMetricName";
+
+// Unsuccessful match.
+assertFalse(MetricFiltering.matches(
+MetricsFilter.builder().addNameFilter(
+MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
+.addStep("myOtherStep").build(),
+MetricKey.create(
+"myOtherStepNoMatch/myStep",
+MetricName.named(MetricFilteringTest.class, "myMetricName";
+  }
+
+  @Test
+  public void testMatchStepNameFilters() {
+// MetricsFilter with a Class-namespace + name filter + step filter.
+// Successful match.
+assertTrue(MetricFiltering.matches(
+MetricsFilter.builder().addNameFilter(
+MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
+.addStep("myStep").build(),
+MetricKey.create("myStep", MetricName.named(MetricFilteringTest.class, 
"myMetricName";
+
+// Unsuccessful match.
+assertFalse(MetricFiltering.matches(
+MetricsFilter.builder().addNameFilter(
+MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
+.addStep("myOtherStep").build(),
+MetricKey.create("myStep", MetricName.named(MetricFilteringTest.class, 
"myMetricName";
+  }
+
+  @Test
+  public void testMatchClassNamespaceFilters() {
+// MetricsFilter with a Class-namespace + name filter. Without step filter.
+// Successful match.
+assertTrue(MetricFiltering.matches(
+MetricsFilter.builder().addNameFilter(
+MetricNameFilter.named(MetricFilteringTest.class, 
"myMetricName")).build(),
+MetricKey.create("anyStep", 
MetricName.named(MetricFilteringTest.class, "myMetricName";
+
+// Unsuccessful match.
+assertFalse(MetricFiltering.matches(
+MetricsFilter.builder().addNameFilter(
+MetricNameFilter.named(MetricFilteringTest.class, 
"myMetricName")).build(),
+MetricKey.create("anyStep", MetricName.named(MetricFiltering

[GitHub] beam pull request #2327: [BEAM-1802] Stop Spark context on terminal pipeline...

2017-03-25 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2327

[BEAM-1802] Stop Spark context on terminal pipeline state

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam stop-terminal-spark-pipelines

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2327.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2327


commit 70ba7a5a81046100d3b49a8971febcd089cc3573
Author: Aviem Zur 
Date:   2017-03-25T08:10:35Z

[BEAM-1802] Stop Spark context on terminal pipeline state




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2313: [BEAM-1802] Call stop in SparkPipelineResult#waitUn...

2017-03-24 Thread aviemzur
Github user aviemzur closed the pull request at:

https://github.com/apache/beam/pull/2313


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2313: [BEAM-1802] Call stop in SparkPipelineResult#waitUn...

2017-03-24 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2313

[BEAM-1802] Call stop in SparkPipelineResult#waitUntilFinish

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam finally-stop-spark-pipelines

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2313.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2313


commit a3384206a04f327f8ca2098ac0ab078486633096
Author: Aviem Zur 
Date:   2017-03-24T12:12:52Z

[BEAM-1802] Call stop in SparkPipelineResult#waitUntilFinish




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] beam git commit: [BEAM-1074] Set default-partitioner in SourceRDD.Unbounded

2017-03-23 Thread aviemzur
Repository: beam
Updated Branches:
  refs/heads/master 5e1be9fa7 -> 9ac1ffcea


[BEAM-1074] Set default-partitioner in SourceRDD.Unbounded


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/623a5696
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/623a5696
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/623a5696

Branch: refs/heads/master
Commit: 623a5696bc328a9a55bf5de67ad0070a985c96ee
Parents: 5e1be9f
Author: Aviem Zur 
Authored: Wed Mar 22 15:20:51 2017 +0200
Committer: Aviem Zur 
Committed: Thu Mar 23 16:18:16 2017 +0200

--
 .../spark/SparkNativePipelineVisitor.java   |  1 -
 .../beam/runners/spark/io/SourceDStream.java| 52 +++-
 .../apache/beam/runners/spark/io/SourceRDD.java | 19 +--
 .../runners/spark/io/SparkUnboundedSource.java  | 15 +++---
 4 files changed, 63 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/623a5696/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
index c2784a2..c2d38d7 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
@@ -92,7 +92,6 @@ public class SparkNativePipelineVisitor extends 
SparkRunner.Evaluator {
   @Override
   > void
   doVisitTransform(TransformHierarchy.Node node) {
-super.doVisitTransform(node);
 @SuppressWarnings("unchecked")
 TransformT transform = (TransformT) node.getTransform();
 @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/beam/blob/623a5696/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index 8a0763b..3f2c10a 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -28,6 +28,7 @@ import org.apache.spark.api.java.JavaSparkContext$;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.streaming.StreamingContext;
 import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.dstream.InputDStream;
 import org.apache.spark.streaming.scheduler.RateController;
 import org.apache.spark.streaming.scheduler.RateController$;
@@ -36,7 +37,6 @@ import 
org.apache.spark.streaming.scheduler.rate.RateEstimator$;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Tuple2;
 
 
@@ -60,6 +60,9 @@ class SourceDStream
   private final UnboundedSource unboundedSource;
   private final SparkRuntimeContext runtimeContext;
   private final Duration boundReadDuration;
+  // Number of partitions for the DStream is final and remains the same 
throughout the entire
+  // lifetime of the pipeline, including when resuming from checkpoint.
+  private final int numPartitions;
   // the initial parallelism, set by Spark's backend, will be determined once 
when the job starts.
   // in case of resuming/recovering from checkpoint, the DStream will be 
reconstructed and this
   // property should not be reset.
@@ -67,40 +70,55 @@ class SourceDStream
   // the bound on max records is optional.
   // in case it is set explicitly via PipelineOptions, it takes precedence
   // otherwise it could be activated via RateController.
-  private Long boundMaxRecords = null;
+  private final long boundMaxRecords;
 
   SourceDStream(
   StreamingContext ssc,
   UnboundedSource unboundedSource,
-  SparkRuntimeContext runtimeContext) {
-
+  SparkRuntimeContext runtimeContext,
+  Long boundMaxRecords) {
 super(ssc, JavaSparkContext$.MODULE$., 
CheckpointMarkT>>fakeClassTag());
 this.unboundedSource = unboundedSource;
 this.runtimeContext = runtimeContext;
+
 SparkPipelineOptions options = runtimeContext.getPipelineOptions().as(
 SparkPipelineOptions.class);
+
 this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(),
 options.getMinReadTimeMillis());
 // set initial parallelism once.
 this.initialParallelism = ssc().sc().defaultParallelism();
 checkArgument(this.initialParallelism > 0, "Number of partitions must be 
grea

[2/2] beam git commit: This closes #2288

2017-03-23 Thread aviemzur
This closes #2288


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ac1ffce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ac1ffce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ac1ffce

Branch: refs/heads/master
Commit: 9ac1ffceadb30956655e27d253fd84fe355d5f54
Parents: 5e1be9f 623a569
Author: Aviem Zur 
Authored: Thu Mar 23 16:48:04 2017 +0200
Committer: Aviem Zur 
Committed: Thu Mar 23 16:48:04 2017 +0200

--
 .../spark/SparkNativePipelineVisitor.java   |  1 -
 .../beam/runners/spark/io/SourceDStream.java| 52 +++-
 .../apache/beam/runners/spark/io/SourceRDD.java | 19 +--
 .../runners/spark/io/SparkUnboundedSource.java  | 15 +++---
 4 files changed, 63 insertions(+), 24 deletions(-)
--




[GitHub] beam pull request #2288: [BEAM-848] A better shuffle after reading from with...

2017-03-22 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2288

[BEAM-848] A better shuffle after reading from within mapWithState.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam 
sourcerdd-unbounded-default-partitioner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2288.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2288


commit 0fa4d29784ef5940c2467529e5ae7fdb78c7b98b
Author: Aviem Zur 
Date:   2017-03-22T13:20:51Z

[BEAM-1074] Set default-partitioner in SourceRDD.Unbounded

commit 581233f5022cabb9ef497611b643dd5413d52060
Author: Aviem Zur 
Date:   2017-03-22T19:11:47Z

[BEAM-1075] Shuffle the input read-values to get maximum parallelism




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam-site pull request #190: Simplify merge process section

2017-03-18 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam-site/pull/190

Simplify merge process section

R: @davorbonaci 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam-site simplify-merge-process

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam-site/pull/190.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #190


commit cd5a9b4c832cb79c2536f2cd538b5b12f81cc1b6
Author: Zur, Aviem 
Date:   2017-03-19T04:16:49Z

Simplify merge process section




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[3/4] beam-site git commit: Regenerate website

2017-03-18 Thread aviemzur
Regenerate website


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/c6d92268
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/c6d92268
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/c6d92268

Branch: refs/heads/asf-site
Commit: c6d922685e79463fa7f41a8f9dcd86aee59923c0
Parents: b470eb5
Author: Zur, Aviem 
Authored: Sun Mar 19 05:55:02 2017 +0200
Committer: Zur, Aviem 
Committed: Sun Mar 19 05:55:02 2017 +0200

--
 content/contribute/contribution-guide/index.html | 17 +
 1 file changed, 13 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam-site/blob/c6d92268/content/contribute/contribution-guide/index.html
--
diff --git a/content/contribute/contribution-guide/index.html 
b/content/contribute/contribution-guide/index.html
index 4d4a782..af492a9 100644
--- a/content/contribute/contribution-guide/index.html
+++ b/content/contribute/contribution-guide/index.html
@@ -214,7 +214,7 @@
   Website
   One-time Setup
   Working on your change
-  Committing website changes
+  Committing website 
changes (committers only)
 
   
 
@@ -736,7 +736,7 @@ $ git checkout -b  origin/asf-site
 
 While you are working on your pull request, you can test and develop live 
by running the following command in the root folder of the website:
 
-$ bundle exec 
jekyll serve
+$ bundle exec 
jekyll serve --incremental
 
 
 
@@ -758,11 +758,20 @@ $ git checkout -b  origin/asf-site
 
 During review, committers will patch in your PR, generate the static content/, and review the changes.
 
-Committing website changes
+Committing website changes 
(committers only)
 
 Follow the same committer process as above, but using repository apache/beam-site and branch asf-site.
 
-In addition, the committer is responsible for doing the final jekyll build to generate the static content, 
so follow the instructions above to install jekyll.
+In addition, the committer is responsible for doing the final bundle exec jekyll build to generate the 
static content, so follow the instructions above to install jekyll.
+
+This command generates the content/ 
directory. The committer should add and commit the content related to the 
PR.
+
+$ git add 
content/
+$ git commit -m "Regenerate website"
+
+
+
+Finally you should merge the changes into the asf-site branch and push them into the apache repository.
 
   
 



[1/4] beam-site git commit: Improve committing website changes section in the contribution guide

2017-03-18 Thread aviemzur
Repository: beam-site
Updated Branches:
  refs/heads/asf-site f9b4c6e7a -> 41018ed60


Improve committing website changes section in the contribution guide


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/653218f8
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/653218f8
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/653218f8

Branch: refs/heads/asf-site
Commit: 653218f89c0a6b50283bf502538f8b131aa54ea1
Parents: f9b4c6e
Author: Ismaël Mejía 
Authored: Sun Mar 19 01:06:26 2017 +0100
Committer: Ismaël Mejía 
Committed: Sun Mar 19 01:08:50 2017 +0100

--
 src/contribute/contribution-guide.md | 13 ++---
 1 file changed, 10 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam-site/blob/653218f8/src/contribute/contribution-guide.md
--
diff --git a/src/contribute/contribution-guide.md 
b/src/contribute/contribution-guide.md
index b2af352..c1617af 100644
--- a/src/contribute/contribution-guide.md
+++ b/src/contribute/contribution-guide.md
@@ -442,7 +442,7 @@ The general guidelines for cloning a repository can be 
adjusted to use the `asf-
 
 While you are working on your pull request, you can test and develop live by 
running the following command in the root folder of the website:
 
-   $ bundle exec jekyll serve
+   $ bundle exec jekyll serve --incremental
 
 Jekyll will start a webserver on port 4000. As you make changes to the 
content, Jekyll will rebuild it automatically.
 
@@ -458,8 +458,15 @@ When you are ready, submit a pull request using the [Beam 
Site GitHub mirror](ht
 
 During review, committers will patch in your PR, generate the static 
`content/`, and review the changes.
 
- Committing website changes
+ Committing website changes (committers only)
 
 Follow the same committer process as above, but using repository 
`apache/beam-site` and branch `asf-site`.
 
-In addition, the committer is responsible for doing the final `jekyll build` 
to generate the static content, so follow the instructions above to install 
`jekyll`.
+In addition, the committer is responsible for doing the final `bundle exec 
jekyll build` to generate the static content, so follow the instructions above 
to install `jekyll`.
+
+This command generates the `content/` directory. The committer should add and 
commit the content related to the PR.
+
+   $ git add content/
+   $ git commit -m "Regenerate website"
+
+Finally you should merge the changes into the `asf-site` branch and push them 
into the `apache` repository.



[2/4] beam-site git commit: Fix timezone to America/Los_Angeles

2017-03-18 Thread aviemzur
Fix timezone to America/Los_Angeles


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/b470eb5b
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/b470eb5b
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/b470eb5b

Branch: refs/heads/asf-site
Commit: b470eb5b418949f19e862a5faeac06e0a4dc8bfb
Parents: 653218f
Author: Ismaël Mejía 
Authored: Sun Mar 19 01:09:00 2017 +0100
Committer: Ismaël Mejía 
Committed: Sun Mar 19 01:12:03 2017 +0100

--
 _config.yml | 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam-site/blob/b470eb5b/_config.yml
--
diff --git a/_config.yml b/_config.yml
index 7d36162..2e9084c 100644
--- a/_config.yml
+++ b/_config.yml
@@ -52,3 +52,6 @@ kramdown:
 release_latest: 0.6.0
 
 # Plugins are configured in the Gemfile.
+
+# Set the time zone for site generation, fixed to US Pacific Time
+timezone: America/Los_Angeles



[4/4] beam-site git commit: This closes #189

2017-03-18 Thread aviemzur
This closes #189


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/41018ed6
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/41018ed6
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/41018ed6

Branch: refs/heads/asf-site
Commit: 41018ed6016c18fc0dac1a2740e510a9a2460bca
Parents: f9b4c6e c6d9226
Author: Zur, Aviem 
Authored: Sun Mar 19 05:58:40 2017 +0200
Committer: Zur, Aviem 
Committed: Sun Mar 19 05:58:40 2017 +0200

--
 _config.yml  |  3 +++
 content/contribute/contribution-guide/index.html | 17 +
 src/contribute/contribution-guide.md | 13 ++---
 3 files changed, 26 insertions(+), 7 deletions(-)
--




[GitHub] beam-site pull request #187: Add aviemzur as committer

2017-03-17 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam-site/pull/187

Add aviemzur as committer



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam-site aviemzur-committer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam-site/pull/187.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #187


commit 41b8245b94d5b02512c94b53a49cb7910edfbc1f
Author: Zur, Aviem 
Date:   2017-03-18T04:09:19Z

Add aviemzur as committer




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2263: [BEAM-1726] TestFlinkRunner should assert PAssert s...

2017-03-17 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2263

[BEAM-1726] TestFlinkRunner should assert PAssert success/failure counters

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam flink-test-runner-aggregators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2263.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2263


commit 354bbcc2d87164cd5bb94b7c71d50df1d4c17c93
Author: Aviem Zur 
Date:   2017-03-17T11:36:55Z

[BEAM-1726] TestFlinkRunner should assert PAssert success/failure counters




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2261: Remove duplicate build-helper-maven-plugin

2017-03-16 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2261

Remove duplicate build-helper-maven-plugin

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam 
duplicate-helper-plugin-definition

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2261.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2261


commit eb964e6fe973a2b2dc627f33598e0f9867c889e6
Author: Aviem Zur 
Date:   2017-03-16T21:46:23Z

Remove duplicate build-helper-maven-plugin




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2259: Fix shading opt out in io/google-cloud-platform

2017-03-16 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2259

Fix shading opt out in io/google-cloud-platform

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam fix-google-io-shading-opt-out

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2259.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2259


commit dd1b001728375aeecfc577de7ea13761aa37322c
Author: Aviem Zur 
Date:   2017-03-16T08:14:36Z

Fix shading opt out in io/google-cloud-platform




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2249: Increment shade-plugin version back to 3.0.0

2017-03-15 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2249

Increment shade-plugin version back to 3.0.0

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam increment-shade-plugin-version

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2249.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2249


commit d5e44e3517d302408fb064303f09d9668252afaa
Author: Aviem Zur 
Date:   2017-03-15T04:29:09Z

Increment shade-plugin version back to 3.0.0




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam-site pull request #178: Add IntelliJ files to .gitignore

2017-03-14 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam-site/pull/178

Add IntelliJ files to .gitignore



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam-site 
add-intellij-files-to-gitignore

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam-site/pull/178.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #178


commit 1d2e719ee777dce2c88eaf1893222bf935722662
Author: Zur, Aviem 
Date:   2017-03-14T08:07:25Z

Add IntelliJ files to .gitignore




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam-site pull request #177: [BEAM-1652] Code style instructions in contribu...

2017-03-13 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam-site/pull/177

[BEAM-1652] Code style instructions in contribution guide



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam-site code-style-insturctions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam-site/pull/177.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #177


commit 2b4f3b239adc2cb272ce2ee7fae1b103b7ecfaaf
Author: Zur, Aviem 
Date:   2017-03-14T05:43:29Z

[BEAM-1652] Code style instructions for IntelliJ in contribution guide.

commit 82628b4d77b87295ed4c72444211e77573608046
Author: Zur, Aviem 
Date:   2017-03-14T06:25:34Z

[BEAM-1652] Code style instructions for Eclipse in contribution guide.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2234: [BEAM-1704] Added Create.TimestampedValues.withType

2017-03-13 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2234

[BEAM-1704] Added Create.TimestampedValues.withType

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam timestampedvalues-with-type

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2234.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2234






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2195: [BEAM-1651] Add IntelliJ code style xml to the proj...

2017-03-08 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2195

[BEAM-1651] Add IntelliJ code style xml to the project repository

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam intellij_codestyle

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2195.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2195


commit 18ee561dde7b2f77095592a07c1ebcc6c4fd2d56
Author: Aviem Zur 
Date:   2017-03-08T11:59:03Z

[BEAM-1651] Add IntelliJ code style xml to the project repository




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2180: [BEAM-1636] UnboundedDataset action() does not mate...

2017-03-07 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2180

[BEAM-1636] UnboundedDataset action() does not materialize RDD

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam 
unbounded-dataset-materialize-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2180.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2180


commit a889597e748eb752141af8dc568c56449c4eba5c
Author: Aviem Zur 
Date:   2017-03-07T13:07:03Z

[BEAM-1636] UnboundedDataset action() does not materialize RDD




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2171: [BEAM-1629] Make metrics/aggregators accumulators a...

2017-03-06 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2171

[BEAM-1629] Make metrics/aggregators accumulators available on driver

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam 
fix-accumulator-singleton-instantiation-race

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2171.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2171


commit 5c4cb5ee153fd7ca069d90d3ac558bceaf82678c
Author: Aviem Zur 
Date:   2017-03-06T18:48:48Z

[BEAM-1629] Make metrics/aggregators accumulators available on driver




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2162: [BEAM-1397] Introduce IO metrics

2017-03-05 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2162

[BEAM-1397] Introduce IO metrics

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam introduce-io-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2162.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2162


commit 62c699d84d37ddbfbd40fe4e970890b8cdb9f0a9
Author: Aviem Zur 
Date:   2017-03-05T19:37:05Z

[BEAM-1397] Introduce IO metrics




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2082: [BEAM-1397] [BEAM-1398] Introduce IO metrics. Add K...

2017-03-05 Thread aviemzur
Github user aviemzur closed the pull request at:

https://github.com/apache/beam/pull/2082


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2082: [BEAM-1397] [BEAM-1398] Introduce IO metrics. Add K...

2017-03-05 Thread aviemzur
Github user aviemzur closed the pull request at:

https://github.com/apache/beam/pull/2082


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2082: [BEAM-1397] [BEAM-1398] Introduce IO metrics. Add K...

2017-03-05 Thread aviemzur
GitHub user aviemzur reopened a pull request:

https://github.com/apache/beam/pull/2082

[BEAM-1397] [BEAM-1398] Introduce IO metrics. Add KafkaIO metrics.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [x] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam io-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2082.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2082


commit 31d94d4eecf492357c41424f65e1037fc3976d09
Author: Aviem Zur 
Date:   2017-02-22T14:18:13Z

[BEAM-1397] Introduce IO metrics

commit 8192724e65e65092117fee0a78408b476adf0245
Author: Aviem Zur 
Date:   2017-02-22T21:26:45Z

[BEAM-1398] KafkaIO metrics

commit 62d0ac450ff4631ddfd057a5caa785dae305065b
Author: Aviem Zur 
Date:   2017-02-23T04:56:43Z

Test Spark runner streaming IO metrics




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2161: [BEAM-1625] BoundedDataset action() does not materi...

2017-03-05 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2161

[BEAM-1625] BoundedDataset action() does not materialize RDD

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam spark-materialize-bug

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2161.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2161


commit f0cb2b77545680b44a6e424cea7f9738dafddbb2
Author: Aviem Zur 
Date:   2017-03-05T14:01:44Z

[BEAM-1625] BoundedDataset action() does not materialize RDD




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #2160: [BEAM-1623] Transform Reshuffle directly in Spark r...

2017-03-04 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/2160

[BEAM-1623] Transform Reshuffle directly in Spark runner

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam translate-reshuffle

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2160.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2160


commit 6bc0241bb8d87f25f40ebef2682aab31b04a9fe9
Author: Aviem Zur 
Date:   2017-03-05T05:15:32Z

[BEAM-1623] Transform Reshuffle directly in Spark runner




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam-site pull request #170: Fix possible erranous naming conventions refere...

2017-03-04 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam-site/pull/170

Fix possible erranous naming conventions reference



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/beam-site naming-conventions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam-site/pull/170.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #170


commit 110d4508c34654353936c05dc46c612c68428c3d
Author: Zur, Aviem 
Date:   2017-03-04T12:01:45Z

Fix possible erranous naming conventions reference




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


  1   2   >