[GitHub] incubator-beam pull request #1654: [BEAM-1177] Input DStream "bundles" shoul...

2016-12-18 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1654

[BEAM-1177] Input DStream "bundles" should be in serialized form and include relevant metadata.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam read-unbounded-bytes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1654.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1654


commit 975dec257364d68b5ada3bced7f139e88853722a
Author: Sela <ans...@paypal.com>
Date:   2016-12-18T12:36:53Z

SparkUnboundedSource mapWithStateDStream input data should be in serialized form for shuffle and checkpointing.
Emit read count and watermark per micro-batch.
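As a hedged illustration only (the class and method names below are invented, not the runner's actual API), the metadata this commit carries per micro-batch amounts to a record count plus a high-watermark candidate derived from the timestamps read:

```java
import java.time.Instant;
import java.util.List;

public class MicrobatchStats {
  // Toy analogue of the per-micro-batch metadata described above:
  // number of records read, and the max timestamp seen (a watermark candidate).
  public final long count;
  public final Instant highWatermark;

  MicrobatchStats(long count, Instant highWatermark) {
    this.count = count;
    this.highWatermark = highWatermark;
  }

  static MicrobatchStats of(List<Instant> timestamps) {
    Instant max = Instant.EPOCH;
    for (Instant t : timestamps) {
      if (t.isAfter(max)) {
        max = t;
      }
    }
    return new MicrobatchStats(timestamps.size(), max);
  }
}
```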

commit 53bd915b8ccacf18b71da16a0a434013ef41
Author: Sela <ans...@paypal.com>
Date:   2016-12-18T13:16:23Z

Report the input global watermark for batch to the UI.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1500: [BEAM-1074] Set default-partitioner in So...

2016-12-18 Thread amitsela
Github user amitsela closed the pull request at:

https://github.com/apache/incubator-beam/pull/1500




[1/2] incubator-beam git commit: [BEAM-362] Port runners to runners-core AggregatorFactory

2016-12-16 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master d624d3b6b -> 5ebbd500c


[BEAM-362] Port runners to runners-core AggregatorFactory


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55f04955
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55f04955
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55f04955

Branch: refs/heads/master
Commit: 55f0495583312c9c0dea620d6a4e85193e97f255
Parents: d624d3b
Author: Kenneth Knowles 
Authored: Thu Dec 15 21:06:14 2016 -0800
Committer: Sela 
Committed: Fri Dec 16 11:46:18 2016 +0200

--
 .../runners/apex/translation/operators/ApexParDoOperator.java   | 2 +-
 .../src/main/java/org/apache/beam/runners/core/DoFnRunners.java | 1 -
 .../java/org/apache/beam/runners/core/SimpleDoFnRunner.java | 1 -
 .../java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java  | 1 -
 .../org/apache/beam/runners/direct/AggregatorContainer.java | 2 +-
 .../flink/translation/wrappers/streaming/DoFnOperator.java  | 3 ++-
 .../apache/beam/runners/spark/aggregators/SparkAggregators.java | 5 +++--
 7 files changed, 7 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 1e76949..4538fb5 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
+import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
@@ -46,7 +47,6 @@ import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index da16573..0e4bf75 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -22,7 +22,6 @@ import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 041cdde..d504b40 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -30,7 +30,6 @@ import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import 

[2/2] incubator-beam git commit: This closes #1644

2016-12-16 Thread amitsela
This closes #1644


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5ebbd500
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5ebbd500
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5ebbd500

Branch: refs/heads/master
Commit: 5ebbd500c1548a6a53a5fbce8b0b3dd67d735d1e
Parents: d624d3b 55f0495
Author: Sela 
Authored: Fri Dec 16 11:47:06 2016 +0200
Committer: Sela 
Committed: Fri Dec 16 11:47:06 2016 +0200

--
 .../runners/apex/translation/operators/ApexParDoOperator.java   | 2 +-
 .../src/main/java/org/apache/beam/runners/core/DoFnRunners.java | 1 -
 .../java/org/apache/beam/runners/core/SimpleDoFnRunner.java | 1 -
 .../java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java  | 1 -
 .../org/apache/beam/runners/direct/AggregatorContainer.java | 2 +-
 .../flink/translation/wrappers/streaming/DoFnOperator.java  | 3 ++-
 .../apache/beam/runners/spark/aggregators/SparkAggregators.java | 5 +++--
 7 files changed, 7 insertions(+), 8 deletions(-)
--




[1/2] incubator-beam git commit: [BEAM-932] Enable findbugs validation (and fix existing issues)

2016-12-15 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 1ad638e51 -> 4323247a3


[BEAM-932] Enable findbugs validation (and fix existing issues)


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bba3700a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bba3700a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bba3700a

Branch: refs/heads/master
Commit: bba3700aebd9dcaf88402b6845008cf7e5950cfe
Parents: 1ad638e
Author: Ismaël Mejía 
Authored: Thu Dec 15 14:34:20 2016 +0100
Committer: Ismaël Mejía 
Committed: Thu Dec 15 14:34:20 2016 +0100

--
 runners/spark/pom.xml   | 10 
 .../metrics/WithNamedAggregatorsSupport.java|  4 +-
 .../spark/translation/WindowingHelpers.java |  3 +-
 .../src/main/resources/beam/findbugs-filter.xml | 49 +---
 4 files changed, 47 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bba3700a/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index e8fffa2..5a2fe87 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -146,7 +146,6 @@
 
   com.google.code.findbugs
   jsr305
-  1.3.9
 
 
   com.google.guava
@@ -317,15 +316,6 @@
   
 
   
-
-
-  org.codehaus.mojo
-  findbugs-maven-plugin
-  
-true
-  
-
-
 
   org.apache.maven.plugins
   maven-surefire-plugin

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bba3700a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
index 6932ae6..5e71280 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
@@ -121,8 +121,8 @@ public class WithNamedAggregatorsSupport extends MetricRegistry {
 final String parentName = entry.getKey();
 final Map gaugeMap = Maps.transformEntries(agg.renderAll(), toGauge());
 final Map fullNameGaugeMap = Maps.newLinkedHashMap();
-for (String shortName : gaugeMap.keySet()) {
-  fullNameGaugeMap.put(parentName + "." + shortName, gaugeMap.get(shortName));
+for (Map.Entry gaugeEntry : gaugeMap.entrySet()) {
+  fullNameGaugeMap.put(parentName + "." + gaugeEntry.getKey(), gaugeEntry.getValue());
 }
 return Maps.filterValues(fullNameGaugeMap, Predicates.notNull());
   }
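The hunk above is one of the FindBugs-driven fixes: iterating a map's keySet() and calling get() per key does two lookups where entrySet() needs one. A minimal stand-alone sketch of the same pattern (the names here are illustrative, not the runner's classes):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PrefixKeys {
  // Prefix each key with a parent name. Iterating entrySet() reads the
  // key and value together, avoiding the extra get() that keySet() incurs.
  static Map<String, Integer> withParent(String parent, Map<String, Integer> in) {
    Map<String, Integer> out = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> e : in.entrySet()) {
      out.put(parent + "." + e.getKey(), e.getValue());
    }
    return out;
  }
}
```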

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bba3700a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
index ec94f3e..0acff71 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.spark.translation;
 
+import javax.annotation.Nonnull;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.api.java.function.Function;
 
@@ -84,7 +85,7 @@ public final class WindowingHelpers {
   public static <T> com.google.common.base.Function<WindowedValue<T>, T> unwindowValueFunction() {
 return new com.google.common.base.Function<WindowedValue<T>, T>() {
   @Override
-  public T apply(WindowedValue<T> t) {
+  public T apply(@Nonnull WindowedValue<T> t) {
 return t.getValue();
   }
 };

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bba3700a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
--
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml 
b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index a696aeb..bfb4988 100644
--- 

[2/2] incubator-beam git commit: This closes #1463

2016-12-15 Thread amitsela
This closes #1463


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4323247a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4323247a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4323247a

Branch: refs/heads/master
Commit: 4323247a3a8ea6fc06b99f66f0001f6956b494c9
Parents: 1ad638e bba3700
Author: Sela 
Authored: Thu Dec 15 16:16:57 2016 +0200
Committer: Sela 
Committed: Thu Dec 15 16:16:57 2016 +0200

--
 runners/spark/pom.xml   | 10 
 .../metrics/WithNamedAggregatorsSupport.java|  4 +-
 .../spark/translation/WindowingHelpers.java |  3 +-
 .../src/main/resources/beam/findbugs-filter.xml | 49 +---
 4 files changed, 47 insertions(+), 19 deletions(-)
--




[GitHub] incubator-beam pull request #1614: [BEAM-853] Force streaming execution on b...

2016-12-14 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1614

[BEAM-853] Force streaming execution on batch pipelines for testing.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-853

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1614.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1614


commit 7b7fed35cc3119ba11cca076e60e8cb1d13836a1
Author: Sela <ans...@paypal.com>
Date:   2016-12-14T10:20:08Z

Expose the adapted source.

commit e729db1a84c5c554a61b7a0a2db89fb0cf45f191
Author: Sela <ans...@paypal.com>
Date:   2016-12-14T10:20:42Z

Force streaming execution, if set in PipelineOptions.

commit e2f9419810a581e82b703d90b00db71e040f806a
Author: Sela <ans...@paypal.com>
Date:   2016-12-14T14:28:24Z

Added test.






[GitHub] incubator-beam pull request #1579: [BEAM-1130, BEAM-1133] Allow users to def...

2016-12-12 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1579

[BEAM-1130, BEAM-1133] Allow users to define max records per batch, update tests accordingly.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam 
readunbounded-minread-in-microbatch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1579.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1579


commit d5382819a3c9c3975ab5aac81cde01d9ba79da93
Author: Sela <ans...@paypal.com>
Date:   2016-12-12T10:37:32Z

[BEAM-1133] Add maxNumRecords per micro-batch for Spark runner options.

commit 84fc4da6484baf023015b4745dc9be12ec4d35ab
Author: Sela <ans...@paypal.com>
Date:   2016-12-12T10:41:34Z

[BEAM-1130] SparkRunner ResumeFromCheckpointStreamingTest Failing.






[GitHub] incubator-beam pull request #1578: [BEAM-757, BEAM-807]

2016-12-12 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1578

[BEAM-757, BEAM-807]

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam new-do-fn

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1578.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1578


commit 5748fe9cfd9c36d92b2870acde5e071ea696ac78
Author: Sela <ans...@paypal.com>
Date:   2016-12-11T12:30:24Z

Use DoFnRunner in the implementation of FlatMapFunction.
Migrate to new DoFn.

commit 7f73c991426e0602d318a47cd9a38a3eebd979bf
Author: Sela <ans...@paypal.com>
Date:   2016-12-11T12:31:59Z

Implement AggregatorFactory for Spark runner, to be used by DoFnRunner.

commit b2b0d463b0cf2105248d02411a190f6406f26c69
Author: Sela <ans...@paypal.com>
Date:   2016-12-11T12:32:49Z

Migrate to new DoFn.

commit 00219617355e1d77d0bba6c272c0a2b3595eac61
Author: Sela <ans...@paypal.com>
Date:   2016-12-11T19:09:47Z

Add a custom AssignWindows implementation.

commit cd574d62825d636bb30ab0fd13172d2f8bb5cbb7
Author: Sela <ans...@paypal.com>
Date:   2016-12-12T09:33:58Z

Setup and teardown DoFn.

commit 9355b1ef4943b29e7d26735484aad2e63bb1d1eb
Author: Sela <ans...@paypal.com>
Date:   2016-12-12T09:34:17Z

Add implementation for GroupAlsoByWindow via flatMap.






[1/2] incubator-beam git commit: [BEAM-921] spark-runner: register sources and coders to serialize with java serializer

2016-12-11 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master e841b1a21 -> bf8a3cb3a


[BEAM-921] spark-runner: register sources and coders to serialize with java serializer


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aba40e2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aba40e2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aba40e2d

Branch: refs/heads/master
Commit: aba40e2de9ba058f33086eb6a913fa583a82b058
Parents: e841b1a
Author: Aviem Zur 
Authored: Thu Dec 8 15:07:06 2016 +0200
Committer: Sela 
Committed: Sun Dec 11 15:18:51 2016 +0200

--
 runners/spark/pom.xml   | 35 +---
 .../coders/BeamSparkRunnerRegistrator.java  | 60 +++-
 .../coders/BeamSparkRunnerRegistratorTest.java  | 57 +++
 3 files changed, 118 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aba40e2d/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index d1ef225..86e9039 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -54,7 +54,7 @@
 
 
 
-  
   local-runnable-on-service-tests
   false
@@ -134,28 +134,14 @@
   ${hadoop.version}
   provided
 
+
 
   com.esotericsoftware.kryo
   kryo
-  2.21
-  provided
-
-
-  de.javakaffee
-  kryo-serializers
-  0.39
-  
-
-
-  com.esotericsoftware
-  kryo
-
-
-
-  com.google.protobuf
-  protobuf-java
-
-  
+  2.21.1
 
 
   com.google.code.findbugs
@@ -264,6 +250,11 @@
   metrics-core
   ${dropwizard.metrics.version}
 
+
+  org.reflections
+  reflections
+  0.9.10
+
 
 
 
@@ -405,6 +396,10 @@
 com.google.thirdparty
 
org.apache.beam.spark.relocated.com.google.thirdparty
   
+  
+com.esotericsoftware.kryo
+
org.apache.beam.spark.relocated.com.esotericsoftware.kryo
+  
 
 true
 spark-app

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aba40e2d/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
index 0e62781..41b0a01 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
@@ -19,28 +19,60 @@
 package org.apache.beam.runners.spark.coders;
 
 import com.esotericsoftware.kryo.Kryo;
-import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
-import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
-import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
-import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer;
-import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
-import de.javakaffee.kryoserializers.guava.ReverseListSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.Source;
 import org.apache.spark.serializer.KryoRegistrator;
+import org.reflections.Reflections;
 
 
 /**
- * Custom {@link com.esotericsoftware.kryo.Serializer}s for Beam's Spark 
runner needs.
+ * Custom {@link KryoRegistrator}s for Beam's Spark runner needs.
  */
 public class BeamSparkRunnerRegistrator implements KryoRegistrator {
 
   @Override
   public void registerClasses(Kryo kryo) {
-UnmodifiableCollectionsSerializer.registerSerializers(kryo);
-// Guava
-ImmutableListSerializer.registerSerializers(kryo);
-ImmutableSetSerializer.registerSerializers(kryo);
-ImmutableMapSerializer.registerSerializers(kryo);
-ImmutableMultimapSerializer.registerSerializers(kryo);
-ReverseListSerializer.registerSerializers(kryo);
+for (Class clazz : ClassesForJavaSerialization.getClasses()) {
+  kryo.register(clazz, new JavaSerializer());
+
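Kryo's JavaSerializer delegates to standard java.io serialization for the registered classes; the round-trip it performs looks like this stdlib-only sketch (no Kryo dependency, and the class and method names here are invented for illustration):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class JavaSerializationRoundTrip {
  // Serialize with plain java.io, as Kryo's JavaSerializer does under the hood.
  static byte[] toBytes(Serializable o) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(o);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return bos.toByteArray(); // stream is closed (and flushed) at this point
  }

  // Deserialize the bytes back into an object.
  static Object fromBytes(byte[] bytes) {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return ois.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }
}
```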

[2/2] incubator-beam git commit: This closes #1552

2016-12-11 Thread amitsela
This closes #1552


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf8a3cb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf8a3cb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf8a3cb3

Branch: refs/heads/master
Commit: bf8a3cb3a5948bd1ef7f7e5cef230ecd4e8f1c84
Parents: e841b1a aba40e2
Author: Sela 
Authored: Sun Dec 11 15:19:24 2016 +0200
Committer: Sela 
Committed: Sun Dec 11 15:19:24 2016 +0200

--
 runners/spark/pom.xml   | 35 +---
 .../coders/BeamSparkRunnerRegistrator.java  | 60 +++-
 .../coders/BeamSparkRunnerRegistratorTest.java  | 57 +++
 3 files changed, 118 insertions(+), 34 deletions(-)
--




[GitHub] incubator-beam-site pull request #106: [BEAM-900] Spark quickstart instructi...

2016-12-10 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam-site/pull/106

[BEAM-900] Spark quickstart instructions.

R: @davorbonaci 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam-site BEAM-900

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam-site/pull/106.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #106


commit c10d2c52084dd01f03e6df86e4d2ab47ccab4e36
Author: Sela <ans...@paypal.com>
Date:   2016-12-10T20:43:24Z

fixup! wrong generated directory name.

commit 8dcc50e023f9070a60de72afc80db42bbff0b654
Author: Sela <ans...@paypal.com>
Date:   2016-12-10T20:43:49Z

[BEAM-900] Spark quickstart instructions.






[2/2] incubator-beam git commit: This closes #1561

2016-12-09 Thread amitsela
This closes #1561


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9a63117
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9a63117
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9a63117

Branch: refs/heads/master
Commit: d9a6311734064b1c7171b943eeb511c4d648187a
Parents: 63d197c e48b0e6
Author: Sela 
Authored: Fri Dec 9 18:01:12 2016 +0200
Committer: Sela 
Committed: Fri Dec 9 18:01:12 2016 +0200

--
 .../game/utils/WriteWindowedToBigQuery.java|  5 +
 .../runners/dataflow/internal/AssignWindows.java   |  8 
 .../dataflow/DataflowPipelineTranslatorTest.java   | 17 +
 3 files changed, 14 insertions(+), 16 deletions(-)
--




[1/2] incubator-beam git commit: Remove misc uses of OldDoFn

2016-12-09 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 63d197cd0 -> d9a631173


Remove misc uses of OldDoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e48b0e6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e48b0e6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e48b0e6b

Branch: refs/heads/master
Commit: e48b0e6bc20d8eba2968decf7ac2b4ee7503a4df
Parents: 63d197c
Author: Kenneth Knowles 
Authored: Thu Dec 8 23:33:40 2016 -0800
Committer: Sela 
Committed: Fri Dec 9 18:00:39 2016 +0200

--
 .../game/utils/WriteWindowedToBigQuery.java|  5 +
 .../runners/dataflow/internal/AssignWindows.java   |  8 
 .../dataflow/DataflowPipelineTranslatorTest.java   | 17 +
 3 files changed, 14 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e48b0e6b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
--
diff --git 
a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
 
b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index c32289f..7a4fb2c 100644
--- 
a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ 
b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -23,7 +23,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
@@ -43,9 +42,7 @@ public class WriteWindowedToBigQuery
   }
 
   /** Convert each key/score pair into a BigQuery TableRow. */
-  protected class BuildRowFn extends DoFn
-  implements RequiresWindowAccess {
-
+  protected class BuildRowFn extends DoFn {
 @ProcessElement
 public void processElement(ProcessContext c, BoundedWindow window) {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e48b0e6b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
--
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
index 68ee7bc..27fe13d 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.dataflow.internal;
 
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -63,9 +63,9 @@ public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>>
 } else {
   // If the windowFn didn't change, we just run a pass-through transform and then set the
   // new windowing strategy.
-  return input.apply("Identity", ParDo.of(new OldDoFn<T, T>() {
-@Override
-public void processElement(OldDoFn.ProcessContext c) throws Exception {
+  return input.apply("Identity", ParDo.of(new DoFn<T, T>() {
+@ProcessElement
+public void processElement(ProcessContext c) throws Exception {
   c.output(c.element());
 }
   })).setWindowingStrategyInternal(outputStrategy);
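The migration above swaps OldDoFn's overridden processElement for the new DoFn's @ProcessElement annotation, which the framework discovers reflectively. A toy, stdlib-only sketch of that dispatch idea (all names here are invented, not Beam's actual machinery):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class ToyDoFn {
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface ProcessElement {}

  // A "fn" declares its processing method by annotation, not by overriding.
  static class Identity {
    @ProcessElement
    public String process(String element) {
      return element;
    }
  }

  // Locate and invoke the @ProcessElement method, as an invoker would.
  static String invoke(Object fn, String element) {
    for (Method m : fn.getClass().getMethods()) {
      if (m.isAnnotationPresent(ProcessElement.class)) {
        try {
          return (String) m.invoke(fn, element);
        } catch (ReflectiveOperationException e) {
          throw new RuntimeException(e);
        }
      }
    }
    throw new IllegalStateException("no @ProcessElement method found");
  }
}
```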

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e48b0e6b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
--
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index ac4f2df..8d0b83a 100644

[GitHub] incubator-beam pull request #1553: [BEAM-1111] Reject timers for ParDo in Sp...

2016-12-08 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1553

[BEAM-] Reject timers for ParDo in SparkRunner streaming evaluators.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1553.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1553


commit 0c948c5dce65b9651c576f34f17922184816a703
Author: Sela <ans...@paypal.com>
Date:   2016-12-08T18:29:35Z

[BEAM-] Reject timers for ParDo in SparkRunner streaming evaluators.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam-site pull request #103: [BEAM-507] Fill in the documentation/...

2016-12-08 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam-site/pull/103

[BEAM-507] Fill in the documentation/runners/spark portion of the web…

…site.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam-site BEAM-507

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam-site/pull/103.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #103


commit 5e5caf73c848797eb7e5bc757d1f64cd8c517de8
Author: Sela <ans...@paypal.com>
Date:   2016-12-08T13:34:54Z

[BEAM-507] Fill in the documentation/runners/spark portion of the website.




---
---


[GitHub] incubator-beam pull request #1532: [BEAM-329] Spark runner README should hav...

2016-12-07 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1532

[BEAM-329] Spark runner README should have a proper batch example.


---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-329

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1532.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1532


commit c50fe89a071f59740b6f5bd90e1984ca3159162f
Author: Sela <ans...@paypal.com>
Date:   2016-12-07T09:20:07Z

[BEAM-329] Spark runner README should have a proper batch example.




---
---


[2/2] incubator-beam git commit: This closes #1531

2016-12-07 Thread amitsela
This closes #1531


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9ccf6dbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9ccf6dbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9ccf6dbe

Branch: refs/heads/master
Commit: 9ccf6dbea0d3807fef6a7c0432906fffd2b8ec3f
Parents: b41a46e baf5e6b
Author: Sela 
Authored: Wed Dec 7 10:31:38 2016 +0200
Committer: Sela 
Committed: Wed Dec 7 10:31:38 2016 +0200

--
 runners/spark/pom.xml | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--




[1/2] incubator-beam git commit: [BEAM-1094] Set test scope for Kafka IO and junit

2016-12-07 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master b41a46e86 -> 9ccf6dbea


[BEAM-1094] Set test scope for Kafka IO and junit


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/baf5e6bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/baf5e6bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/baf5e6bd

Branch: refs/heads/master
Commit: baf5e6bd9b1011f4c5c3974aa46393471b340c15
Parents: b41a46e
Author: Jean-Baptiste Onofré 
Authored: Wed Dec 7 08:37:33 2016 +0100
Committer: Sela 
Committed: Wed Dec 7 10:30:44 2016 +0200

--
 runners/spark/pom.xml | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/baf5e6bd/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index e34af15..9a3adf6 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -268,18 +268,20 @@
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-kafka</artifactId>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>0.9.0.1</version>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
-      <scope>provided</scope>
+      <scope>test</scope>
       <exclusions>
         <exclusion>
           <artifactId>hamcrest-core</artifactId>



[3/3] incubator-beam git commit: This closes #1466

2016-12-05 Thread amitsela
This closes #1466


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6893a727
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6893a727
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6893a727

Branch: refs/heads/master
Commit: 6893a7270b728ec72c6e8749008e6a583edf5921
Parents: ef9871c 158378f
Author: Sela 
Authored: Mon Dec 5 12:57:25 2016 +0200
Committer: Sela 
Committed: Mon Dec 5 12:57:25 2016 +0200

--
 .../beam/runners/spark/EvaluationResult.java|  67 --
 .../beam/runners/spark/SparkPipelineResult.java | 193 +
 .../apache/beam/runners/spark/SparkRunner.java  | 113 ++
 .../beam/runners/spark/TestSparkRunner.java |  11 +-
 .../spark/aggregators/AccumulatorSingleton.java |   6 +-
 .../spark/aggregators/SparkAggregators.java |  97 +
 .../beam/runners/spark/examples/WordCount.java  |   2 +-
 .../spark/translation/EvaluationContext.java| 131 ++-
 .../spark/translation/SparkContextFactory.java  |   2 +-
 .../spark/translation/SparkRuntimeContext.java  |  62 +-
 .../spark/translation/TransformTranslator.java  |  10 +-
 .../streaming/StreamingTransformTranslator.java |  10 +-
 .../runners/spark/ProvidedSparkContextTest.java |   6 +-
 .../runners/spark/SparkPipelineStateTest.java   | 217 +++
 .../spark/aggregators/ClearAggregatorsRule.java |  37 
 .../metrics/sink/ClearAggregatorsRule.java  |  33 ---
 .../metrics/sink/NamedAggregatorsTest.java  |   3 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |   2 +-
 .../beam/runners/spark/io/NumShardsTest.java|   2 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   2 +-
 .../spark/translation/SideEffectsTest.java  |  59 -
 .../streaming/EmptyStreamAssertionTest.java |   4 +
 .../ResumeFromCheckpointStreamingTest.java  |  15 +-
 .../streaming/utils/PAssertStreaming.java   |   9 +-
 24 files changed, 680 insertions(+), 413 deletions(-)
--




[1/3] incubator-beam git commit: [BEAM-1000, BEAM-1050] Fixed PipelineResult.State Failed for streaming, support non-blocking cancel/waituntilfinish in batch. Added a SparkPipelineResult class to addr

2016-12-05 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master ef9871c36 -> 6893a7270


[BEAM-1000, BEAM-1050] Fixed PipelineResult.State Failed for streaming, support non-blocking
cancel/waitUntilFinish in batch.
Added a SparkPipelineResult class to address PipelineResult#waitUntilFinish()
and SparkRunner#run() semantics.

* Simplified beamExceptionFrom() to abstract away SparkExceptions.
* Reordered methods according to access level.
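The `waitUntilFinish()` semantics described above can be approximated by wrapping the pipeline's execution in a `Future`: a bounded wait returns the current state rather than failing when the timeout elapses. A rough pure-Java sketch (hypothetical names, not the actual `SparkPipelineResult`):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PipelineResultSketch {

  enum State { RUNNING, DONE }

  private final Future<?> pipelineExecution;
  private State state = State.RUNNING;

  PipelineResultSketch(Future<?> pipelineExecution) {
    this.pipelineExecution = pipelineExecution;
  }

  // Block up to `duration` for the pipeline to finish; on timeout, report the
  // current state instead of throwing (an unbounded wait is omitted for brevity).
  State waitUntilFinish(long duration, TimeUnit unit) {
    try {
      pipelineExecution.get(duration, unit);
      state = State.DONE;
    } catch (TimeoutException e) {
      // Still running: non-blocking semantics, fall through with state unchanged.
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return state;
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<?> run = executor.submit(() -> { /* pretend pipeline work */ });
    PipelineResultSketch result = new PipelineResultSketch(run);
    System.out.println(result.waitUntilFinish(1, TimeUnit.SECONDS)); // prints DONE
    executor.shutdown();
  }
}
```

The design choice mirrored here is that cancellation and completion checks become cheap state queries, while the blocking behavior lives entirely in `Future.get`.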


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1a67934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1a67934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1a67934

Branch: refs/heads/master
Commit: b1a67934d1496e221718599301635c38f8e3b7ec
Parents: ef9871c
Author: Stas Levin 
Authored: Mon Nov 28 11:11:10 2016 +0200
Committer: Sela 
Committed: Mon Dec 5 12:56:39 2016 +0200

--
 .../beam/runners/spark/EvaluationResult.java|  67 --
 .../beam/runners/spark/SparkPipelineResult.java | 179 +++
 .../apache/beam/runners/spark/SparkRunner.java  |  98 +
 .../beam/runners/spark/TestSparkRunner.java |  10 +-
 .../beam/runners/spark/examples/WordCount.java  |   2 +-
 .../spark/translation/EvaluationContext.java| 119 ++
 .../spark/translation/SparkContextFactory.java  |   2 +-
 .../runners/spark/ProvidedSparkContextTest.java |   6 +-
 .../runners/spark/SparkPipelineStateTest.java   | 219 +++
 .../metrics/sink/ClearAggregatorsRule.java  |   2 +-
 .../metrics/sink/NamedAggregatorsTest.java  |   2 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |   2 +-
 .../beam/runners/spark/io/NumShardsTest.java|   2 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   2 +-
 .../spark/translation/SideEffectsTest.java  |  59 -
 .../streaming/EmptyStreamAssertionTest.java |   4 +
 .../ResumeFromCheckpointStreamingTest.java  |   8 +-
 .../streaming/utils/PAssertStreaming.java   |   9 +-
 18 files changed, 500 insertions(+), 292 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
deleted file mode 100644
index 52606a3..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
-
-/**
- * Interface for retrieving the result(s) of running a pipeline. Allows us to translate between
- * {@code PObject}s or {@code PCollection}s and Ts or collections of Ts.
- */
-public interface EvaluationResult extends PipelineResult {
-  /**
-   * Retrieves an iterable of results associated with the PCollection passed in.
-   *
-   * @param pcollection Collection we wish to translate.
-   * @param <T> Type of elements contained in collection.
-   * @return Natively typed result associated with collection.
-   */
-  <T> Iterable<T> get(PCollection<T> pcollection);
-
-  /**
-   * Retrieve an object of Type T associated with the PValue passed in.
-   *
-   * @param pval PValue to retrieve associated data for.
-   * @param <T> Type of object to return.
-   * @return Native object.
-   */
-  <T> T get(PValue pval);
-
-  /**
-   * Retrieves the final value of the aggregator.
-   *
-   * @param aggName    name of aggregator.
-   * @param resultType Class of final result of aggregation.
-   * @param <T>        Type of final result of aggregation.
-   * @return Result of aggregation associated with specified name.
-   */
-  <T> T 

[2/3] incubator-beam git commit: Redistributed some responsibilities in order to remove getAggregatorValues() from EvaluationContext.

2016-12-05 Thread amitsela
Redistributed some responsibilities in order to remove getAggregatorValues()
from EvaluationContext.

Inferred expected exception handling according to existing codebase and tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/158378f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/158378f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/158378f0

Branch: refs/heads/master
Commit: 158378f0f682b80462b917002b895ddbf782d06d
Parents: b1a6793
Author: Stas Levin 
Authored: Sat Dec 3 00:47:39 2016 +0200
Committer: Sela 
Committed: Mon Dec 5 12:56:41 2016 +0200

--
 .../beam/runners/spark/SparkPipelineResult.java | 76 ---
 .../apache/beam/runners/spark/SparkRunner.java  | 35 +--
 .../beam/runners/spark/TestSparkRunner.java |  1 +
 .../spark/aggregators/AccumulatorSingleton.java |  6 +-
 .../spark/aggregators/SparkAggregators.java | 97 
 .../spark/translation/EvaluationContext.java| 20 +---
 .../spark/translation/SparkRuntimeContext.java  | 62 +
 .../spark/translation/TransformTranslator.java  | 10 +-
 .../streaming/StreamingTransformTranslator.java | 10 +-
 .../runners/spark/SparkPipelineStateTest.java   | 36 
 .../spark/aggregators/ClearAggregatorsRule.java | 37 
 .../metrics/sink/ClearAggregatorsRule.java  | 33 ---
 .../metrics/sink/NamedAggregatorsTest.java  |  1 +
 .../streaming/EmptyStreamAssertionTest.java |  2 +-
 .../ResumeFromCheckpointStreamingTest.java  |  9 +-
 15 files changed, 247 insertions(+), 188 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/158378f0/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index ec0610c..b1027a6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -23,7 +23,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
@@ -31,7 +31,10 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.joda.time.Duration;
 
 /**
@@ -40,29 +43,37 @@ import org.joda.time.Duration;
 public abstract class SparkPipelineResult implements PipelineResult {
 
   protected final Future pipelineExecution;
-  protected final EvaluationContext context;
+  protected JavaSparkContext javaSparkContext;
 
   protected PipelineResult.State state;
 
   SparkPipelineResult(final Future pipelineExecution,
-                      final EvaluationContext evaluationContext) {
+                      final JavaSparkContext javaSparkContext) {
     this.pipelineExecution = pipelineExecution;
-    this.context = evaluationContext;
+    this.javaSparkContext = javaSparkContext;
     // pipelineExecution is expected to have started executing eagerly.
     state = State.RUNNING;
   }
 
-  private RuntimeException runtimeExceptionFrom(Throwable e) {
+  private RuntimeException runtimeExceptionFrom(final Throwable e) {
     return (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
   }
 
-  private RuntimeException beamExceptionFrom(Throwable e) {
+  private RuntimeException beamExceptionFrom(final Throwable e) {
     // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
     // won't let you catch something that is not declared, so we can't catch
     // SparkException directly, instead we do an instanceof check.
-    return (e instanceof SparkException)
-        ? new Pipeline.PipelineExecutionException(e.getCause() != null ? e.getCause() : e)
-        : runtimeExceptionFrom(e);
+
+    if (e instanceof SparkException) {
+      if (e.getCause() != 

[GitHub] incubator-beam pull request #1500: [BEAM-1074] Set default-partitioner in So...

2016-12-03 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1500

[BEAM-1074] Set default-partitioner in SourceRDD.Unbounded.


---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-1074

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1500.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1500


commit 2cb3c0c2fc6e09e670f21561da5c83083a7308ad
Author: Sela <ans...@paypal.com>
Date:   2016-12-02T17:08:33Z

[BEAM-1074] Set default-partitioner in SourceRDD.Unbounded.

commit dcad79534f4bf3262bb872d12c7315e583836df8
Author: Sela <ans...@paypal.com>
Date:   2016-12-03T16:38:02Z

Make the number of SourceRDD partitions equal to that of the HashPartitioner, which might create empty partitions.
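Hash partitioning assigns a key to a partition by a non-negative modulo of its `hashCode()`, so fixing the partition count to match a `HashPartitioner` can leave some partitions with no elements when distinct keys are few. A small pure-Java illustration of that arithmetic (not Spark's actual `HashPartitioner` class):

```java
import java.util.Arrays;

public class HashPartitionSketch {

  // Mirrors the usual non-negative modulo used by hash partitioners.
  static int partitionFor(Object key, int numPartitions) {
    int mod = key.hashCode() % numPartitions;
    return mod < 0 ? mod + numPartitions : mod;
  }

  public static void main(String[] args) {
    int numPartitions = 4;
    int[] counts = new int[numPartitions];
    // Two keys spread over four partitions: at least two partitions stay empty.
    for (String key : new String[] {"a", "b"}) {
      counts[partitionFor(key, numPartitions)]++;
    }
    System.out.println(Arrays.toString(counts)); // prints [0, 1, 1, 0]
  }
}
```

Empty partitions are harmless for correctness; matching the partitioner's count simply avoids a shuffle when the data is joined or co-partitioned downstream.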




---
---


[2/2] incubator-beam git commit: This closes #1450

2016-11-29 Thread amitsela
This closes #1450


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3a8b9b52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3a8b9b52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3a8b9b52

Branch: refs/heads/master
Commit: 3a8b9b5212972f0128099251884473d06758e2aa
Parents: 3f16f26 a1a4ac0
Author: Sela 
Authored: Tue Nov 29 11:50:51 2016 +0200
Committer: Sela 
Committed: Tue Nov 29 11:50:51 2016 +0200

--
 .../beam/runners/spark/io/MicrobatchSource.java | 20 ++--
 .../beam/runners/spark/io/SourceDStream.java|  3 ++-
 .../spark/stateful/StateSpecFunctions.java  |  2 +-
 3 files changed, 17 insertions(+), 8 deletions(-)
--




[1/2] incubator-beam git commit: [BEAM-1052] Add InputDStream id to MicrobatchSource hashcode.

2016-11-29 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 3f16f2660 -> 3a8b9b521


[BEAM-1052] Add InputDStream id to MicrobatchSource hashcode.

Done to avoid collisions between splits of different sources.
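The collision being avoided is two different sources producing splits with the same index that then hash to the same key. A hedged sketch of mixing a source id into `hashCode()` (hypothetical names, not the actual `MicrobatchSource`):

```java
import java.util.Objects;

public class MicrobatchSplitKeySketch {

  // Two splits with the same index but different sources must not collide
  // when used as map/cache keys, so the source id joins the hash and equality.
  static final class SplitKey {
    final int splitIndex;
    final int sourceId;

    SplitKey(int splitIndex, int sourceId) {
      this.splitIndex = splitIndex;
      this.sourceId = sourceId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof SplitKey)) {
        return false;
      }
      SplitKey other = (SplitKey) o;
      return splitIndex == other.splitIndex && sourceId == other.sourceId;
    }

    @Override
    public int hashCode() {
      return Objects.hash(splitIndex, sourceId); // sourceId disambiguates sources
    }
  }

  public static void main(String[] args) {
    SplitKey a = new SplitKey(0, 1);
    SplitKey b = new SplitKey(0, 2); // same split index, different source
    System.out.println(a.equals(b)); // prints false: no collision between sources
  }
}
```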


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1a4ac0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1a4ac0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1a4ac0f

Branch: refs/heads/master
Commit: a1a4ac0fc0376aa4c43a4357f3acc930e2b53c94
Parents: 3f16f26
Author: Aviem Zur 
Authored: Tue Nov 29 09:51:12 2016 +0200
Committer: Sela 
Committed: Tue Nov 29 11:49:31 2016 +0200

--
 .../beam/runners/spark/io/MicrobatchSource.java | 20 ++--
 .../beam/runners/spark/io/SourceDStream.java|  3 ++-
 .../spark/stateful/StateSpecFunctions.java  |  2 +-
 3 files changed, 17 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1a4ac0f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 4a174aa..5656375 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -54,6 +54,7 @@ public class MicrobatchSource
-      result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i));
+      result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId));
     }
     return result;
   }
@@ -137,8 +140,8 @@ compute(Time validTime) {
-    MicrobatchSource microbatchSource = new MicrobatchSource<>(
-        unboundedSource, boundReadDuration, initialParallelism, rateControlledMaxRecords(), -1);
+

[1/2] incubator-beam git commit: [BEAM-851] Determine if the pipeline must be translated into streaming mode (if not set)

2016-11-27 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 3ad767750 -> 8cc43aa70


[BEAM-851] Determine if the pipeline must be translated into streaming mode (if not set)

Now an Evaluator (visitor) detects if there are Unbounded.Read transforms.
This approach is based on Flink's PipelineTranslationOptimizer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cc96b138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cc96b138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cc96b138

Branch: refs/heads/master
Commit: cc96b1381b6db849adf69daddecf30b9c61acf73
Parents: 3ad7677
Author: Ismaël Mejía 
Authored: Fri Nov 25 14:52:26 2016 +0100
Committer: Ismaël Mejía 
Committed: Sun Nov 27 11:18:12 2016 +0100

--
 .../apache/beam/runners/spark/SparkRunner.java  | 61 +++-
 .../streaming/StreamingTransformTranslator.java |  2 +-
 .../streaming/EmptyStreamAssertionTest.java |  2 +
 .../streaming/FlattenStreamingTest.java |  2 +
 .../streaming/SimpleStreamingWordCountTest.java |  1 +
 .../SparkTestPipelineOptionsForStreaming.java   |  6 --
 6 files changed, 65 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc96b138/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index e800071..49e0113 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
@@ -120,12 +121,12 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
 mOptions = options;
   }
 
-
   @Override
   public EvaluationResult run(Pipeline pipeline) {
 try {
   LOG.info("Executing pipeline using the SparkRunner.");
 
+  detectTranslationMode(pipeline);
   if (mOptions.isStreaming()) {
 SparkRunnerStreamingContextFactory contextFactory =
 new SparkRunnerStreamingContextFactory(pipeline, mOptions);
@@ -136,7 +137,7 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
 jssc.start();
 
         // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance.
-        return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sc(),
+        return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sparkContext(),
             pipeline, jssc) : contextFactory.getCtxt();
   } else {
 JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
@@ -168,6 +169,62 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
   }
 
   /**
+   * Detect the translation mode for the pipeline and change options in case streaming
+   * translation is needed.
+   * @param pipeline
+   */
+  private void detectTranslationMode(Pipeline pipeline) {
+TranslationModeDetector detector = new TranslationModeDetector();
+pipeline.traverseTopologically(detector);
+if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
+  // set streaming mode if it's a streaming pipeline
+  this.mOptions.setStreaming(true);
+}
+  }
+
+  /**
+   * The translation mode of the Beam Pipeline.
+   */
+  enum TranslationMode {
+/** Uses the batch mode. */
+BATCH,
+/** Uses the streaming mode. */
+STREAMING
+  }
+
+  /**
+   * Traverses the Pipeline to determine the {@link TranslationMode} for this 
pipeline.
+   */
+  static class TranslationModeDetector extends Pipeline.PipelineVisitor.Defaults {
+    private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class);
+
+private TranslationMode translationMode;
+
+TranslationModeDetector(TranslationMode defaultMode) {
+  this.translationMode = defaultMode;
+}
+
+TranslationModeDetector() {
+  this(TranslationMode.BATCH);
+}
+
+TranslationMode getTranslationMode() {
+  return translationMode;
+}
+
+@Override
+public void visitPrimitiveTransform(TransformTreeNode node) {
+  

[2/2] incubator-beam git commit: This closes #1436

2016-11-27 Thread amitsela
This closes #1436


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8cc43aa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8cc43aa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8cc43aa7

Branch: refs/heads/master
Commit: 8cc43aa701807009ec826e752b2f1bb95442450f
Parents: 3ad7677 cc96b13
Author: Sela 
Authored: Sun Nov 27 13:19:59 2016 +0200
Committer: Sela 
Committed: Sun Nov 27 13:19:59 2016 +0200

--
 .../apache/beam/runners/spark/SparkRunner.java  | 61 +++-
 .../streaming/StreamingTransformTranslator.java |  2 +-
 .../streaming/EmptyStreamAssertionTest.java |  2 +
 .../streaming/FlattenStreamingTest.java |  2 +
 .../streaming/SimpleStreamingWordCountTest.java |  1 +
 .../SparkTestPipelineOptionsForStreaming.java   |  6 --
 6 files changed, 65 insertions(+), 9 deletions(-)
--




[1/3] incubator-beam git commit: Remove unused WindowingInternals.writePCollectionViewData

2016-11-26 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 632576b5b -> 3ad767750


Remove unused WindowingInternals.writePCollectionViewData


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/803bbe2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/803bbe2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/803bbe2a

Branch: refs/heads/master
Commit: 803bbe2a3026424f509e13809a8eecb79990e5fe
Parents: 07544ef
Author: Kenneth Knowles 
Authored: Wed Nov 23 11:23:07 2016 -0800
Committer: Sela 
Committed: Sat Nov 26 12:47:14 2016 +0200

--
 .../operators/ApexGroupByKeyOperator.java | 10 --
 .../beam/runners/core/SimpleDoFnRunner.java   | 18 --
 .../beam/runners/core/SimpleOldDoFnRunner.java| 16 
 .../functions/FlinkProcessContextBase.java|  8 
 .../spark/translation/SparkProcessContext.java|  9 -
 .../apache/beam/sdk/transforms/DoFnTester.java| 10 --
 .../apache/beam/sdk/util/WindowingInternals.java  | 10 --
 7 files changed, 81 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/803bbe2a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index eca4308..3b0e4f2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -30,8 +30,6 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.base.Throwables;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
@@ -40,7 +38,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
@@ -392,13 +389,6 @@ public class ApexGroupByKeyOperator implements Operator {
 }
 
 @Override
-    public <T> void writePCollectionViewData(
-        TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder)
-        throws IOException {
-      throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
-    }
-
-@Override
     public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
       throw new RuntimeException("sideInput() is not available in Streaming mode.");
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/803bbe2a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
--
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 841e412..f611c0a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -21,14 +21,11 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
@@ -595,21 +592,6 @@ public class SimpleDoFnRunner implements 
DoFnRunner data, Coder 
elemCoder)
-throws IOException {
-  @SuppressWarnings("unchecked")
-  Coder windowCoder = (Coder) 

[3/3] incubator-beam git commit: [BEAM-498] Remove obsolete WindowingInternals#writePCollectionViewData This closes #1430

2016-11-26 Thread amitsela
[BEAM-498] Remove obsolete WindowingInternals#writePCollectionViewData
This closes #1430


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3ad76775
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3ad76775
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3ad76775

Branch: refs/heads/master
Commit: 3ad7677503977108b5a67c315bd1cc6ead3ee998
Parents: 632576b 803bbe2
Author: Sela 
Authored: Sat Nov 26 12:50:01 2016 +0200
Committer: Sela 
Committed: Sat Nov 26 12:50:01 2016 +0200

--
 .../operators/ApexGroupByKeyOperator.java | 10 --
 .../beam/runners/core/SimpleDoFnRunner.java   | 18 --
 .../beam/runners/core/SimpleOldDoFnRunner.java| 16 
 .../functions/FlinkProcessContextBase.java|  8 
 .../beam/runners/dataflow/DataflowRunner.java | 14 +-
 .../spark/translation/SparkProcessContext.java|  9 -
 .../apache/beam/sdk/transforms/DoFnTester.java| 10 --
 .../apache/beam/sdk/util/WindowingInternals.java  | 10 --
 8 files changed, 5 insertions(+), 90 deletions(-)
--




[2/3] incubator-beam git commit: Remove unused body of StreamingPCollectionViewWriterFn

2016-11-26 Thread amitsela
Remove unused body of StreamingPCollectionViewWriterFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/07544ef3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/07544ef3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/07544ef3

Branch: refs/heads/master
Commit: 07544ef3a47bbdfacc00c75af875c3533a5fe477
Parents: 632576b
Author: Kenneth Knowles 
Authored: Wed Nov 23 11:22:08 2016 -0800
Committer: Sela 
Committed: Sat Nov 26 12:47:14 2016 +0200

--
 .../apache/beam/runners/dataflow/DataflowRunner.java  | 14 +-
 1 file changed, 5 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07544ef3/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
--
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 36328e9..f1d41f2 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2360,8 +2360,8 @@ public class DataflowRunner extends 
PipelineRunner {
   }
 
   /**
-   * A specialized {@link DoFn} for writing the contents of a {@link 
PCollection}
-   * to a streaming {@link PCollectionView} backend implementation.
+   * A marker {@link DoFn} for writing the contents of a {@link PCollection} 
to a streaming
+   * {@link PCollectionView} backend implementation.
*/
   @Deprecated
   public static class StreamingPCollectionViewWriterFn
@@ -2389,13 +2389,9 @@ public class DataflowRunner extends 
PipelineRunner {
 
 @Override
 public void processElement(ProcessContext c) throws Exception {
-  List<WindowedValue<T>> output = new ArrayList<>();
-  for (T elem : c.element()) {
-    output.add(WindowedValue.of(elem, c.timestamp(), c.window(), c.pane()));
-  }
-
-  c.windowingInternals().writePCollectionViewData(
-      view.getTagInternal(), output, dataCoder);
+  throw new UnsupportedOperationException(
+      String.format(
+          "%s is a marker class only and should never be executed.", getClass().getName()));
 }
   }
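
For readers skimming the patch, the "marker class" pattern it introduces can be sketched generically. This is an illustration only: the class name below is made up, and the real Beam class is recognized by the Dataflow translator rather than executed.

```java
// A generic sketch of the "marker class" pattern applied in the hunk above:
// the class only carries metadata for the runner's translator, so actually
// executing it must fail loudly. MarkerViewWriterFn is an illustrative name.
class MarkerViewWriterFn<T> {
  void processElement(T element) {
    throw new UnsupportedOperationException(
        String.format(
            "%s is a marker class only and should never be executed.",
            getClass().getName()));
  }
}
```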
 



[2/2] incubator-beam git commit: This closes #1393

2016-11-20 Thread amitsela
This closes #1393


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/875631f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/875631f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/875631f0

Branch: refs/heads/master
Commit: 875631f07b1e4556afec28dc850bd7fe2d07444b
Parents: d93e9a8 dafd5be
Author: Sela 
Authored: Sun Nov 20 19:26:52 2016 +0200
Committer: Sela 
Committed: Sun Nov 20 19:26:52 2016 +0200

--
 .../runners/spark/SparkPipelineOptions.java |  6 --
 .../apache/beam/runners/spark/SparkRunner.java  |  5 +-
 .../spark/translation/EvaluationContext.java| 59 +---
 .../SparkRunnerStreamingContextFactory.java |  3 +-
 .../streaming/EmptyStreamAssertionTest.java |  3 +-
 .../streaming/FlattenStreamingTest.java |  4 +-
 .../streaming/KafkaStreamingTest.java   | 11 ++--
 .../ResumeFromCheckpointStreamingTest.java  |  3 +-
 .../streaming/SimpleStreamingWordCountTest.java |  4 +-
 .../streaming/utils/PAssertStreaming.java   |  8 ++-
 .../SparkTestPipelineOptionsForStreaming.java   |  1 -
 11 files changed, 60 insertions(+), 47 deletions(-)
--




[1/2] incubator-beam git commit: [BEAM-1001] Add non-blocking cancel() and waitUntilFinish() for streaming applications. Remove timeout parameter in Spark pipeline options.

2016-11-20 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master d93e9a88b -> 875631f07


[BEAM-1001] Add non-blocking cancel() and waitUntilFinish() for streaming applications.
Remove timeout parameter in Spark pipeline options.
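
The semantics this commit describes can be sketched as follows: cancel() returns immediately, and callers pass a timeout to waitUntilFinish() instead of configuring a pipeline-level timeout option. This is a minimal illustration with assumed names, not the actual Beam PipelineResult API.

```java
// Sketch of the non-blocking semantics described above (assumed names):
// cancel() returns immediately, and waitUntilFinish(timeout) polls for
// completion instead of blocking forever.
class StreamingResult {
  private volatile boolean stopped = false;

  /** Request the streaming job to stop; returns without waiting. */
  void cancel() {
    stopped = true;
  }

  /** Wait up to timeoutMillis for the job to stop; true if it stopped. */
  boolean waitUntilFinish(long timeoutMillis) {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!stopped && System.currentTimeMillis() < deadline) {
      try {
        Thread.sleep(5); // poll rather than block indefinitely
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return stopped;
  }
}
```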


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dafd5be7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dafd5be7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dafd5be7

Branch: refs/heads/master
Commit: dafd5be7f69f191fc9edb8b9f9aec010ca368f50
Parents: d93e9a8
Author: ksalant 
Authored: Sun Nov 20 11:57:16 2016 +0200
Committer: Sela 
Committed: Sun Nov 20 19:25:52 2016 +0200

--
 .../runners/spark/SparkPipelineOptions.java |  6 --
 .../apache/beam/runners/spark/SparkRunner.java  |  5 +-
 .../spark/translation/EvaluationContext.java| 59 +---
 .../SparkRunnerStreamingContextFactory.java |  3 +-
 .../streaming/EmptyStreamAssertionTest.java |  3 +-
 .../streaming/FlattenStreamingTest.java |  4 +-
 .../streaming/KafkaStreamingTest.java   | 11 ++--
 .../ResumeFromCheckpointStreamingTest.java  |  3 +-
 .../streaming/SimpleStreamingWordCountTest.java |  4 +-
 .../streaming/utils/PAssertStreaming.java   |  8 ++-
 .../SparkTestPipelineOptionsForStreaming.java   |  1 -
 11 files changed, 60 insertions(+), 47 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index b1ebde9..0fd790e 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -39,12 +39,6 @@ public interface SparkPipelineOptions
   String getSparkMaster();
   void setSparkMaster(String master);
 
-  @Description("Timeout to wait (in msec) for a streaming execution to stop, -1 runs until "
-      + "execution is stopped")
-  @Default.Long(-1)
-  Long getTimeout();
-  void setTimeout(Long timeoutMillis);
-
   @Description("Batch interval for Spark streaming in milliseconds.")
   @Default.Long(1000)
   Long getBatchIntervalMillis();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 6bbef39..e800071 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -137,11 +137,8 @@ public final class SparkRunner extends 
PipelineRunner {
 
 // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance.
 return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sc(),
-    pipeline, jssc, mOptions.getTimeout()) : contextFactory.getCtxt();
+    pipeline, jssc) : contextFactory.getCtxt();
   } else {
-if (mOptions.getTimeout() > 0) {
-  LOG.info("Timeout is ignored by the SparkRunner in batch.");
-}
 JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
 EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
 SparkPipelineTranslator translator = new TransformTranslator.Translator();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index aaf7573..1183fbb 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.StreamingContextState;
 import 

[1/2] incubator-beam git commit: [BEAM-983] Fix a bunch of precommit errors from #1332

2016-11-15 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 201110222 -> dbbd5e448


[BEAM-983] Fix a bunch of precommit errors from #1332

Renames TestPipelineOptions to SparkTestPipelineOptions

To avoid confusion with sdk.testing.TestPipelineOptions.
Also, a couple of other minor fixes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd740ee1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd740ee1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd740ee1

Branch: refs/heads/master
Commit: dd740ee1b20ab6921db3620ac28499dc66511482
Parents: 2011102
Author: Eugene Kirpichov 
Authored: Tue Nov 15 14:25:51 2016 -0800
Committer: Sela 
Committed: Wed Nov 16 01:49:59 2016 +0200

--
 .../runners/spark/ProvidedSparkContextTest.java |  2 -
 .../metrics/sink/NamedAggregatorsTest.java  |  4 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |  5 +--
 .../beam/runners/spark/io/NumShardsTest.java|  5 +--
 .../io/hadoop/HadoopFileFormatPipelineTest.java |  5 +--
 .../spark/translation/SideEffectsTest.java  | 34 ++-
 .../streaming/EmptyStreamAssertionTest.java |  5 ++-
 .../streaming/FlattenStreamingTest.java |  5 ++-
 .../streaming/KafkaStreamingTest.java   |  5 ++-
 .../ResumeFromCheckpointStreamingTest.java  |  5 ++-
 .../streaming/SimpleStreamingWordCountTest.java |  5 ++-
 .../utils/SparkTestPipelineOptions.java | 42 +++
 .../SparkTestPipelineOptionsForStreaming.java   | 44 
 .../streaming/utils/TestPipelineOptions.java| 25 ---
 .../utils/TestPipelineOptionsForStreaming.java  | 44 
 15 files changed, 121 insertions(+), 114 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
--
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index bc337c7..fe73aba 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.spark.examples.WordCount;
-import 
org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -34,7 +33,6 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.junit.Rule;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
--
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
index c220f2b..c16574c 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
@@ -28,7 +28,7 @@ import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.examples.WordCount;
-import 
org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
+import 
org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
@@ -52,7 +52,7 @@ public class NamedAggregatorsTest {
   public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule();
 
   @Rule
-  public final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+  public final SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions();
 
   private Pipeline createSparkPipeline() {
 SparkPipelineOptions options = pipelineOptions.getOptions();


[2/2] incubator-beam git commit: This closes #1364

2016-11-15 Thread amitsela
This closes #1364


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dbbd5e44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dbbd5e44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dbbd5e44

Branch: refs/heads/master
Commit: dbbd5e44800586c3f1a2efc58b91d79126c843d3
Parents: 2011102 dd740ee
Author: Sela 
Authored: Wed Nov 16 01:51:51 2016 +0200
Committer: Sela 
Committed: Wed Nov 16 01:51:51 2016 +0200

--
 .../runners/spark/ProvidedSparkContextTest.java |  2 -
 .../metrics/sink/NamedAggregatorsTest.java  |  4 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |  5 +--
 .../beam/runners/spark/io/NumShardsTest.java|  5 +--
 .../io/hadoop/HadoopFileFormatPipelineTest.java |  5 +--
 .../spark/translation/SideEffectsTest.java  | 34 ++-
 .../streaming/EmptyStreamAssertionTest.java |  5 ++-
 .../streaming/FlattenStreamingTest.java |  5 ++-
 .../streaming/KafkaStreamingTest.java   |  5 ++-
 .../ResumeFromCheckpointStreamingTest.java  |  5 ++-
 .../streaming/SimpleStreamingWordCountTest.java |  5 ++-
 .../utils/SparkTestPipelineOptions.java | 42 +++
 .../SparkTestPipelineOptionsForStreaming.java   | 44 
 .../streaming/utils/TestPipelineOptions.java| 25 ---
 .../utils/TestPipelineOptionsForStreaming.java  | 44 
 15 files changed, 121 insertions(+), 114 deletions(-)
--




[GitHub] incubator-beam pull request #1362: [BEAM-983] runners/spark/translation/stre...

2016-11-15 Thread amitsela
Github user amitsela closed the pull request at:

https://github.com/apache/incubator-beam/pull/1362


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1362: [BEAM-983] add missing license.

2016-11-15 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1362

[BEAM-983] add missing license.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-983

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1362.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1362


commit 19d180594a7cce258fc78632521a6536bde85935
Author: Sela <ans...@paypal.com>
Date:   2016-11-15T18:53:38Z

[BEAM-983] add missing license.






[1/2] incubator-beam git commit: [BEAM-891] Fix build occasionally failing on IndexOutOfBoundsException.

2016-11-15 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 47646d641 -> 503f26f44


[BEAM-891] Fix build occasionally failing on IndexOutOfBoundsException.

Moved "TestPipelineOptions#withTmpCheckpointDir" to 
TestPipelineOptionsForStreaming.
Removed an unused member in ProvidedSparkContextTest.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0331dd1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0331dd1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0331dd1c

Branch: refs/heads/master
Commit: 0331dd1cd75e60484f0b15723e4e7edc280a4d12
Parents: 47646d6
Author: Stas Levin 
Authored: Thu Nov 10 13:32:51 2016 +0200
Committer: Sela 
Committed: Tue Nov 15 19:52:12 2016 +0200

--
 runners/spark/pom.xml   |  3 +-
 .../runners/spark/SparkPipelineOptions.java |  4 +-
 .../spark/translation/SparkRuntimeContext.java  |  4 +-
 .../runners/spark/ProvidedSparkContextTest.java | 26 -
 .../metrics/sink/NamedAggregatorsTest.java  | 13 +++--
 .../beam/runners/spark/io/AvroPipelineTest.java | 12 ++---
 .../beam/runners/spark/io/NumShardsTest.java| 10 ++--
 .../io/hadoop/HadoopFileFormatPipelineTest.java | 12 ++---
 .../spark/translation/SideEffectsTest.java  | 11 ++--
 .../streaming/EmptyStreamAssertionTest.java |  4 +-
 .../streaming/FlattenStreamingTest.java |  4 +-
 .../streaming/KafkaStreamingTest.java   |  4 +-
 .../ResumeFromCheckpointStreamingTest.java  |  4 +-
 .../streaming/SimpleStreamingWordCountTest.java |  6 +--
 .../utils/TestOptionsForStreaming.java  | 55 
 .../streaming/utils/TestPipelineOptions.java| 25 +
 .../utils/TestPipelineOptionsForStreaming.java  | 44 
 17 files changed, 132 insertions(+), 109 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 1e4a720..4c5b3f5 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -82,7 +82,8 @@
 
   [
 "--runner=TestSparkRunner",
-"--streaming=false"
+"--streaming=false",
+"--enableSparkMetricSinks=false"
   ]
 
 
true

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 5168c6c..b1ebde9 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -88,8 +88,8 @@ public interface SparkPipelineOptions
 
   @Description("Enable/disable sending aggregator values to Spark's metric sinks")
   @Default.Boolean(true)
-  Boolean getEnableSparkSinks();
-  void setEnableSparkSinks(Boolean enableSparkSinks);
+  Boolean getEnableSparkMetricSinks();
+  void setEnableSparkMetricSinks(Boolean enableSparkMetricSinks);
 
   @Description("If the spark runner will be initialized with a provided Spark Context. "
       + "The Spark Context should be provided with SparkContextOptions.")
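
For illustration, here is how the renamed --enableSparkMetricSinks flag might be read from command-line args. Beam's PipelineOptionsFactory does this reflectively from the getter/setter pair; the manual parsing below is just a sketch mirroring the default declared in the diff.

```java
// Illustrative only: parse the renamed --enableSparkMetricSinks flag,
// defaulting to true as in the @Default.Boolean(true) annotation above.
class MetricSinksFlag {
  static boolean parse(String[] args) {
    boolean enableSparkMetricSinks = true; // mirrors @Default.Boolean(true)
    for (String arg : args) {
      if (arg.startsWith("--enableSparkMetricSinks=")) {
        enableSparkMetricSinks =
            Boolean.parseBoolean(arg.substring(arg.indexOf('=') + 1));
      }
    }
    return enableSparkMetricSinks;
  }
}
```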

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 181a111..564db39 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -86,11 +86,11 @@ public class SparkRuntimeContext implements Serializable {
 final Accumulator<NamedAggregators> accum = AccumulatorSingleton.getInstance(jsc);
 final NamedAggregators initialValue = accum.value();
 
-if (opts.getEnableSparkSinks()) {
+if (opts.getEnableSparkMetricSinks()) {
   final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
   final AggregatorMetricSource aggregatorMetricSource =
 

[2/2] incubator-beam git commit: This closes #1332

2016-11-15 Thread amitsela
This closes #1332


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/503f26f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/503f26f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/503f26f4

Branch: refs/heads/master
Commit: 503f26f448ea9f46fcfcdd46e60cba80e4844e28
Parents: 47646d6 0331dd1
Author: Sela 
Authored: Tue Nov 15 19:53:52 2016 +0200
Committer: Sela 
Committed: Tue Nov 15 19:53:52 2016 +0200

--
 runners/spark/pom.xml   |  3 +-
 .../runners/spark/SparkPipelineOptions.java |  4 +-
 .../spark/translation/SparkRuntimeContext.java  |  4 +-
 .../runners/spark/ProvidedSparkContextTest.java | 26 -
 .../metrics/sink/NamedAggregatorsTest.java  | 13 +++--
 .../beam/runners/spark/io/AvroPipelineTest.java | 12 ++---
 .../beam/runners/spark/io/NumShardsTest.java| 10 ++--
 .../io/hadoop/HadoopFileFormatPipelineTest.java | 12 ++---
 .../spark/translation/SideEffectsTest.java  | 11 ++--
 .../streaming/EmptyStreamAssertionTest.java |  4 +-
 .../streaming/FlattenStreamingTest.java |  4 +-
 .../streaming/KafkaStreamingTest.java   |  4 +-
 .../ResumeFromCheckpointStreamingTest.java  |  4 +-
 .../streaming/SimpleStreamingWordCountTest.java |  6 +--
 .../utils/TestOptionsForStreaming.java  | 55 
 .../streaming/utils/TestPipelineOptions.java| 25 +
 .../utils/TestPipelineOptionsForStreaming.java  | 44 
 17 files changed, 132 insertions(+), 109 deletions(-)
--




[2/2] incubator-beam git commit: This closes #1291

2016-11-15 Thread amitsela
This closes #1291


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2bc66f90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2bc66f90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2bc66f90

Branch: refs/heads/master
Commit: 2bc66f903cdfa328c4bb3546befbaa0f58bdd6fa
Parents: 9c300cd 1bef01f
Author: Sela 
Authored: Tue Nov 15 13:37:20 2016 +0200
Committer: Sela 
Committed: Tue Nov 15 13:37:20 2016 +0200

--
 .../apache/beam/runners/spark/SparkRunner.java  |   4 +-
 .../spark/translation/BoundedDataset.java   | 114 
 .../beam/runners/spark/translation/Dataset.java |  34 +++
 .../spark/translation/EvaluationContext.java| 230 +++-
 .../spark/translation/TransformTranslator.java  |  99 +++
 .../SparkRunnerStreamingContextFactory.java |   7 +-
 .../streaming/StreamingEvaluationContext.java   | 272 ---
 .../streaming/StreamingTransformTranslator.java | 135 +
 .../translation/streaming/UnboundedDataset.java | 103 +++
 9 files changed, 464 insertions(+), 534 deletions(-)
--




[1/2] incubator-beam git commit: [BEAM-762] Unify spark-runner EvaluationContext and StreamingEvaluationContext

2016-11-15 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 9c300cde8 -> 2bc66f903


[BEAM-762] Unify spark-runner EvaluationContext and StreamingEvaluationContext

PR 1291 review changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1bef01fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1bef01fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1bef01fe

Branch: refs/heads/master
Commit: 1bef01fef5ff5ff9a960c85b00c2cc4aa504ce4d
Parents: 9c300cd
Author: Aviem Zur 
Authored: Sun Nov 13 13:57:07 2016 +0200
Committer: Sela 
Committed: Tue Nov 15 13:35:49 2016 +0200

--
 .../apache/beam/runners/spark/SparkRunner.java  |   4 +-
 .../spark/translation/BoundedDataset.java   | 114 
 .../beam/runners/spark/translation/Dataset.java |  34 +++
 .../spark/translation/EvaluationContext.java| 230 +++-
 .../spark/translation/TransformTranslator.java  |  99 +++
 .../SparkRunnerStreamingContextFactory.java |   7 +-
 .../streaming/StreamingEvaluationContext.java   | 272 ---
 .../streaming/StreamingTransformTranslator.java | 135 +
 .../translation/streaming/UnboundedDataset.java | 103 +++
 9 files changed, 464 insertions(+), 534 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 45c7f55..6bbef39 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -26,7 +26,6 @@ import 
org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import 
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
-import 
org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -49,6 +48,7 @@ import 
org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * The SparkRunner translate operations defined on a pipeline to a representation
  * executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run
@@ -136,7 +136,7 @@ public final class SparkRunner extends 
PipelineRunner {
 jssc.start();
 
 // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance.
-return contextFactory.getCtxt() == null ? new StreamingEvaluationContext(jssc.sc(),
+return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sc(),
 pipeline, jssc, mOptions.getTimeout()) : contextFactory.getCtxt();
   } else {
 if (mOptions.getTimeout() > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
new file mode 100644
index 000..774efb9
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+

[1/2] incubator-beam git commit: [BEAM-944] Spark runner causes an exception when creating pipeline options. Create a SparkContextOptions for context-aware options.

2016-11-10 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master cd3f61cf8 -> e43a38355


[BEAM-944] Spark runner causes an exception when creating pipeline options.
Create a SparkContextOptions for context-aware options.

Move UsesProvidedSparkContext property to SparkPipelineOptions so it's 
available from command-line
as well.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/121bff46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/121bff46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/121bff46

Branch: refs/heads/master
Commit: 121bff46d950e319eebf10e3a42bdd890edfb0c5
Parents: cd3f61c
Author: Sela 
Authored: Tue Nov 8 23:05:13 2016 +0200
Committer: Sela 
Committed: Thu Nov 10 23:27:17 2016 +0200

--
 .../beam/runners/spark/SparkContextOptions.java | 64 
 .../runners/spark/SparkPipelineOptions.java | 36 +++
 .../spark/translation/SparkContextFactory.java  | 19 +++---
 .../SparkRunnerStreamingContextFactory.java |  3 +-
 .../runners/spark/ProvidedSparkContextTest.java |  6 +-
 .../streaming/KafkaStreamingTest.java   |  4 +-
 6 files changed, 91 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
new file mode 100644
index 000..98f7492
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+
+
+
+/**
+ * A custom {@link PipelineOptions} to work with properties related to {@link JavaSparkContext}.
+ *
+ * <p>This can only be used programmatically (as opposed to passing command line arguments),
+ * since the properties here are context-aware and should not be propagated to workers.
+ *
+ * <p>Separating this from {@link SparkPipelineOptions} is needed so the context-aware
+ * properties, which link to Spark dependencies, won't be scanned by {@link PipelineOptions}
+ * reflective instantiation.
+ * Note that {@link SparkContextOptions} is not registered with {@link SparkRunnerRegistrar}.
+ */
+public interface SparkContextOptions extends SparkPipelineOptions {
+
+  @Description("Provided Java Spark Context")
+  @JsonIgnore
+  JavaSparkContext getProvidedSparkContext();
+  void setProvidedSparkContext(JavaSparkContext jsc);
+
+  @Description("Spark streaming listeners")
+  @Default.InstanceFactory(EmptyListenersList.class)
+  @JsonIgnore
+  List<JavaStreamingListener> getListeners();
+  void setListeners(List<JavaStreamingListener> listeners);
+
+  /** Returns an empty list, to avoid handling null. */
+  class EmptyListenersList implements DefaultValueFactory<List<JavaStreamingListener>> {
+    @Override
+    public List<JavaStreamingListener> create(PipelineOptions options) {
+      return new ArrayList<>();
+    }
+  }
+}
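As the javadoc notes, these options can only be set programmatically. A minimal configuration sketch under that assumption (the option names follow this diff and the commit message; the local-mode master and app name are illustrative only):

```java
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ProvidedContextExample {
  public static void main(String[] args) {
    // Context-aware options are set in code, never via command-line arguments,
    // so the JavaSparkContext reference is never propagated to workers.
    JavaSparkContext jsc =
        new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("provided-jsc"));
    SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
    options.setRunner(SparkRunner.class);
    options.setUsesProvidedSparkContext(true); // property moved to SparkPipelineOptions
    options.setProvidedSparkContext(jsc);
    Pipeline p = Pipeline.create(options);
    // ... build and run the pipeline against the provided context.
  }
}
```

This is a configuration sketch rather than a runnable job; it requires the Beam Spark runner and Spark on the classpath.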

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 

[2/2] incubator-beam git commit: This closes #1316

2016-11-10 Thread amitsela
This closes #1316


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e43a3835
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e43a3835
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e43a3835

Branch: refs/heads/master
Commit: e43a383559cb825a498c7427d58ce0a56b3f5245
Parents: cd3f61c 121bff4
Author: Sela 
Authored: Thu Nov 10 23:27:55 2016 +0200
Committer: Sela 
Committed: Thu Nov 10 23:27:55 2016 +0200

--
 .../beam/runners/spark/SparkContextOptions.java | 64 
 .../runners/spark/SparkPipelineOptions.java | 36 +++
 .../spark/translation/SparkContextFactory.java  | 19 +++---
 .../SparkRunnerStreamingContextFactory.java |  3 +-
 .../runners/spark/ProvidedSparkContextTest.java |  6 +-
 .../streaming/KafkaStreamingTest.java   |  4 +-
 6 files changed, 91 insertions(+), 41 deletions(-)
--




[GitHub] incubator-beam pull request #1316: [BEAM-944] Spark runner causes an excepti...

2016-11-08 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1316

[BEAM-944] Spark runner causes an exception when creating pipeline op…

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

…tions.

Create a SparkContextOptions for context-aware options.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-944

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1316.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1316


commit 45823481d1356caad138ca5c8504276045425411
Author: Sela <ans...@paypal.com>
Date:   2016-11-08T21:05:13Z

[BEAM-944] Spark runner causes an exception when creating pipeline options.
Create a SparkContextOptions for context-aware options.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1257: [BEAM-880] Avoid emitting default empty i...

2016-11-06 Thread amitsela
Github user amitsela closed the pull request at:

https://github.com/apache/incubator-beam/pull/1257


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] incubator-beam git commit: [BEAM-889] Let Spark handle the user-provided checkpointDir, but warn if not a reliable fs.

2016-11-04 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 46fbfe06b -> 14e093a0a


[BEAM-889] Let Spark handle the user-provided checkpointDir, but warn if not a 
reliable fs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90a75d1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90a75d1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90a75d1f

Branch: refs/heads/master
Commit: 90a75d1fb0706ec4cc25a9eeeca8ade1b3b7de28
Parents: 46fbfe0
Author: Sela 
Authored: Thu Nov 3 18:22:20 2016 +0200
Committer: Sela 
Committed: Fri Nov 4 23:59:40 2016 +0200

--
 .../runners/spark/SparkPipelineOptions.java |  3 +--
 .../SparkRunnerStreamingContextFactory.java | 23 +---
 .../streaming/EmptyStreamAssertionTest.java |  3 +--
 .../streaming/FlattenStreamingTest.java |  6 ++---
 .../streaming/KafkaStreamingTest.java   |  6 ++---
 .../ResumeFromCheckpointStreamingTest.java  |  3 +--
 .../streaming/SimpleStreamingWordCountTest.java |  3 +--
 .../utils/TestOptionsForStreaming.java  | 12 +-
 8 files changed, 19 insertions(+), 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 08e14fe..4eada35 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -77,8 +77,7 @@ public interface SparkPipelineOptions extends 
PipelineOptions, StreamingOptions,
  class TmpCheckpointDirFactory implements DefaultValueFactory<String> {
     @Override
     public String create(PipelineOptions options) {
-      SparkPipelineOptions sparkPipelineOptions = options.as(SparkPipelineOptions.class);
-      return "file:///tmp/" + sparkPipelineOptions.getJobName();
+      return "/tmp/" + options.as(SparkPipelineOptions.class).getJobName();
     }
   }
 
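The change above drops the explicit `file://` scheme from the default, so the checkpoint path is resolved against whatever filesystem Spark/Hadoop treat as the default one. A standalone sketch of the new default (`defaultCheckpointDir` is a hypothetical helper name for illustration):

```java
public class CheckpointDirDefault {
  // Mirrors the new default above: no scheme prefix, just a path under /tmp
  // named after the job.
  static String defaultCheckpointDir(String jobName) {
    return "/tmp/" + jobName;
  }

  public static void main(String[] args) {
    System.out.println(defaultCheckpointDir("wordcount")); // prints /tmp/wordcount
  }
}
```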

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index 2378788..a670f61 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -20,11 +20,6 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Arrays;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
@@ -48,7 +43,7 @@ import org.slf4j.LoggerFactory;
public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory {
   private static final Logger LOG =
       LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
-  private static final Iterable<String> KNOWN_RELIABLE_FS = Arrays.asList("hdfs", "s3", "gs");
+  private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)";
 
   private final Pipeline pipeline;
   private final SparkPipelineOptions options;
@@ -83,19 +78,11 @@ public class SparkRunnerStreamingContextFactory implements 
JavaStreamingContextF
 
 // set checkpoint dir.
 String checkpointDir = options.getCheckpointDir();
-    LOG.info("Checkpoint dir set to: {}", checkpointDir);
-    try {
-      // validate checkpoint dir and warn if not of a known durable filesystem.
-      URL checkpointDirUrl = new URL(checkpointDir);
-      if (!Iterables.any(KNOWN_RELIABLE_FS, Predicates.equalTo(checkpointDirUrl.getProtocol()))) {
-        LOG.warn("Checkpoint dir URL {} does not match a reliable filesystem, in case of failures "
-            + "this job may not recover properly or even at all.",

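The hunk above removes the `java.net.URL`-based validation (which rejected relative paths and required a protocol) in favor of matching the leading scheme against the `KNOWN_RELIABLE_FS_PATTERN` regex. A self-contained sketch of that replacement check (`isKnownReliableFs` and the class name are hypothetical, for illustration):

```java
import java.util.regex.Pattern;

public class CheckpointDirValidator {
  // Same pattern as in the diff: the ^ anchor means only the leading scheme
  // of the checkpoint dir is inspected, so relative paths simply don't match.
  private static final Pattern KNOWN_RELIABLE_FS = Pattern.compile("^(hdfs|s3|gs)");

  static boolean isKnownReliableFs(String checkpointDir) {
    return KNOWN_RELIABLE_FS.matcher(checkpointDir).find();
  }

  public static void main(String[] args) {
    System.out.println(isKnownReliableFs("hdfs://namenode/checkpoints")); // true
    System.out.println(isKnownReliableFs("/tmp/checkpoints"));            // false
  }
}
```

Unlike the old `new URL(checkpointDir)` approach, this never throws on a relative path; an unreliable filesystem only triggers a warning.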
[2/2] incubator-beam git commit: This closes #1272

2016-11-04 Thread amitsela
This closes #1272


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/14e093a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/14e093a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/14e093a0

Branch: refs/heads/master
Commit: 14e093a0a574c8c3920a83c38e411a06b29bf44b
Parents: 46fbfe0 90a75d1
Author: Sela 
Authored: Sat Nov 5 00:02:22 2016 +0200
Committer: Sela 
Committed: Sat Nov 5 00:02:22 2016 +0200

--
 .../runners/spark/SparkPipelineOptions.java |  3 +--
 .../SparkRunnerStreamingContextFactory.java | 23 +---
 .../streaming/EmptyStreamAssertionTest.java |  3 +--
 .../streaming/FlattenStreamingTest.java |  6 ++---
 .../streaming/KafkaStreamingTest.java   |  6 ++---
 .../ResumeFromCheckpointStreamingTest.java  |  3 +--
 .../streaming/SimpleStreamingWordCountTest.java |  3 +--
 .../utils/TestOptionsForStreaming.java  | 12 +-
 8 files changed, 19 insertions(+), 40 deletions(-)
--




[GitHub] incubator-beam-site pull request #65: [BEAM-890] Update compatibility matrix...

2016-11-04 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam-site/pull/65

[BEAM-890] Update compatibility matrix for Spark.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam-site BEAM-890

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam-site/pull/65.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #65


commit 97c147fb3dbd4b860f441aa13d22c723e31021e4
Author: Sela <ans...@paypal.com>
Date:   2016-11-04T08:57:54Z

[BEAM-890] Update compatibility matrix for Spark.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1271: [BEAM-889] CheckpointDir option does not ...

2016-11-03 Thread amitsela
Github user amitsela closed the pull request at:

https://github.com/apache/incubator-beam/pull/1271


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1272: [BEAM-889] CheckpointDir option does not ...

2016-11-03 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1272

[BEAM-889] CheckpointDir option does not accept relative path and req…

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

…uires protocol.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-889

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1272.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1272


commit b4c9e19ffadae3ed9c64e22aeb11dd3dc7ac0872
Author: Sela <ans...@paypal.com>
Date:   2016-11-03T16:22:20Z

[BEAM-889] CheckpointDir option does not accept relative path and requires 
protocol.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1271: [BEAM-889] CheckpointDir option does not ...

2016-11-03 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1271

[BEAM-889] CheckpointDir option does not accept relative path and req…

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

…uires protocol.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1271.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1271


commit 2bf8163290bae3273ad068daf8f3776a8a8ad9cd
Author: Sela <ans...@paypal.com>
Date:   2016-11-03T14:56:08Z

[BEAM-889] CheckpointDir option does not accept relative path and requires 
protocol.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] incubator-beam git commit: Revert "[BEAM-808] Increase "spark.port.maxRetries" to avoid BindException in ROS."

2016-11-03 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master d75d8b2bb -> 6a05cf4a9


Revert "[BEAM-808] Increase "spark.port.maxRetries" to avoid BindException in 
ROS."

This reverts commit ff5409f15a9f741f437c489b7de763cfa3c68278.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f233d858
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f233d858
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f233d858

Branch: refs/heads/master
Commit: f233d858810cacace21b05e525131f05b26e6380
Parents: d75d8b2
Author: Sela 
Authored: Thu Nov 3 14:13:33 2016 +0200
Committer: Sela 
Committed: Thu Nov 3 14:13:33 2016 +0200

--
 runners/spark/pom.xml | 1 -
 1 file changed, 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f233d858/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index e83aedf..71a3ac2 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -87,7 +87,6 @@
 
 
true
 false
-        <spark.port.maxRetries>64</spark.port.maxRetries>
   
 
   



[2/2] incubator-beam git commit: This closes #1270

2016-11-03 Thread amitsela
This closes #1270


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6a05cf4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6a05cf4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6a05cf4a

Branch: refs/heads/master
Commit: 6a05cf4a94728eb5e73a96e260eb5ef0bdc5fc9b
Parents: d75d8b2 f233d85
Author: Sela 
Authored: Thu Nov 3 15:35:10 2016 +0200
Committer: Sela 
Committed: Thu Nov 3 15:35:10 2016 +0200

--
 runners/spark/pom.xml | 1 -
 1 file changed, 1 deletion(-)
--




[GitHub] incubator-beam pull request #1270: Revert "[BEAM-808] Increase "spark.port.m...

2016-11-03 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1270

Revert "[BEAM-808] Increase "spark.port.maxRetries" to avoid BindExce…

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

…ption in ROS."

This reverts commit ff5409f15a9f741f437c489b7de763cfa3c68278.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-888

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1270.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1270


commit f233d858810cacace21b05e525131f05b26e6380
Author: Sela <ans...@paypal.com>
Date:   2016-11-03T12:13:33Z

Revert "[BEAM-808] Increase "spark.port.maxRetries" to avoid BindException 
in ROS."

This reverts commit ff5409f15a9f741f437c489b7de763cfa3c68278.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] incubator-beam git commit: [BEAM-808] Increase "spark.port.maxRetries" to avoid BindException in ROS.

2016-11-02 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 2c0d0f476 -> 529f266ae


[BEAM-808] Increase "spark.port.maxRetries" to avoid BindException in ROS.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ff5409f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ff5409f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ff5409f1

Branch: refs/heads/master
Commit: ff5409f15a9f741f437c489b7de763cfa3c68278
Parents: 2c0d0f4
Author: Sela 
Authored: Mon Oct 24 21:52:03 2016 +0300
Committer: Sela 
Committed: Wed Nov 2 14:53:16 2016 +0200

--
 runners/spark/pom.xml | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ff5409f1/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 71a3ac2..e83aedf 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -87,6 +87,7 @@
 
 
true
 false
+        <spark.port.maxRetries>64</spark.port.maxRetries>
   
 
   



[2/2] incubator-beam git commit: This closes #1170

2016-11-02 Thread amitsela
This closes #1170


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/529f266a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/529f266a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/529f266a

Branch: refs/heads/master
Commit: 529f266ae91167997a17a952ee98b1721bba47a8
Parents: 2c0d0f4 ff5409f
Author: Sela 
Authored: Wed Nov 2 14:54:21 2016 +0200
Committer: Sela 
Committed: Wed Nov 2 14:54:21 2016 +0200

--
 runners/spark/pom.xml | 1 +
 1 file changed, 1 insertion(+)
--




[GitHub] incubator-beam pull request #1257: [BEAM-880] Avoid emitting default empty i...

2016-11-02 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1257

[BEAM-880] Avoid emitting default empty iterable in PAssert.GroupedGl…

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

…obally.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam passert-remove-empty-dummy

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1257.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1257


commit 99a79e3d19a6584a19c628e71a21400f97b4754a
Author: Sela <ans...@paypal.com>
Date:   2016-11-02T12:17:12Z

[BEAM-880] Avoid emitting default empty iterable in PAssert.GroupedGlobally.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] incubator-beam git commit: Directly implement ReifyTimestampsAndWindows in SparkRunner

2016-10-28 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 215980ad3 -> 9c3e3e7a3


Directly implement ReifyTimestampsAndWindows in SparkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/597e3955
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/597e3955
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/597e3955

Branch: refs/heads/master
Commit: 597e3955c219a7c50df124a0689b99b98dfbbbc9
Parents: 215980a
Author: Kenneth Knowles 
Authored: Thu Oct 27 22:18:19 2016 -0700
Committer: Sela 
Committed: Fri Oct 28 10:56:44 2016 +0300

--
 .../translation/GroupCombineFunctions.java  |  5 +--
 .../ReifyTimestampsAndWindowsFunction.java  | 47 
 2 files changed, 48 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/597e3955/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index e2a0f87..421b1b0 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ReifyTimestampAndWindowsDoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -48,7 +47,6 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-
 import scala.Tuple2;
 
 
@@ -77,8 +75,7 @@ public class GroupCombineFunctions {
 // Use coders to convert objects in the PCollection to byte arrays, so they
 // can be transferred over the network for the shuffle.
 JavaRDD, KV>(null,
-new ReifyTimestampAndWindowsDoFn(), runtimeContext, 
null, null))
+        rdd.map(new ReifyTimestampsAndWindowsFunction<K, V>())
             .map(WindowingHelpers.<KV<K, WindowedValue<V>>>unwindowFunction())
             .mapToPair(TranslationUtils.toPairFunction())
             .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/597e3955/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java
new file mode 100644
index 000..8281c17
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * Simple {@link Function} to bring the windowing information into the value 

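The diff is truncated here. As a hedged illustration of the reify idea with plain-Java stand-ins (not Beam's actual `WindowedValue`/`KV` types): each element's timestamp and window are copied into the value side, so they survive once the pair is converted to bytes for the shuffle.

```java
public class ReifyDemo {
  // Minimal stand-ins for Beam's KV and WindowedValue, for illustration only.
  static final class KV<K, V> {
    final K key; final V value;
    KV(K key, V value) { this.key = key; this.value = value; }
  }

  static final class Windowed<T> {
    final T value; final long timestampMillis; final String window;
    Windowed(T value, long timestampMillis, String window) {
      this.value = value; this.timestampMillis = timestampMillis; this.window = window;
    }
  }

  // The reify step: move the element's windowing metadata into the value,
  // turning KV<K, V> into KV<K, Windowed<V>> inside the same windowed wrapper.
  static <K, V> Windowed<KV<K, Windowed<V>>> reify(Windowed<KV<K, V>> in) {
    Windowed<V> reifiedValue = new Windowed<>(in.value.value, in.timestampMillis, in.window);
    return new Windowed<>(new KV<>(in.value.key, reifiedValue), in.timestampMillis, in.window);
  }

  public static void main(String[] args) {
    Windowed<KV<String, Integer>> in = new Windowed<>(new KV<>("k", 42), 1000L, "w1");
    Windowed<KV<String, Windowed<Integer>>> out = reify(in);
    System.out.println(out.value.value.timestampMillis); // 1000
  }
}
```

After this step, the outer wrapper can be dropped and the pair coded to bytes without losing per-element timestamps or window assignments.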
[1/2] incubator-beam git commit: [BEAM-809] Create a KryoRegistrator for the SparkRunner.

2016-10-26 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 53fe3ee42 -> 78e2c0387


[BEAM-809] Create a KryoRegistrator for the SparkRunner.

Use Class#getName() instead of canonicalName().
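The `getName()` vs `getCanonicalName()` distinction matters because Spark loads the registrator reflectively from its configuration: for a nested class, only the binary name (with `$`) is loadable via `Class.forName`. A self-contained demonstration (class names here are illustrative):

```java
public class NameDemo {
  static class Nested {}

  public static void main(String[] args) {
    String binaryName = Nested.class.getName();         // ends with "NameDemo$Nested"
    String canonical = Nested.class.getCanonicalName(); // ends with "NameDemo.Nested"
    boolean binaryLoads;
    try {
      Class.forName(binaryName); // the binary name resolves
      binaryLoads = true;
    } catch (ClassNotFoundException e) {
      binaryLoads = false;
    }
    boolean canonicalLoads;
    try {
      Class.forName(canonical); // the canonical name of a nested class does not
      canonicalLoads = true;
    } catch (ClassNotFoundException e) {
      canonicalLoads = false;
    }
    System.out.println(binaryLoads + " " + canonicalLoads); // true false
  }
}
```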


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/13b83858
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/13b83858
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/13b83858

Branch: refs/heads/master
Commit: 13b83858746356068a6d618e04da6839e837d28c
Parents: 53fe3ee
Author: Sela 
Authored: Mon Oct 24 22:35:39 2016 +0300
Committer: Sela 
Committed: Wed Oct 26 18:53:28 2016 +0300

--
 runners/spark/pom.xml   | 23 ++
 .../coders/BeamSparkRunnerRegistrator.java  | 46 
 .../spark/translation/SparkContextFactory.java  |  5 ++-
 3 files changed, 73 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index ccec3c6..458205a 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -147,6 +147,29 @@
      <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>com.esotericsoftware.kryo</groupId>
+      <artifactId>kryo</artifactId>
+      <version>2.21</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>de.javakaffee</groupId>
+      <artifactId>kryo-serializers</artifactId>
+      <version>0.39</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.esotericsoftware</groupId>
+          <artifactId>kryo</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
       <version>1.3.9</version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
new file mode 100644
index 000..0e62781
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.coders;
+
+import com.esotericsoftware.kryo.Kryo;
+import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
+import de.javakaffee.kryoserializers.guava.ReverseListSerializer;
+import org.apache.spark.serializer.KryoRegistrator;
+
+
+/**
+ * Custom {@link com.esotericsoftware.kryo.Serializer}s for Beam's Spark runner needs.
+ */
+public class BeamSparkRunnerRegistrator implements KryoRegistrator {
+
+  @Override
+  public void registerClasses(Kryo kryo) {
+    UnmodifiableCollectionsSerializer.registerSerializers(kryo);
+    // Guava
+    ImmutableListSerializer.registerSerializers(kryo);
+    ImmutableSetSerializer.registerSerializers(kryo);
+    ImmutableMapSerializer.registerSerializers(kryo);
+    ImmutableMultimapSerializer.registerSerializers(kryo);
+    ReverseListSerializer.registerSerializers(kryo);
+  }
+}
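For readers wiring this up outside Beam's own context factory (for example, when handing Spark a pre-built `SparkConf`), a registrator like the one above is picked up through Spark's standard Kryo settings. A minimal `spark-defaults.conf` sketch: the property names are Spark's own, and it assumes the runner jar is on the driver and executor classpaths.

```
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator  org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator
```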

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index 4877f6e..ee2104a 100644
--- 

[2/2] incubator-beam git commit: This closes #1162

2016-10-26 Thread amitsela
This closes #1162


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/53fe3ee4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/53fe3ee4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/53fe3ee4

Branch: refs/heads/master
Commit: 53fe3ee425163a76b69d0830449d222d925eb9cd
Parents: f2fe1ae a54ded3
Author: Sela 
Authored: Wed Oct 26 10:01:51 2016 +0300
Committer: Sela 
Committed: Wed Oct 26 10:01:51 2016 +0300

--
 .../apache/beam/runners/spark/SparkRunner.java  | 19 --
 .../translation/GroupCombineFunctions.java  | 66 +---
 .../spark/translation/TransformTranslator.java  | 43 +++--
 .../streaming/StreamingTransformTranslator.java | 65 +--
 .../spark/util/SparkSideInputReader.java|  2 +-
 5 files changed, 55 insertions(+), 140 deletions(-)
--




[1/2] incubator-beam git commit: [BEAM-799] Support GroupByKey directly.

2016-10-26 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master f2fe1ae46 -> 53fe3ee42


[BEAM-799] Support GroupByKey directly.

Remove runner override for GroupByKey.

Avoid NPE if no sideInputs are available in reader.

Handle CombineFn with or without context.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a54ded37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a54ded37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a54ded37

Branch: refs/heads/master
Commit: a54ded373fa7f6508fb46eea1a1d6f9bc405114b
Parents: f2fe1ae
Author: Sela 
Authored: Sat Oct 22 14:51:50 2016 +0300
Committer: Sela 
Committed: Wed Oct 26 10:00:45 2016 +0300

--
 .../apache/beam/runners/spark/SparkRunner.java  | 19 --
 .../translation/GroupCombineFunctions.java  | 66 +---
 .../spark/translation/TransformTranslator.java  | 43 +++--
 .../streaming/StreamingTransformTranslator.java | 65 +--
 .../spark/util/SparkSideInputReader.java|  2 +-
 5 files changed, 55 insertions(+), 140 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index b17c38c..45c7f55 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.spark;
 
 import java.util.Collection;
 import java.util.List;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
@@ -36,7 +35,6 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PBegin;
@@ -115,23 +113,6 @@ public final class SparkRunner extends PipelineRunner {
   }
 
   /**
-   * Overrides for this runner.
-   */
-  @SuppressWarnings("rawtypes")
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-
-    if (transform instanceof GroupByKey) {
-      return (OutputT) ((PCollection) input).apply(
-          new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
-    } else {
-      return super.apply(transform, input);
-    }
-  }
-
-
-  /**
    * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single
    * thread.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index de02b26..e2a0f87 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -20,11 +20,9 @@ package org.apache.beam.runners.spark.translation;
 
 
 import com.google.common.collect.Lists;
-
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -38,6 +36,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ReifyTimestampAndWindowsDoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import 

[GitHub] incubator-beam pull request #1171: [BEAM-809] Create a KryoRegistrator for t...

2016-10-24 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1171

[BEAM-809] Create a KryoRegistrator for the SparkRunner.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-809

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1171.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1171


commit 4e00575a4a8e25ed3618f2f7672d3c24c662a149
Author: Sela <ans...@paypal.com>
Date:   2016-10-24T19:35:39Z

[BEAM-809] Create a KryoRegistrator for the SparkRunner.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1170: [BEAM-808] Increase "spark.port.maxRetrie...

2016-10-24 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1170

[BEAM-808] Increase "spark.port.maxRetries" to avoid BindException in…

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

… ROS.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-808

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1170.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1170


commit 8f930b2fb97a377c6c168a33ca415a3eec7d41f4
Author: Sela <ans...@paypal.com>
Date:   2016-10-24T18:52:03Z

[BEAM-808] Increase "spark.port.maxRetries" to avoid BindException in ROS.
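For context, `spark.port.maxRetries` caps how many consecutive ports Spark tries when the configured port is already bound, which is what surfaces as a `BindException` when many test JVMs run side by side. Raising it is a one-line configuration change; a sketch for `spark-defaults.conf`, where the value is illustrative rather than necessarily the one this PR chose:

```
spark.port.maxRetries  64
```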




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1162: [BEAM-799] Support GroupByKey directly

2016-10-22 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1162

[BEAM-799] Support GroupByKey directly

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-799

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1162.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1162


commit 52ed2eb3fee7e3af166f015851845fc4ff9f57e9
Author: Sela <ans...@paypal.com>
Date:   2016-10-22T11:51:50Z

Remove runner override for GroupByKey.

[BEAM-799] Support GroupByKey directly.

commit 93eaf607c26585e5157eb17812025df41bbd5072
Author: Sela <ans...@paypal.com>
Date:   2016-10-22T12:44:32Z

Avoid NPE if no sideInputs are available in reader.

commit 62c7f0e30a0246cc835f46e75046c72be14a1ba1
Author: Sela <ans...@paypal.com>
Date:   2016-10-22T12:45:40Z

Handle CombineFn with or without context.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1161: [BEAM-769] Spark streaming tests fail on ...

2016-10-22 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1161

[BEAM-769] Spark streaming tests fail on "nothing processed" if runti…

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

…me env. is slow because timeout is hit before processing is done.

Make graceful stop the default.

Keep "pumping-in" the last batch in a mocked stream to handle overflowing 
batches in case of a
graceful stop.

Change tests accordingly.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-769

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1161.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1161


commit 43c9e57ad8c7af022ba2f46ce4b8d20731f0766e
Author: Sela <ans...@paypal.com>
Date:   2016-10-20T22:20:33Z

[BEAM-769] Spark streaming tests fail on "nothing processed" if runtime 
env. is slow because timeout
is hit before processing is done.

Make graceful stop the default.

Keep "pumping-in" the last batch in a mocked stream to handle overflowing 
batches in case of a
graceful stop.

Change tests accordingly.
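The "keep pumping-in the last batch" idea can be sketched with plain collections. All types below are hypothetical stand-ins, not the runner's actual mocked-stream classes:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a mocked micro-batch stream: once the queued
// batches run out, keep re-offering the last one so a slow environment can
// finish processing during a graceful stop instead of seeing an empty stream.
public class PumpingQueueSketch {
  private final Queue<List<Integer>> batches = new ArrayDeque<>();
  private List<Integer> last = List.of();

  public void push(List<Integer> batch) {
    batches.add(batch);
  }

  public List<Integer> nextBatch() {
    List<Integer> next = batches.poll();
    if (next != null) {
      last = next; // remember the most recent real batch
    }
    return last;   // after exhaustion, keep "pumping-in" the last batch
  }

  public static void main(String[] args) {
    PumpingQueueSketch stream = new PumpingQueueSketch();
    stream.push(List.of(1, 2));
    stream.push(List.of(3));
    System.out.println(stream.nextBatch()); // [1, 2]
    System.out.println(stream.nextBatch()); // [3]
    System.out.println(stream.nextBatch()); // [3] again: the last batch repeats
  }
}
```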




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] incubator-beam git commit: This closes #1153

2016-10-22 Thread amitsela
This closes #1153


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a9a41eb9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a9a41eb9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a9a41eb9

Branch: refs/heads/master
Commit: a9a41eb94bee1200c93a735bbe54e80a5d776e3e
Parents: 4c90582 a7cc820
Author: Sela 
Authored: Sat Oct 22 12:24:21 2016 +0300
Committer: Sela 
Committed: Sat Oct 22 12:24:21 2016 +0300

--
 .../apache/beam/runners/spark/SparkRunner.java  | 33 +++-
 1 file changed, 32 insertions(+), 1 deletion(-)
--




[1/2] incubator-beam git commit: [BEAM-794] Defer combining in case of merging windows with sideInputs.

2016-10-22 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 4c9058236 -> a9a41eb94


[BEAM-794] Defer combining in case of merging windows with sideInputs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a7cc8206
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a7cc8206
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a7cc8206

Branch: refs/heads/master
Commit: a7cc8206cbbc6ac10e71a0563da2fea4c708277b
Parents: 4c90582
Author: Sela 
Authored: Fri Oct 21 16:00:57 2016 +0300
Committer: Sela 
Committed: Sat Oct 22 12:23:33 2016 +0300

--
 .../apache/beam/runners/spark/SparkRunner.java  | 33 +++-
 1 file changed, 32 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7cc8206/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index cad53be..b17c38c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.spark;
 
 import java.util.Collection;
+import java.util.List;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
@@ -34,11 +35,13 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
@@ -206,7 +209,7 @@ public final class SparkRunner extends PipelineRunner {
 @SuppressWarnings("unchecked")
 Class transformClass =
 (Class) node.getTransform().getClass();
-if (translator.hasTranslation(transformClass)) {
+if (translator.hasTranslation(transformClass) && !shouldDefer(node)) {
   LOG.info("Entering directly-translatable composite transform: '{}'",
       node.getFullName());
   LOG.debug("Composite transform class: '{}'", transformClass);
   doVisitTransform(node);
@@ -216,6 +219,34 @@ public final class SparkRunner extends PipelineRunner {
   return CompositeBehavior.ENTER_TRANSFORM;
 }
 
+    private boolean shouldDefer(TransformTreeNode node) {
+      PInput input = node.getInput();
+      // if the input is not a PCollection, or it is but with non merging windows, don't defer.
+      if (!(input instanceof PCollection)
+          || ((PCollection) input).getWindowingStrategy().getWindowFn().isNonMerging()) {
+        return false;
+      }
+      // so far we know that the input is a PCollection with merging windows.
+      // check for sideInput in case of a Combine transform.
+      PTransform transform = node.getTransform();
+      boolean hasSideInput = false;
+      if (transform instanceof Combine.PerKey) {
+        List sideInputs = ((Combine.PerKey) transform).getSideInputs();
+        hasSideInput = sideInputs != null && !sideInputs.isEmpty();
+      } else if (transform instanceof Combine.Globally) {
+        List sideInputs = ((Combine.Globally) transform).getSideInputs();
+        hasSideInput = sideInputs != null && !sideInputs.isEmpty();
+      }
+      // defer if sideInputs are defined.
+      if (hasSideInput) {
+        LOG.info("Deferring combine transformation {} for job {}", transform,
+            ctxt.getPipeline().getOptions().getJobName());
+        return true;
+      }
+      // default.
+      return false;
+    }
+
 @Override
 public void visitPrimitiveTransform(TransformTreeNode node) {
   doVisitTransform(node);
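The shouldDefer decision in this diff boils down to a small predicate: defer only for a PCollection with merging windows whose combine carries side inputs. A stand-alone sketch, with plain booleans and lists standing in for the Beam types (the stand-ins are hypothetical):

```java
import java.util.Collections;
import java.util.List;

public class ShouldDeferSketch {

  // Mirrors the decision in SparkRunner's shouldDefer, with Beam's types
  // replaced by plain parameters (hypothetical stand-ins).
  public static boolean shouldDefer(boolean inputIsPCollection,
                                    boolean windowFnIsNonMerging,
                                    List<?> sideInputs) {
    // A non-PCollection input, or non-merging windows: never defer.
    if (!inputIsPCollection || windowFnIsNonMerging) {
      return false;
    }
    // Merging windows: defer only when side inputs are present.
    return sideInputs != null && !sideInputs.isEmpty();
  }

  public static void main(String[] args) {
    System.out.println(shouldDefer(true, false, List.of("view")));         // true
    System.out.println(shouldDefer(true, true, List.of("view")));          // false
    System.out.println(shouldDefer(true, false, Collections.emptyList())); // false
  }
}
```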



[GitHub] incubator-beam pull request #1153: [BEAM-794] Defer combining in case of me...

2016-10-21 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1153

[BEAM-794] Defer combining in case of merging windows with sideInputs.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [x] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [x] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-794

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1153.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1153


commit ee5bcecb3c6ce51a2531d2c6ff36a6a99038d32f
Author: Sela <ans...@paypal.com>
Date:   2016-10-21T13:00:57Z

[BEAM-794] Defer combining in case of merging windows with sideInputs.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1144: [BEAM-781] Remove Spark's batch unit test...

2016-10-20 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1144

[BEAM-781] Remove Spark's batch unit tests and rely on ROS tests inst…

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

…ead.

The runner does not support Spark before 1.6 anymore.

Remove tests that can be trusted to ROS.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-781

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1144.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1144


commit b3c9502636d48a1e7cf6f2078c5130d7921f
Author: Sela <ans...@paypal.com>
Date:   2016-10-20T13:34:06Z

[BEAM-781] Remove Spark's batch unit tests and rely on ROS tests instead.

The runner does not support Spark before 1.6 anymore.

Remove tests that can be trusted to ROS.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1143: [BEAM-658] Support Read.Unbounded primiti...

2016-10-20 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1143

[BEAM-658] Support Read.Unbounded primitive.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

Changed mapSourceFunction to use Scala's native Option.

Upgrade to kryo-serializers 0.39 that provides support for ReverseList 
(used by Top).

Better logging.

Allow a longer read time-frame for read in tests.

Assert initial parallelism is greater than zero.

Add OnBatchCompleted listener that writes to Kafka.

Test latest.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-658

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1143.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1143


commit 71ffb6b6aca9c997326d3b15951d1f295aaf7505
Author: Sela <ans...@paypal.com>
Date:   2016-09-29T13:30:32Z

[BEAM-658] Support Read.Unbounded primitive.

Changed mapSourceFunction to use Scala's native Option.

Upgrade to kryo-serializers 0.39 that provides support for ReverseList 
(used by Top).

Better logging.

Allow a longer read time-frame for read in tests.

Assert initial parallelism is greater than zero.

Add OnBatchCompleted listener that writes to Kafka.

Test latest.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/3] incubator-beam git commit: [BEAM-259] Execute selected RunnableOnService tests with Spark runner.

2016-10-19 Thread amitsela
[BEAM-259] Execute selected RunnableOnService tests with Spark runner.

Handle empty Flatten for bounded.

Spark will bubble out a SparkException for user code failure, so this won't be
caught. Asserting on the error message should be good enough.

outputWithTimestamp should handle start/finishBundle as well.

Explode WindowedValues before processing.

sideOutputWithTimestamp to address start/finishBundle.

SideInput with windows.

Unused for now, remove.

Take sideInput window strategy into account, for combine as well.

reduce code duplication.

Spark combine support.

reuse code where possible.

Expose sideInputs and insertDefault in Combine.Globally for direct translation.

Direct translation of Combine.Globally into Spark's aggregate function.

Make runs use 4 cores by default - makes tests run with multiple threads,
but not too many.

SideInputReader for the Spark runner.

A common abstraction for Keyed and Global implementation.

Implement Combine.Globally via Spark's aggregate.

runnable-on-service profile doesn't need pluginManagement.

Removing test as it does not follow a deterministic combine implementation.

Context reuse is mostly for testing. To avoid a test failure that will stop the 
context and fail all
following tests we need to recreate the context if it's stopped as well.

WindowFn is used, no need to pass the entire WindowStrategy.

Explode elements for processing only when necessary.

The SparkRunner should use Beam's UserCodeException instead of its own custom
SparkProcessException.
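The "recreate the context if it's stopped" note above is a small reuse rule. A sketch with a hypothetical stand-in context type (not the actual JavaSparkContext handling):

```java
import java.util.function.Supplier;

// Sketch of the reuse rule: hand back the cached context until someone stops
// it, then create a fresh one, so a failed test that stopped the shared
// context does not fail every test that runs after it.
public class ContextFactorySketch {

  public static class Ctx {
    private boolean stopped;
    public void stop() { stopped = true; }
    public boolean isStopped() { return stopped; }
  }

  private static Ctx instance;

  public static synchronized Ctx getOrCreate(Supplier<Ctx> creator) {
    if (instance == null || instance.isStopped()) {
      instance = creator.get();
    }
    return instance;
  }

  public static void main(String[] args) {
    Ctx first = getOrCreate(Ctx::new);
    System.out.println(first == getOrCreate(Ctx::new)); // true: reused
    first.stop();
    System.out.println(first == getOrCreate(Ctx::new)); // false: recreated
  }
}
```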


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7eecd7ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7eecd7ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7eecd7ee

Branch: refs/heads/master
Commit: 7eecd7ee73cdd2b41e785b4540852530deead429
Parents: b0cb2e8
Author: Sela 
Authored: Fri Sep 23 13:32:28 2016 +0300
Committer: Sela 
Committed: Thu Oct 20 00:51:48 2016 +0300

--
 runners/spark/pom.xml   |  73 ++---
 .../runners/spark/SparkPipelineOptions.java |   2 +-
 .../apache/beam/runners/spark/SparkRunner.java  |  15 +-
 .../beam/runners/spark/TestSparkRunner.java |   4 +-
 .../runners/spark/translation/DoFnFunction.java |  74 ++---
 .../translation/GroupCombineFunctions.java  | 235 +---
 .../spark/translation/MultiDoFnFunction.java|  85 +++---
 .../translation/SparkAbstractCombineFn.java | 134 +
 .../spark/translation/SparkContextFactory.java  |   3 +-
 .../spark/translation/SparkGlobalCombineFn.java | 260 ++
 .../spark/translation/SparkKeyedCombineFn.java  | 273 +++
 .../spark/translation/SparkProcessContext.java  | 160 +--
 .../spark/translation/TransformTranslator.java  | 143 +-
 .../spark/translation/TranslationUtils.java |  28 +-
 .../streaming/StreamingTransformTranslator.java | 126 +
 .../runners/spark/util/BroadcastHelper.java |  26 --
 .../spark/util/SparkSideInputReader.java|  95 +++
 .../spark/translation/CombineGloballyTest.java  | 101 ---
 .../translation/SparkPipelineOptionsTest.java   |   2 +-
 .../org/apache/beam/sdk/transforms/Combine.java |  14 +
 20 files changed, 1318 insertions(+), 535 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 60b2de9..a246c19 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -59,40 +59,40 @@
   local-runnable-on-service-tests
   false
   
-
-  
-
-  org.apache.maven.plugins
-  maven-surefire-plugin
-  
-
-  runnable-on-service-tests
-  
-
org.apache.beam.sdk.testing.RunnableOnService
-none
-true
-
-  org.apache.beam:java-sdk-all
-
-
-  org.apache.beam.sdk.io.BoundedReadFromUnboundedSourceTest
-
-
-  
-[
-  "--runner=TestSparkRunner",
-  "--streaming=false"
-]
-  
-  
true
-  false
-
-  
-
-  
-
-  
-
+
+  
+org.apache.maven.plugins
+maven-surefire-plugin
+
+  

[1/3] incubator-beam git commit: [BEAM-259] Execute selected RunnableOnService tests with Spark runner.

2016-10-19 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master b0cb2e87b -> c472e1227


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index fbaf5b8..2135170 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -18,26 +18,37 @@
 
 package org.apache.beam.runners.spark.translation;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.Accumulator;
@@ -45,33 +56,62 @@ import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+
 /**
  * Spark runner process context.
  */
 public abstract class SparkProcessContext
 extends OldDoFn.ProcessContext {
-
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkProcessContext.class);
 
   private final OldDoFn fn;
   private final SparkRuntimeContext mRuntimeContext;
-  private final Map mSideInputs;
+  private final SideInputReader sideInputReader;
+  private final WindowFn windowFn;
 
-  protected WindowedValue windowedValue;
+  WindowedValue windowedValue;
 
   SparkProcessContext(OldDoFn fn,
-  SparkRuntimeContext runtime,
-  Map sideInputs) {
+  SparkRuntimeContext runtime,
+  Map> sideInputs,
+  WindowFn windowFn) {
 fn.super();
 this.fn = fn;
 this.mRuntimeContext = runtime;
-this.mSideInputs = sideInputs;
+this.sideInputReader = new SparkSideInputReader(sideInputs);
+this.windowFn = windowFn;
   }
 
   void setup() {
 setupDelegateAggregators();
   }
 
+  Iterable callWithCtxt(Iterator iter) throws 
Exception {
+this.setup();
+// skip if bundle is empty.
+if (!iter.hasNext()) {
+  return Lists.newArrayList();
+}
+try {
+  fn.setup();
+  fn.startBundle(this);
+  return this.getOutputIterable(iter, fn);
+} catch (Exception e) {
+  try {
+// this teardown handles exceptions encountered in setup() and 
startBundle(). teardown
+// after execution or due to exceptions in process element is called 
in the iterator
+// produced by ctxt.getOutputIterable returned from this method.
+fn.teardown();
+  } catch (Exception teardownException) {
+LOG.error(
+"Suppressing exception while tearing down Function {}", fn, 
teardownException);
+e.addSuppressed(teardownException);
+  }
+  throw wrapUserCodeException(e);
+}
+  }
+
   @Override
   public PipelineOptions getPipelineOptions() {
 return mRuntimeContext.getPipelineOptions();
@@ -79,15 +119,17 @@ public abstract class SparkProcessContext
 
   @Override
   public  T sideInput(PCollectionView view) {
-
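
The `callWithCtxt()` hunk above wires the `OldDoFn` lifecycle into the Spark function: empty bundles are skipped, and if `setup()` or `startBundle()` throws, `teardown()` is invoked immediately and any teardown failure is attached via `addSuppressed` so the original cause still propagates. The sketch below illustrates that exception-handling pattern in isolation; `Fn` and `callWithLifecycle` are simplified stand-ins for illustration, not Beam's actual `OldDoFn`/`SparkProcessContext` API.

```java
import java.util.Collections;
import java.util.Iterator;

public class BundleLifecycleSketch {
  // Illustrative stand-in for an OldDoFn with setup/startBundle/teardown.
  interface Fn<T> {
    void setup() throws Exception;
    void startBundle() throws Exception;
    Iterable<T> process(Iterator<T> input) throws Exception;
    void teardown() throws Exception;
  }

  // Mirrors callWithCtxt(): skip empty bundles, and if setup()/startBundle()
  // throws, run teardown() and attach any teardown failure as a suppressed
  // exception so the original cause is the one that propagates.
  static <T> Iterable<T> callWithLifecycle(Fn<T> fn, Iterator<T> input) throws Exception {
    if (!input.hasNext()) {
      return Collections.emptyList(); // skip if bundle is empty
    }
    try {
      fn.setup();
      fn.startBundle();
      return fn.process(input);
    } catch (Exception e) {
      try {
        fn.teardown();
      } catch (Exception teardownException) {
        e.addSuppressed(teardownException);
      }
      throw e;
    }
  }

  public static void main(String[] args) {
    Fn<Integer> failing = new Fn<Integer>() {
      public void setup() {}
      public void startBundle() throws Exception {
        throw new IllegalStateException("startBundle failed");
      }
      public Iterable<Integer> process(Iterator<Integer> in) {
        return Collections.emptyList();
      }
      public void teardown() throws Exception {
        throw new IllegalStateException("teardown failed");
      }
    };
    try {
      callWithLifecycle(failing, Collections.singletonList(1).iterator());
    } catch (Exception e) {
      System.out.println(e.getMessage());                    // startBundle failed
      System.out.println(e.getSuppressed()[0].getMessage()); // teardown failed
    }
  }
}
```

In the real code, teardown after successful processing happens later, in the iterator returned by `getOutputIterable`; the catch block here only covers failures before element processing starts.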

[3/3] incubator-beam git commit: This closes #1055

2016-10-19 Thread amitsela
This closes #1055


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c472e122
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c472e122
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c472e122

Branch: refs/heads/master
Commit: c472e1227d4cc265152afd9afc072ddecc934dc4
Parents: b0cb2e8 7eecd7e
Author: Sela 
Authored: Thu Oct 20 01:24:34 2016 +0300
Committer: Sela 
Committed: Thu Oct 20 01:24:34 2016 +0300

--
 runners/spark/pom.xml   |  73 ++---
 .../runners/spark/SparkPipelineOptions.java |   2 +-
 .../apache/beam/runners/spark/SparkRunner.java  |  15 +-
 .../beam/runners/spark/TestSparkRunner.java |   4 +-
 .../runners/spark/translation/DoFnFunction.java |  74 ++---
 .../translation/GroupCombineFunctions.java  | 235 +---
 .../spark/translation/MultiDoFnFunction.java|  85 +++---
 .../translation/SparkAbstractCombineFn.java | 134 +
 .../spark/translation/SparkContextFactory.java  |   3 +-
 .../spark/translation/SparkGlobalCombineFn.java | 260 ++
 .../spark/translation/SparkKeyedCombineFn.java  | 273 +++
 .../spark/translation/SparkProcessContext.java  | 160 +--
 .../spark/translation/TransformTranslator.java  | 143 +-
 .../spark/translation/TranslationUtils.java |  28 +-
 .../streaming/StreamingTransformTranslator.java | 126 +
 .../runners/spark/util/BroadcastHelper.java |  26 --
 .../spark/util/SparkSideInputReader.java|  95 +++
 .../spark/translation/CombineGloballyTest.java  | 101 ---
 .../translation/SparkPipelineOptionsTest.java   |   2 +-
 .../org/apache/beam/sdk/transforms/Combine.java |  14 +
 20 files changed, 1318 insertions(+), 535 deletions(-)
--




[2/2] incubator-beam git commit: This closes #1125

2016-10-19 Thread amitsela
This closes #1125


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b0cb2e87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b0cb2e87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b0cb2e87

Branch: refs/heads/master
Commit: b0cb2e87b14182c9950974204a345a17181ff55c
Parents: ea04e61 84c6649
Author: Sela 
Authored: Wed Oct 19 21:35:20 2016 +0300
Committer: Sela 
Committed: Wed Oct 19 21:35:20 2016 +0300

--
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java| 14 +-
 1 file changed, 5 insertions(+), 9 deletions(-)
--




[1/2] incubator-beam git commit: [BEAM-744] UnboundedKafkaReader should return as soon as it can.

2016-10-19 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master ea04e618e -> b0cb2e87b


[BEAM-744] UnboundedKafkaReader should return as soon as it can.

Use timeout directly in nextBatch()


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84c6649c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84c6649c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84c6649c

Branch: refs/heads/master
Commit: 84c6649cd63c33ca79ad43e8973dbf765e27a5d0
Parents: ea04e61
Author: Sela 
Authored: Tue Oct 18 22:03:25 2016 +0300
Committer: Sela 
Committed: Wed Oct 19 21:34:07 2016 +0300

--
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java| 14 +-
 1 file changed, 5 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84c6649c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
--
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 2030789..834104e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -756,9 +756,6 @@ public class KafkaIO {
 private Iterator curBatch = Collections.emptyIterator();
 
 private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
-// how long to wait for new records from kafka consumer inside start()
-private static final Duration START_NEW_RECORDS_POLL_TIMEOUT = 
Duration.standardSeconds(5);
-// how long to wait for new records from kafka consumer inside advance()
 private static final Duration NEW_RECORDS_POLL_TIMEOUT = 
Duration.millis(10);
 
 // Use a separate thread to read Kafka messages. Kafka Consumer does all 
its work including
@@ -888,12 +885,13 @@ public class KafkaIO {
   LOG.info("{}: Returning from consumer pool loop", this);
 }
 
-private void nextBatch(Duration timeout) {
+private void nextBatch() {
   curBatch = Collections.emptyIterator();
 
   ConsumerRecords records;
   try {
-records = availableRecordsQueue.poll(timeout.getMillis(),
+// poll available records, wait (if necessary) up to the specified 
timeout.
+records = 
availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(),
  TimeUnit.MILLISECONDS);
   } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
@@ -966,9 +964,7 @@ public class KafkaIO {
 }
   }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
 
-  // Wait for longer than normal when fetching a batch to improve chances 
a record is available
-  // when start() returns.
-  nextBatch(START_NEW_RECORDS_POLL_TIMEOUT);
+  nextBatch();
   return advance();
 }
 
@@ -1032,7 +1028,7 @@ public class KafkaIO {
   return true;
 
 } else { // -- (b)
-  nextBatch(NEW_RECORDS_POLL_TIMEOUT);
+  nextBatch();
 
   if (!curBatch.hasNext()) {
 return false;
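
The change above drops the long start-time timeout and has `nextBatch()` always poll the hand-off queue with the short `NEW_RECORDS_POLL_TIMEOUT`, so the reader returns quickly whether or not the background consumer thread has produced a batch. A minimal sketch of that bounded-poll pattern follows; the class, queue, and batch types are illustrative only, not KafkaIO's actual internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedPollSketch {
  // Queue filled by a (hypothetical) background consumer thread.
  static final BlockingQueue<List<String>> availableRecordsQueue =
      new ArrayBlockingQueue<>(1);
  static final long NEW_RECORDS_POLL_TIMEOUT_MS = 10;

  // Mirrors nextBatch(): wait at most the short timeout for a batch,
  // return an empty batch if none arrived, and restore the interrupt
  // flag if the wait is interrupted.
  static List<String> nextBatch() {
    try {
      List<String> records =
          availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
      return records == null ? new ArrayList<>() : records;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return new ArrayList<>();
    }
  }

  public static void main(String[] args) throws Exception {
    availableRecordsQueue.put(Arrays.asList("r0", "r1"));
    System.out.println(nextBatch().size()); // prints 2
    System.out.println(nextBatch().size()); // prints 0 (poll times out quickly)
  }
}
```

The design trade-off: a short, uniform timeout keeps `start()` and `advance()` responsive for the runner, at the cost of sometimes returning with no records; callers compensate by calling again, as the test helper in the next commit does.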



[2/2] incubator-beam git commit: This closes #1133

2016-10-19 Thread amitsela
This closes #1133


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ea04e618
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ea04e618
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ea04e618

Branch: refs/heads/master
Commit: ea04e618eae8e20e21a1db5b8367cf972446d9b6
Parents: dde8e35 2c8ade8
Author: Sela 
Authored: Wed Oct 19 10:08:05 2016 +0300
Committer: Sela 
Committed: Wed Oct 19 10:08:05 2016 +0300

--
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 26 
 1 file changed, 10 insertions(+), 16 deletions(-)
--




[1/2] incubator-beam git commit: [BEAM-777] KafkaIO Test should handle reader.start() better.

2016-10-19 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master dde8e35ca -> ea04e618e


[BEAM-777] KafkaIO Test should handle reader.start() better.

KafkaIOTest : start() can return false


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c8ade83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c8ade83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c8ade83

Branch: refs/heads/master
Commit: 2c8ade83b2104ecd7f8098b18dd45a0fd8b6cc9f
Parents: dde8e35
Author: Raghu Angadi 
Authored: Tue Oct 18 21:46:18 2016 -0700
Committer: Sela 
Committed: Wed Oct 19 10:06:45 2016 +0300

--
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 26 
 1 file changed, 10 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8ade83/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
--
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 67aa675..2f3c524 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -389,7 +389,10 @@ public class KafkaIOTest {
   // Kafka records are read in a separate thread inside the reader. As a 
result advance() might not
   // read any records even from the mock consumer, especially for the first 
record.
   // This is a helper method to loop until we read a record.
-  private static void advanceOnce(UnboundedReader reader) throws 
IOException {
+  private static void advanceOnce(UnboundedReader reader, boolean 
isStarted) throws IOException {
+if (!isStarted && reader.start()) {
+  return;
+}
 while (!reader.advance()) {
  // very rarely will there be more than one attempt.
   // In case of a bug we might end up looping forever, and test will fail 
with a timeout.
@@ -418,9 +421,8 @@ public class KafkaIOTest {
 final int numToSkip = 20; // one from each partition.
 
 // advance numToSkip elements
-reader.start();
-for (int l = 1; l < numToSkip; ++l) {
-  advanceOnce(reader);
+for (int i = 0; i < numToSkip; ++i) {
+  advanceOnce(reader, i > 0);
 }
 
 // Confirm that we get the expected element in sequence before 
checkpointing.
@@ -435,13 +437,10 @@ public class KafkaIOTest {
 // Confirm that we get the next elements in sequence.
 // This also confirms that Reader interleaves records from each partitions 
by the reader.
 
-reader.start();
 for (int i = numToSkip; i < numElements; i++) {
+  advanceOnce(reader, i > numToSkip);
   assertEquals(i, (long) reader.getCurrent().getKV().getValue());
   assertEquals(i, reader.getCurrentTimestamp().getMillis());
-  if ((i + 1) < numElements) {
-advanceOnce(reader);
-  }
 }
   }
 
@@ -460,9 +459,8 @@ public class KafkaIOTest {
 
 UnboundedReader> reader = 
source.createReader(null, null);
 
-reader.start();
-for (int l = 1; l < initialNumElements; ++l) {
-  advanceOnce(reader);
+for (int l = 0; l < initialNumElements; ++l) {
+  advanceOnce(reader, l > 0);
 }
 
 // Checkpoint and restart, and confirm that the source continues correctly.
@@ -490,19 +488,15 @@ public class KafkaIOTest {
 
 reader = source.createReader(null, mark);
 
-reader.start();
-
 // Verify in any order. As the partitions are unevenly read, the returned 
records are not in a
 // simple order. Note that testUnboundedSourceCheckpointMark() verifies 
round-robin order.
 
 List expected = new ArrayList<>();
 List actual = new ArrayList<>();
 for (long i = initialNumElements; i < numElements; i++) {
+  advanceOnce(reader, i > initialNumElements);
   expected.add(i);
   actual.add(reader.getCurrent().getKV().getValue());
-  if ((i + 1) < numElements) {
-advanceOnce(reader);
-  }
 }
 assertThat(actual, 
IsIterableContainingInAnyOrder.containsInAnyOrder(expected.toArray()));
   }
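
The refactored `advanceOnce(reader, isStarted)` above folds the first `start()` call into the retry helper: the first invocation tries `start()`, and every invocation loops on `advance()` until the background fetch thread yields a record. A self-contained sketch of that pattern follows, using a trivial fake reader; `Reader` and `flakyReader` are illustrative stand-ins, not Beam's `UnboundedReader` API.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class AdvanceOnceSketch {
  // Minimal stand-in for an unbounded reader; names are illustrative only.
  interface Reader {
    boolean start() throws IOException;   // first fetch; may return false
    boolean advance() throws IOException; // later fetches; may return false
    long getCurrent();
  }

  // Mirrors the test helper: route the first call through start(), then
  // loop on advance() until a record is actually available.
  static void advanceOnce(Reader reader, boolean isStarted) throws IOException {
    if (!isStarted && reader.start()) {
      return;
    }
    while (!reader.advance()) {
      // In the real test a watchdog timeout guards against looping forever.
    }
  }

  // A reader that, like the mocked consumer, sometimes reports "no record
  // yet" before a record becomes available.
  static Reader flakyReader(List<Long> values) {
    Iterator<Long> it = values.iterator();
    return new Reader() {
      long current;
      int misses = 0;
      public boolean start() { return advanceInternal(); }
      public boolean advance() {
        if (misses++ % 2 == 0) {
          return false; // simulate a fetch that returned nothing
        }
        return advanceInternal();
      }
      boolean advanceInternal() {
        if (!it.hasNext()) { return false; }
        current = it.next();
        return true;
      }
      public long getCurrent() { return current; }
    };
  }

  public static void main(String[] args) throws IOException {
    Reader r = flakyReader(Arrays.asList(0L, 1L, 2L));
    for (int i = 0; i < 3; i++) {
      advanceOnce(r, i > 0);
      System.out.println(r.getCurrent()); // prints 0, 1, 2
    }
  }
}
```

This is why the test loops at `i > 0` / `i > numToSkip`: only the very first read in each phase may legitimately go through `start()`.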



[GitHub] incubator-beam pull request #1125: [BEAM-744] A runner should be able to ove...

2016-10-18 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1125

[BEAM-744] A runner should be able to override KafkaIO max wait prope…

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

…rties.

Add KafkaOptions for the UnboundedKafkaReader.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-744

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1125.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1125


commit 627f50cc510783117b0642d4f699d4b4d9b342c7
Author: Sela <ans...@paypal.com>
Date:   2016-10-18T11:36:04Z

[BEAM-744] A runner should be able to override KafkaIO max wait properties.

Add KafkaOptions for the UnboundedKafkaReader.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] incubator-beam git commit: This closes #1073

2016-10-14 Thread amitsela
This closes #1073


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/49f94443
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/49f94443
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/49f94443

Branch: refs/heads/master
Commit: 49f94443004a48c0c1524f3c431b73b0f94d53a2
Parents: d790dfe 44225cf
Author: Sela 
Authored: Fri Oct 14 16:54:54 2016 +0300
Committer: Sela 
Committed: Fri Oct 14 16:54:54 2016 +0300

--
 .../beam/runners/spark/EvaluationResult.java|   4 +-
 .../spark/translation/EvaluationContext.java|   3 +-
 .../streaming/StreamingEvaluationContext.java   |   6 +-
 .../apache/beam/runners/spark/DeDupTest.java|   3 +-
 .../beam/runners/spark/EmptyInputTest.java  |   1 -
 .../beam/runners/spark/SimpleWordCountTest.java |   6 +-
 .../apache/beam/runners/spark/TfIdfTest.java|   3 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |   4 +-
 .../beam/runners/spark/io/NumShardsTest.java|   4 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   4 +-
 .../spark/translation/CombineGloballyTest.java  |   1 -
 .../spark/translation/CombinePerKeyTest.java|   1 -
 .../spark/translation/DoFnOutputTest.java   |   4 +-
 .../translation/MultiOutputWordCountTest.java   |   2 -
 .../spark/translation/SerializationTest.java|   4 +-
 .../translation/WindowedWordCountTest.java  |  10 +-
 .../streaming/EmptyStreamAssertionTest.java |  76 
 .../streaming/FlattenStreamingTest.java |  11 +-
 .../streaming/KafkaStreamingTest.java   |   6 +-
 .../RecoverFromCheckpointStreamingTest.java | 179 --
 .../ResumeFromCheckpointStreamingTest.java  | 182 +++
 .../streaming/SimpleStreamingWordCountTest.java |   6 +-
 .../streaming/utils/PAssertStreaming.java   |  87 ++---
 23 files changed, 346 insertions(+), 261 deletions(-)
--




[2/2] incubator-beam git commit: This closes #1072

2016-10-14 Thread amitsela
This closes #1072


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d790dfe1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d790dfe1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d790dfe1

Branch: refs/heads/master
Commit: d790dfe1ba7da5387944e47389cd7b35061e2782
Parents: a0f649e 4b49abc
Author: Sela 
Authored: Fri Oct 14 13:32:14 2016 +0300
Committer: Sela 
Committed: Fri Oct 14 13:32:14 2016 +0300

--
 .../beam/runners/spark/SparkPipelineOptions.java  | 18 ++
 .../SparkRunnerStreamingContextFactory.java   |  8 
 2 files changed, 26 insertions(+)
--




[GitHub] incubator-beam pull request #1073: [BEAM-735] PAssertStreaming should make s...

2016-10-09 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1073

[BEAM-735] PAssertStreaming should make sure the assertion happened.


---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-735

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1073.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1073


commit ca3aa78d9651d8d8fc981234c9f4707414dc9738
Author: Sela <ans...@paypal.com>
Date:   2016-10-09T18:38:14Z

PAssertStreaming should check an assertion happened.

commit 4adc3827443a6d8490f6b1799fce5c820c9484a5
Author: Sela <ans...@paypal.com>
Date:   2016-10-09T18:39:00Z

Test assert for skipped assertion.

commit 948fc9d0e894c06e555cf47fc3db48e6aad55008
Author: Sela <ans...@paypal.com>
Date:   2016-10-09T18:39:53Z

This name is more true to the nature of this test.

commit 346e85df4436a07a8dd30d793d7fa0ca4bf23806
Author: Sela <ans...@paypal.com>
Date:   2016-10-09T18:40:46Z

Fix according to new PAssertStreaming.




---
---


[GitHub] incubator-beam pull request #1072: [BEAM-734] Add StreamingListeners via Spa...

2016-10-09 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1072

[BEAM-734] Add StreamingListeners via SparkPipelineOptions.


---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-734

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1072.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1072


commit 139cdbf0007305901cabaddfb3b387b979719d58
Author: Sela <ans...@paypal.com>
Date:   2016-10-09T10:44:58Z

Add StreamingListeners via SparkPipelineOptions.




---
---


[GitHub] incubator-beam pull request #1055: [BEAM-259] Enable RunnableOnService for b...

2016-10-05 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1055

[BEAM-259] Enable RunnableOnService for batch.


---

Handle empty Flatten for bounded.

Spark will bubble out a SparkException for user code failure, so this won't 
catch. Asserting on the
error message should be good enough.

outputWithTimestamp should handle start/finishBundle as well.

Explode WindowedValues before processing.

sideOutputWithTimestamp to address start/finishBundle.

SideInput with windows.

Unused for now, remove.

Take sideInput window strategy into account, for combine as well.

reduce code duplication.

Spark combine support.

reuse code where possible.

Expose sideInputs and insertDefault in Combine.Globally for direct 
translation.

Direct translation of Combine.Globally into Spark's aggregate function.

Make default run with 4 cores by default - makes tests run with multiple 
threads, but not too many.

SideInputReader for the Spark runner.

A common abstraction for Keyed and Global implementation.

Implement Combine.Globally via Spark's aggregate.

runnable-on-service profile doesn't need pluginManagement.

Removing test as it does not follow a deterministic combine implementation.

Context reuse is mostly for testing. To avoid a test failure that will stop 
the context and fail all
following tests we need to recreate the context if it's stopped as well.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-259

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1055.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1055


commit 53ed9951203da7f7e2cff0ef4d9ec64f37a0f29d
Author: Sela <ans...@paypal.com>
Date:   2016-09-23T10:32:28Z

Enable RunnableOnService for batch.

Handle empty Flatten for bounded.

Spark will bubble out a SparkException for user code failure, so this won't 
catch. Asserting on the
error message should be good enough.

outputWithTimestamp should handle start/finishBundle as well.

Explode WindowedValues before processing.

sideOutputWithTimestamp to address start/finishBundle.

SideInput with windows.

Unused for now, remove.

Take sideInput window strategy into account, for combine as well.

reduce code duplication.

Spark combine support.

reuse code where possible.

Expose sideInputs and insertDefault in Combine.Globally for direct 
translation.

Direct translation of Combine.Globally into Spark's aggregate function.

Make default run with 4 cores by default - makes tests run with multiple 
threads, but not too many.

SideInputReader for the Spark runner.

A common abstraction for Keyed and Global implementation.

Implement Combine.Globally via Spark's aggregate.

runnable-on-service profile doesn't need pluginManagement.

Removing test as it does not follow a deterministic combine implementation.

Context reuse is mostly for testing. To avoid a test failure that will stop 
the context and fail all
following tests we need to recreate the context if it's stopped as well.
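
The context-reuse point above — reuse a cached context across tests, but recreate it if a failed test stopped it — reduces to a small get-or-create guard. The sketch below shows the idea with a hypothetical `Ctx` stand-in; it is not the Spark runner's actual context factory.

```java
public class ContextReuseSketch {
  // Hypothetical stand-in for a context that a failing test may stop.
  static class Ctx {
    private boolean stopped = false;
    void stop() { stopped = true; }
    boolean isStopped() { return stopped; }
  }

  private static Ctx cached;

  // Reuse the cached context, but recreate it if it was stopped, so later
  // tests in the same JVM don't inherit a dead context.
  static synchronized Ctx getOrCreate() {
    if (cached == null || cached.isStopped()) {
      cached = new Ctx();
    }
    return cached;
  }

  public static void main(String[] args) {
    Ctx first = getOrCreate();
    System.out.println(first == getOrCreate()); // true: same context reused
    first.stop();
    System.out.println(first == getOrCreate()); // false: stopped, so recreated
  }
}
```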




---
---


[GitHub] incubator-beam pull request #1040: [BEAM-703] SingletonViewFn might exhaust ...

2016-10-04 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/1040

[BEAM-703] SingletonViewFn might exhaust defaultValue if it's serialized 
after being used.


---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-703

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1040.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1040


commit 0bd97efc0d764af17cdd8abdf43bff33bb21be2b
Author: Sela <ans...@paypal.com>
Date:   2016-10-04T15:12:38Z

Avoid losing the encoded defaultValue.




---
---


[1/2] incubator-beam git commit: [BEAM-657] Support Read.Bounded primitive.

2016-09-22 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 4872bde8f -> a00d2f810


[BEAM-657] Support Read.Bounded primitive.

Support Read.Bounded primitive.

Avro requires this for snappy.

Create is supported by Read.Bounded now.

Read.Bounded support should solve gs issues now.

remove unused direct translations. Addressed by BEAM-668.

Assert default parallelism, close reader on exception, and other improvements.

Addressed more comments.

Extra line.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5848
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5848
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5848

Branch: refs/heads/master
Commit: 58489bc8cbe4d702ff9ae07f932fb96141a1
Parents: 4872bde
Author: Sela 
Authored: Wed Sep 21 15:13:02 2016 +0300
Committer: Sela 
Committed: Fri Sep 23 01:31:16 2016 +0300

--
 examples/java/pom.xml   |   2 +-
 .../org/apache/beam/examples/WordCountIT.java   |   6 -
 runners/spark/pom.xml   |   6 +
 .../apache/beam/runners/spark/SparkRunner.java  |   5 -
 .../apache/beam/runners/spark/io/SourceRDD.java | 198 +++
 .../spark/translation/TransformTranslator.java  |  22 ++-
 6 files changed, 223 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5848/examples/java/pom.xml
--
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 6a39f64..9a48ec6 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -184,7 +184,7 @@
 
   [
   "--project=apache-beam-testing",
-  "--tempRoot=/tmp",
+  "--tempRoot=gs://temp-storage-for-end-to-end-tests",
   "--runner=org.apache.beam.runners.spark.TestSparkRunner"
   ]
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5848/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
--
diff --git 
a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java 
b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index b0e0fe0..2f2ea46 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.examples;
 
-import com.google.common.io.Resources;
 import java.util.Date;
 import org.apache.beam.examples.WordCount.WordCountOptions;
 import org.apache.beam.sdk.options.Default;
@@ -63,11 +62,6 @@ public class WordCountIT {
 new FileChecksumMatcher(options.getOutputChecksum(), 
options.getOutput() + "*"));
 
 String e2eTestInputPath = "gs://apache-beam-samples/apache/LICENSE";
-// Spark runner currently doesn't support GCS I/O, change default input to:
-// .../src/test/resources/LICENSE
-if (options.getRunner().getName().contains("SparkRunner")) {
-  e2eTestInputPath = Resources.getResource("LICENSE").getPath();
-}
 options.setInputFile(e2eTestInputPath);
 
 WordCount.main(TestPipeline.convertToArgs(options));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5848/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 228a90b..60b2de9 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -180,6 +180,12 @@
   joda-time
 
 
+  org.apache.commons
+  commons-compress
+  1.9
+  provided
+
+
   commons-io
   commons-io
   2.4

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5848/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 63dfe0d..3888ec2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -28,7 +28,6 @@ import 
org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import 
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
 import 

[2/2] incubator-beam git commit: This closes #983

2016-09-22 Thread amitsela
This closes #983


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a00d2f81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a00d2f81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a00d2f81

Branch: refs/heads/master
Commit: a00d2f810992c450c14eab2bb5e3aa3ad3f80f74
Parents: 4872bde 584
Author: Sela 
Authored: Fri Sep 23 01:32:26 2016 +0300
Committer: Sela 
Committed: Fri Sep 23 01:32:26 2016 +0300

--
 examples/java/pom.xml   |   2 +-
 .../org/apache/beam/examples/WordCountIT.java   |   6 -
 runners/spark/pom.xml   |   6 +
 .../apache/beam/runners/spark/SparkRunner.java  |   5 -
 .../apache/beam/runners/spark/io/SourceRDD.java | 198 +++
 .../spark/translation/TransformTranslator.java  |  22 ++-
 6 files changed, 223 insertions(+), 16 deletions(-)
--




[2/2] incubator-beam git commit: This closes #982

2016-09-22 Thread amitsela
This closes #982


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6082ebcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6082ebcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6082ebcc

Branch: refs/heads/master
Commit: 6082ebccedec076140720aefdb8f35e263847082
Parents: 8432752 b1474a1
Author: Sela 
Authored: Thu Sep 22 18:19:43 2016 +0300
Committer: Sela 
Committed: Thu Sep 22 18:19:43 2016 +0300

--
 .../streaming/StreamingTransformTranslator.java | 28 +--
 .../streaming/SimpleStreamingWordCountTest.java | 49 +---
 2 files changed, 46 insertions(+), 31 deletions(-)
--




[1/2] incubator-beam git commit: [BEAM-613] Revised SimpleStreamingWordCountTest to better test fixed windows.

2016-09-22 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 843275210 -> 6082ebcce


[BEAM-613] Revised SimpleStreamingWordCountTest to better test fixed windows.

Revised the test to cover multiple batches.

Set the timeout to 1 ms since it essentially plays no role here.
Removed blank lines between imports.

Refactored the timeout-related logic to make it more natural from the Beam
model's perspective.

Fix windowing bug.

Expected result is for the entire window.

Renamed the test to better reflect the use case it's testing.

Fixed a typo.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1474a18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1474a18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1474a18

Branch: refs/heads/master
Commit: b1474a18c4fe3b3aefdb6cd364fce9dfc227b6df
Parents: 8432752
Author: Stas Levin 
Authored: Mon Sep 5 18:22:59 2016 +0300
Committer: Sela 
Committed: Thu Sep 22 18:18:19 2016 +0300

--
 .../streaming/StreamingTransformTranslator.java | 28 +--
 .../streaming/SimpleStreamingWordCountTest.java | 49 +---
 2 files changed, 46 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1474a18/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 64ddc57..9cb377d 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -191,27 +191,29 @@ public final class StreamingTransformTranslator {
 @SuppressWarnings("unchecked")
 JavaDStream dStream =
 (JavaDStream) sec.getStream(transform);
+// get the right window durations.
+Duration windowDuration;
+Duration slideDuration;
 if (windowFn instanceof FixedWindows) {
-  Duration windowDuration = Durations.milliseconds(((FixedWindows) 
windowFn).getSize()
-  .getMillis());
-  sec.setStream(transform, dStream.window(windowDuration));
+  windowDuration = Durations.milliseconds(((FixedWindows) 
windowFn).getSize().getMillis());
+  slideDuration = windowDuration;
 } else if (windowFn instanceof SlidingWindows) {
-  Duration windowDuration = Durations.milliseconds(((SlidingWindows) 
windowFn).getSize()
-  .getMillis());
-  Duration slideDuration = Durations.milliseconds(((SlidingWindows) 
windowFn).getPeriod()
-  .getMillis());
-  sec.setStream(transform, dStream.window(windowDuration, 
slideDuration));
+  SlidingWindows slidingWindows = (SlidingWindows) windowFn;
+  windowDuration = 
Durations.milliseconds(slidingWindows.getSize().getMillis());
+  slideDuration = 
Durations.milliseconds(slidingWindows.getPeriod().getMillis());
+} else {
+  throw new UnsupportedOperationException(String.format("WindowFn %s 
is not supported.",
+  windowFn.getClass().getCanonicalName()));
 }
+JavaDStream windowedDStream =
+dStream.window(windowDuration, slideDuration);
 //--- then we apply windowing to the elements
-@SuppressWarnings("unchecked")
-JavaDStream dStream2 =
-(JavaDStream) sec.getStream(transform);
 if (TranslationUtils.skipAssignWindows(transform, context)) {
-  sec.setStream(transform, dStream2);
+  sec.setStream(transform, windowedDStream);
 } else {
   final OldDoFn addWindowsDoFn = new 
AssignWindowsDoFn<>(windowFn);
   final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
-  JavaDStream outStream = dStream2.transform(
+  JavaDStream outStream = windowedDStream.transform(
   new Function, 
JavaRDD>() {
 @Override
 public JavaRDD call(JavaRDD 
rdd) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1474a18/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
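The window-duration dispatch in the hunk above (FixedWindows as a sliding window whose slide equals its size, SlidingWindows mapped to a size/period pair, anything else rejected) can be sketched without any Beam or Spark dependency. The classes below are simplified stand-ins, not the real Beam `WindowFn` types, and durations are plain millisecond longs:

```java
// Minimal sketch of the window-duration dispatch, with stand-in classes
// instead of Beam's FixedWindows/SlidingWindows and Spark's Durations.
public class WindowDispatchSketch {
  interface WindowFn {}

  static final class FixedWindows implements WindowFn {
    final long sizeMillis;
    FixedWindows(long sizeMillis) { this.sizeMillis = sizeMillis; }
  }

  static final class SlidingWindows implements WindowFn {
    final long sizeMillis;
    final long periodMillis;
    SlidingWindows(long sizeMillis, long periodMillis) {
      this.sizeMillis = sizeMillis;
      this.periodMillis = periodMillis;
    }
  }

  /** Returns {windowDurationMillis, slideDurationMillis} for the given WindowFn. */
  static long[] durations(WindowFn windowFn) {
    if (windowFn instanceof FixedWindows) {
      // A fixed window is a sliding window whose slide equals its size.
      long size = ((FixedWindows) windowFn).sizeMillis;
      return new long[] {size, size};
    } else if (windowFn instanceof SlidingWindows) {
      SlidingWindows sliding = (SlidingWindows) windowFn;
      return new long[] {sliding.sizeMillis, sliding.periodMillis};
    }
    // Mirrors the patch: unsupported WindowFns fail fast instead of silently passing through.
    throw new UnsupportedOperationException(
        "WindowFn " + windowFn.getClass().getCanonicalName() + " is not supported.");
  }

  public static void main(String[] args) {
    long[] fixed = durations(new FixedWindows(1000));
    long[] sliding = durations(new SlidingWindows(1000, 500));
    System.out.println(fixed[0] + "," + fixed[1]);     // 1000,1000
    System.out.println(sliding[0] + "," + sliding[1]); // 1000,500
  }
}
```

Computing both durations first and calling `window(...)` once, as the patch does, avoids the duplicated `dStream.window(...)` branches of the old code.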

[GitHub] incubator-beam pull request #983: [BEAM-657] Support Read.Bounded primitive

2016-09-21 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/983

[BEAM-657] Support Read.Bounded primitive


---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-657

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/983.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #983






---


[2/4] incubator-beam git commit: [BEAM-610] Enable spark's checkpointing mechanism for driver-failure recovery in streaming.

2016-09-21 Thread amitsela
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 8341c6d..1a0511f 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -19,39 +19,32 @@
 
 package org.apache.beam.runners.spark.translation;
 
+import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
 import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
 import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
 import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
-import org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn;
 import 
org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
-import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
 import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper;
 import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat;
 import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat;
 import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.apache.beam.runners.spark.util.ByteArray;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.TextIO;
@@ -63,36 +56,30 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+
 import scala.Tuple2;
 
+
 /**
  * Supports translation between a Beam transform, and Spark's operations on 
RDDs.
  */
@@ -101,31 +88,6 @@ public final class TransformTranslator {
   private TransformTranslator() {
   }
 
-  /**
-   * Getter of the field.
-   */
-  public static class FieldGetter {
-private final Map fields;
-
-public FieldGetter(Class clazz) {
- 

[1/4] incubator-beam git commit: [BEAM-610] Enable spark's checkpointing mechanism for driver-failure recovery in streaming.

2016-09-21 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 5c23f4954 -> 1ceb12aeb


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java
--
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java
new file mode 100644
index 000..beaae13
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
+import org.junit.rules.ExternalResource;
+
+/**
+ * A rule that clears the {@link 
org.apache.beam.runners.spark.aggregators.AccumulatorSingleton}
+ * which represents the Beam {@link 
org.apache.beam.sdk.transforms.Aggregator}s.
+ */
+class ClearAggregatorsRule extends ExternalResource {
+  @Override
+  protected void before() throws Throwable {
+AccumulatorSingleton.clear();
+  }
+}
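The rule above exists so that the JVM-wide accumulator singleton does not leak state between tests. The same clear-before-each-test pattern can be sketched with hand-rolled stand-ins (assumption: the hypothetical `CounterSingleton` and `ClearCountersRule` below replace `AccumulatorSingleton` and JUnit's `ExternalResource`):

```java
// Sketch of the clear-before-each-test pattern used by ClearAggregatorsRule.
public class ClearRuleSketch {
  // Stand-in for the process-wide accumulator holder.
  static final class CounterSingleton {
    private static int value = 0;
    static void add(int n) { value += n; }
    static int get() { return value; }
    static void clear() { value = 0; }
  }

  // Stand-in for an ExternalResource-style rule: before() runs ahead of each test.
  static final class ClearCountersRule {
    void before() { CounterSingleton.clear(); }
  }

  public static void main(String[] args) {
    ClearCountersRule rule = new ClearCountersRule();

    rule.before();               // "test 1" starts from a clean slate
    CounterSingleton.add(5);
    System.out.println(CounterSingleton.get()); // 5

    rule.before();               // "test 2" must not see test 1's state
    System.out.println(CounterSingleton.get()); // 0
  }
}
```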

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
--
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 8b7762f..238d7ba 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -29,6 +29,7 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+
 import org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics;
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
@@ -53,6 +54,9 @@ public class SimpleWordCountTest {
   @Rule
   public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
 
+  @Rule
+  public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule();
+
   private static final String[] WORDS_ARRAY = {
   "hi there", "hi", "hi sue bob",
   "hi sue", "", "bob hi"};

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
--
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 0d15d12..f85baab 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -67,8 +67,7 @@ public class SideEffectsTest implements Serializable {
 
   // TODO: remove the version check (and the setup and teardown methods) 
when we no
   // longer support Spark 1.3 or 1.4
-  String version = 
SparkContextFactory.getSparkContext(options.getSparkMaster(),
-  options.getAppName()).version();
+  String version = SparkContextFactory.getSparkContext(options).version();
   if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) {
 assertTrue(e.getCause() instanceof UserException);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
--
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index a6fe755..8210b0d 100644
--- 

[4/4] incubator-beam git commit: This closes #909

2016-09-21 Thread amitsela
This closes #909


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1ceb12ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1ceb12ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1ceb12ae

Branch: refs/heads/master
Commit: 1ceb12aebd0ffa63bd28d31cbe830230713705ec
Parents: 5c23f49 0feb649
Author: Sela 
Authored: Wed Sep 21 20:17:38 2016 +0300
Committer: Sela 
Committed: Wed Sep 21 20:17:38 2016 +0300

--
 .../runners/spark/SparkPipelineOptions.java |  28 +-
 .../apache/beam/runners/spark/SparkRunner.java  | 121 ++--
 .../spark/aggregators/AccumulatorSingleton.java |  53 ++
 .../runners/spark/translation/DoFnFunction.java |  35 +-
 .../spark/translation/EvaluationContext.java|  17 +-
 .../translation/GroupCombineFunctions.java  | 262 +
 .../spark/translation/MultiDoFnFunction.java|  44 +-
 .../spark/translation/SparkContextFactory.java  |  48 +-
 .../translation/SparkPipelineEvaluator.java |  57 --
 .../translation/SparkPipelineTranslator.java|   5 +-
 .../spark/translation/SparkProcessContext.java  |  10 +-
 .../spark/translation/SparkRuntimeContext.java  |  44 +-
 .../spark/translation/TransformTranslator.java  | 473 +++-
 .../spark/translation/TranslationUtils.java | 195 +++
 .../SparkRunnerStreamingContextFactory.java |  98 
 .../streaming/StreamingEvaluationContext.java   |  44 +-
 .../streaming/StreamingTransformTranslator.java | 549 ---
 .../runners/spark/util/BroadcastHelper.java |  26 +
 .../runners/spark/ClearAggregatorsRule.java |  33 ++
 .../beam/runners/spark/SimpleWordCountTest.java |   4 +
 .../spark/translation/SideEffectsTest.java  |   3 +-
 .../streaming/FlattenStreamingTest.java |  54 +-
 .../streaming/KafkaStreamingTest.java   |  26 +-
 .../RecoverFromCheckpointStreamingTest.java | 179 ++
 .../streaming/SimpleStreamingWordCountTest.java |  25 +-
 .../utils/TestOptionsForStreaming.java  |  55 ++
 .../org/apache/beam/sdk/transforms/Combine.java |   7 +
 27 files changed, 1682 insertions(+), 813 deletions(-)
--




[3/4] incubator-beam git commit: [BEAM-610] Enable spark's checkpointing mechanism for driver-failure recovery in streaming.

2016-09-21 Thread amitsela
[BEAM-610] Enable spark's checkpointing mechanism for driver-failure recovery 
in streaming.

Refactor translation mechanism to support checkpointing of DStream.

Support basic functionality with GroupByKey and ParDo.

Added support for grouping operations.

Added checkpointDir option, using it before execution.

Support Accumulator recovery from checkpoint.

Streaming tests should use JUnit's TemporaryFolder Rule for checkpoint 
directory.

Support combine optimizations.

Support durable sideInput via Broadcast.

Branches in the pipeline are either Bounded or Unbounded and should be handled
accordingly.

Handle flatten/union of Bounded/Unbounded RDD/DStream.

JavaDoc

Rebased on master.

Reuse functionality between batch and streaming translators

Better implementation of streaming/batch pipeline-branch translation.

Move group/combine functions to their own wrapping class.

Fixed missing licenses.

Use VisibleForTesting annotation instead of comment.

Remove Broadcast failure recovery, to be handled separately.

Stop streaming gracefully, so any checkpointing will finish first.

typo + better documentation.

Validate checkpointDir durability.

Add checkpoint duration option.

A more compact streaming-test initialization with Rules.

A more accurate test; removed broadcast from the test as it will be handled
separately.

Bounded/Unbounded translation to be handled by the SparkPipelineTranslator
implementation. The Evaluator decides whether to call translateBounded or
translateUnbounded according to the visited node's boundedness.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0feb6499
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0feb6499
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0feb6499

Branch: refs/heads/master
Commit: 0feb64994a05de4fe6b1ba178a38d03743b89b7a
Parents: 5c23f49
Author: Sela 
Authored: Thu Aug 25 23:49:01 2016 +0300
Committer: Sela 
Committed: Wed Sep 21 20:15:27 2016 +0300

--
 .../runners/spark/SparkPipelineOptions.java |  28 +-
 .../apache/beam/runners/spark/SparkRunner.java  | 121 ++--
 .../spark/aggregators/AccumulatorSingleton.java |  53 ++
 .../runners/spark/translation/DoFnFunction.java |  35 +-
 .../spark/translation/EvaluationContext.java|  17 +-
 .../translation/GroupCombineFunctions.java  | 262 +
 .../spark/translation/MultiDoFnFunction.java|  44 +-
 .../spark/translation/SparkContextFactory.java  |  48 +-
 .../translation/SparkPipelineEvaluator.java |  57 --
 .../translation/SparkPipelineTranslator.java|   5 +-
 .../spark/translation/SparkProcessContext.java  |  10 +-
 .../spark/translation/SparkRuntimeContext.java  |  44 +-
 .../spark/translation/TransformTranslator.java  | 473 +++-
 .../spark/translation/TranslationUtils.java | 195 +++
 .../SparkRunnerStreamingContextFactory.java |  98 
 .../streaming/StreamingEvaluationContext.java   |  44 +-
 .../streaming/StreamingTransformTranslator.java | 549 ---
 .../runners/spark/util/BroadcastHelper.java |  26 +
 .../runners/spark/ClearAggregatorsRule.java |  33 ++
 .../beam/runners/spark/SimpleWordCountTest.java |   4 +
 .../spark/translation/SideEffectsTest.java  |   3 +-
 .../streaming/FlattenStreamingTest.java |  54 +-
 .../streaming/KafkaStreamingTest.java   |  26 +-
 .../RecoverFromCheckpointStreamingTest.java | 179 ++
 .../streaming/SimpleStreamingWordCountTest.java |  25 +-
 .../utils/TestOptionsForStreaming.java  |  55 ++
 .../org/apache/beam/sdk/transforms/Combine.java |   7 +
 27 files changed, 1682 insertions(+), 813 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index db6b75c..7afb68c 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -19,9 +19,9 @@
 package org.apache.beam.runners.spark;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
-
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
@@ -48,6 +48,32 @@ public interface SparkPipelineOptions extends 

[2/2] incubator-beam git commit: [BEAM-628] Fixed the Graphite metrics sink configuration for spark-submit on yarn.

2016-09-12 Thread amitsela
[BEAM-628] Fixed the Graphite metrics sink configuration for spark-submit on 
yarn.

This closes #945


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/643cf63d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/643cf63d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/643cf63d

Branch: refs/heads/master
Commit: 643cf63d500b8ebe3295bad6d39030ed3a73ff12
Parents: f81b9a0 8451b31
Author: Sela 
Authored: Mon Sep 12 13:22:57 2016 +0300
Committer: Sela 
Committed: Mon Sep 12 13:22:57 2016 +0300

--
 .../spark/src/test/resources/metrics.properties | 61 
 1 file changed, 50 insertions(+), 11 deletions(-)
--




[1/2] incubator-beam git commit: Fixed the Graphite metrics sink configuration so it actually works when submitting using spark-submit on yarn.

2016-09-12 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master f81b9a041 -> 643cf63d5


Fixed the Graphite metrics sink configuration so it actually works when 
submitting using spark-submit on yarn.

Made the metrics configuration info a bit clearer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8451b312
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8451b312
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8451b312

Branch: refs/heads/master
Commit: 8451b31200ff47d99f40217afc743ff85ee10351
Parents: f81b9a0
Author: Stas Levin 
Authored: Mon Sep 12 11:27:22 2016 +0300
Committer: Sela 
Committed: Mon Sep 12 13:18:22 2016 +0300

--
 .../spark/src/test/resources/metrics.properties | 61 
 1 file changed, 50 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8451b312/runners/spark/src/test/resources/metrics.properties
--
diff --git a/runners/spark/src/test/resources/metrics.properties 
b/runners/spark/src/test/resources/metrics.properties
index 4aa01d2..143532d 100644
--- a/runners/spark/src/test/resources/metrics.properties
+++ b/runners/spark/src/test/resources/metrics.properties
@@ -14,16 +14,55 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+
+# The "org.apache.beam.runners.spark.aggregators.metrics.sink.XSink"
+# (a.k.a Beam.XSink) is only configured for the driver, the executors are set 
with a Spark native
+# implementation "org.apache.spark.metrics.sink.XSink" (a.k.a Spark.XSink).
+# This is due to sink class loading behavior, which is different on the driver 
and executors nodes.
+# Since Beam aggregator metrics are reported via Spark accumulators and thus 
make their way to the
+# driver, we only need the "Beam.XSink" on the driver side. Executor nodes can 
keep
+# reporting Spark native metrics using the traditional Spark.XSink.
+#
+# The current sink configuration pattern is therefore:
+#
+# driver.**.class   = Beam.XSink
+# executor.**.class = Spark.XSink
+
+
+# * A metrics sink for tests *
 
*.sink.memory.class=org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics
+# * End of InMemoryMetrics sink configuration section *
+
+
+# * A sample configuration for outputting metrics to Graphite 
*
+
+#driver.sink.graphite.class=org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink
+#driver.sink.graphite.host=YOUR_HOST
+#driver.sink.graphite.port=2003
+#driver.sink.graphite.prefix=spark
+#driver.sink.graphite.period=1
+#driver.sink.graphite.unit=SECONDS
+
+#executor.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
+#executor.sink.graphite.host=YOUR_HOST
+#executor.sink.graphite.port=2003
+#executor.sink.graphite.prefix=spark
+#executor.sink.graphite.period=1
+#executor.sink.graphite.unit=SECONDS
+
+# * End of Graphite sink configuration section *
+
+
+# * A sample configuration for outputting metrics to a CSV file. 
*
+
+#driver.sink.csv.class=org.apache.beam.runners.spark.aggregators.metrics.sink.CsvSink
+#driver.sink.csv.directory=/tmp/spark-metrics
+#driver.sink.csv.period=1
+#driver.sink.csv.unit=SECONDS
+
+#executor.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
+#executor.sink.csv.directory=/tmp/spark-metrics
+#executor.sink.csv.period=1
+#executor.sink.csv.unit=SECONDS
 
-#*.sink.csv.class=org.apache.beam.runners.spark.aggregators.metrics.sink.CsvSink
-#*.sink.csv.directory=/tmp/spark-metrics
-#*.sink.csv.period=1
-#*.sink.graphite.unit=SECONDS
-
-#*.sink.graphite.class=org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink
-#*.sink.graphite.host=YOUR_HOST
-#*.sink.graphite.port=2003
-#*.sink.graphite.prefix=spark
-#*.sink.graphite.period=1
-#*.sink.graphite.unit=SECONDS
+# * End of CSV sink configuration section *



[2/2] incubator-beam git commit: [BEAM-627] Set Spark master only if not set.

2016-09-12 Thread amitsela
[BEAM-627] Set Spark master only if not set.

This closes #944


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f81b9a04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f81b9a04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f81b9a04

Branch: refs/heads/master
Commit: f81b9a04106e7aed484e8016b51c96654e8eb221
Parents: 49208ca c961062
Author: Sela 
Authored: Mon Sep 12 11:41:53 2016 +0300
Committer: Sela 
Committed: Mon Sep 12 11:41:53 2016 +0300

--
 .../beam/runners/spark/translation/SparkContextFactory.java | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--




[1/2] incubator-beam git commit: Set master if not already set.

2016-09-12 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 49208cadd -> f81b9a041


Set master if not already set.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c9610623
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c9610623
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c9610623

Branch: refs/heads/master
Commit: c9610623e41f2352182bb8a359244df226902079
Parents: 49208ca
Author: Sela 
Authored: Mon Sep 12 11:00:34 2016 +0300
Committer: Sela 
Committed: Mon Sep 12 11:00:34 2016 +0300

--
 .../beam/runners/spark/translation/SparkContextFactory.java | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9610623/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index e008448..0e7db9f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -64,7 +64,10 @@ public final class SparkContextFactory {
 
   private static JavaSparkContext createSparkContext(String master, String appName) {
     SparkConf conf = new SparkConf();
-    conf.setMaster(master);
+    if (!conf.contains("spark.master")) {
+      // set master if not set.
+      conf.setMaster(master);
+    }
     conf.setAppName(appName);
     conf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
     return new JavaSparkContext(conf);
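The precedence rule in the patch above can be illustrated without a Spark dependency. The sketch below models SparkConf as a plain map (class and method names here are illustrative, not the Beam or Spark API): a master supplied externally, e.g. via spark-submit setting `spark.master`, is kept, and the runner's option is used only as a fallback.

```java
import java.util.HashMap;
import java.util.Map;

public class MasterPrecedence {

  // Stands in for SparkConf, which loads externally supplied properties first.
  static Map<String, String> conf = new HashMap<>();

  // Mirrors the guarded conf.setMaster(master) in the diff: only set
  // the master when no external value is already present.
  static void setMasterIfAbsent(String master) {
    if (!conf.containsKey("spark.master")) {
      conf.put("spark.master", master);
    }
  }

  public static void main(String[] args) {
    // Case 1: nothing set externally, so the runner's option wins.
    conf.clear();
    setMasterIfAbsent("local[4]");
    System.out.println(conf.get("spark.master")); // local[4]

    // Case 2: spark-submit already set a master, so it is preserved.
    conf.clear();
    conf.put("spark.master", "yarn");
    setMasterIfAbsent("local[4]");
    System.out.println(conf.get("spark.master")); // yarn
  }
}
```

This is the same "set only if absent" idiom the patch applies to SparkConf, so pipelines launched through spark-submit keep their cluster manager setting.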



[GitHub] incubator-beam pull request #944: [BEAM-627] Set master if not already set.

2016-09-12 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/944

[BEAM-627] Set master if not already set.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-627

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/944.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #944


commit c9610623e41f2352182bb8a359244df226902079
Author: Sela <ans...@paypal.com>
Date:   2016-09-12T08:00:34Z

Set master if not already set.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] incubator-beam git commit: Support Verifiers in TestSparkRunner

2016-09-10 Thread amitsela
Repository: incubator-beam
Updated Branches:
  refs/heads/master 82ebfd487 -> 49208cadd


Support Verifiers in TestSparkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0f4ef88b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0f4ef88b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0f4ef88b

Branch: refs/heads/master
Commit: 0f4ef88b58ea8a7851f592dc4bb42702fdde9c0a
Parents: 82ebfd4
Author: Aviem Zur 
Authored: Thu Aug 25 17:23:07 2016 +0300
Committer: Sela 
Committed: Sat Sep 10 11:25:28 2016 +0300

--
 runners/spark/pom.xml| 11 +--
 .../org/apache/beam/runners/spark/TestSparkRunner.java   |  9 -
 2 files changed, 17 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f4ef88b/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index b928b44..14bbd73 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -242,12 +242,19 @@
 
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
-      <scope>test</scope>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>hamcrest-core</artifactId>
+          <groupId>org.hamcrest</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
+
     <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f4ef88b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 376b80f..a1e5918 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -18,10 +18,13 @@
 
 package org.apache.beam.runners.spark;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
@@ -70,6 +73,10 @@ public final class TestSparkRunner extends PipelineRunner<EvaluationResult> {
 
   @Override
   public EvaluationResult run(Pipeline pipeline) {
-    return delegate.run(pipeline);
+    TestPipelineOptions testPipelineOptions =
+        pipeline.getOptions().as(TestPipelineOptions.class);
+    EvaluationResult result = delegate.run(pipeline);
+    assertThat(result, testPipelineOptions.getOnCreateMatcher());
+    assertThat(result, testPipelineOptions.getOnSuccessMatcher());
+    return result;
   }
 }
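The verifier pattern in this change — run the delegate, then assert user-supplied matchers against the result — can be sketched without Hamcrest or Beam. In the dependency-free sketch below, all names (Result, onSuccessMatcher, runAndVerify) are illustrative stand-ins, not the Beam API; a plain Predicate plays the role of the matcher from TestPipelineOptions.

```java
import java.util.function.Predicate;

public class VerifierSketch {

  // Stand-in for the runner's EvaluationResult.
  static class Result {
    final boolean succeeded;
    Result(boolean succeeded) { this.succeeded = succeeded; }
  }

  // Stand-in for TestPipelineOptions.getOnSuccessMatcher().
  static Predicate<Result> onSuccessMatcher = r -> r.succeeded;

  // Mirrors the patched run(): delegate first, verify the result afterwards.
  static Result runAndVerify(Result delegateResult) {
    if (!onSuccessMatcher.test(delegateResult)) {
      throw new AssertionError("onSuccessMatcher failed");
    }
    return delegateResult;
  }

  public static void main(String[] args) {
    System.out.println(runAndVerify(new Result(true)).succeeded); // true
  }
}
```

Separating "run" from "verify" this way lets a test runner stay a thin wrapper over the real runner while still failing the test when a post-run check does not hold.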



[GitHub] incubator-beam pull request #909: [BEAM-610] Enable spark's checkpointing me...

2016-08-31 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/incubator-beam/pull/909

[BEAM-610] Enable spark's checkpointing mechanism for driver-failure 
recovery in streaming

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

Support basic functionality with GroupByKey and ParDo.

Added support for grouping operations.

Added checkpointDir option, using it before execution.

Support Accumulator recovery from checkpoint.

Streaming tests should use JUnit's TemporaryFolder Rule for checkpoint 
directory.

Support combine optimizations.

Support durable sideInput via Broadcast.

Branches in the pipeline are either Bounded or Unbounded and should be handled as such.

Handle flatten/union of Bounded/Unbounded RDD/DStream.

JavaDoc

Rebased on master.

Reuse functionality between batch and streaming translators

Better implementation of streaming/batch pipeline-branch translation.

Move group/combine functions to their own wrapping class.
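One of the notes above says streaming tests should obtain their checkpoint directory from JUnit's TemporaryFolder rule. The dependency-free sketch below uses java.nio instead of JUnit to show the same idea, a fresh throwaway directory per test run; the directory prefix and method name are illustrative, not the Beam API.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CheckpointDirSketch {

  // Creates a fresh checkpoint directory, as JUnit's TemporaryFolder would,
  // so no state leaks between streaming test runs.
  static Path freshCheckpointDir() throws IOException {
    return Files.createTempDirectory("beam-spark-checkpoint");
  }

  public static void main(String[] args) throws IOException {
    Path dir = freshCheckpointDir();
    // A runner would receive this path via its checkpointDir option.
    System.out.println(Files.isDirectory(dir)); // true
  }
}
```

Using a per-test temporary directory keeps checkpoint recovery tests hermetic: each run starts from an empty directory unless the test deliberately restarts from a prior checkpoint.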

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/incubator-beam BEAM-610

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/909.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #909


commit ec9cd0805c23afd792dcccf0f8fb268cdbb0e319
Author: Sela <ans...@paypal.com>
Date:   2016-08-25T20:49:01Z

Refactor translation mechanism to support checkpointing of DStream.

Support basic functionality with GroupByKey and ParDo.

Added support for grouping operations.

Added checkpointDir option, using it before execution.

Support Accumulator recovery from checkpoint.

Streaming tests should use JUnit's TemporaryFolder Rule for checkpoint 
directory.

Support combine optimizations.

Support durable sideInput via Broadcast.

Branches in the pipeline are either Bounded or Unbounded and should be handled as such.

Handle flatten/union of Bounded/Unbounded RDD/DStream.

JavaDoc

Rebased on master.

Reuse functionality between batch and streaming translators

Better implementation of streaming/batch pipeline-branch translation.

Move group/combine functions to their own wrapping class.




---


[2/2] incubator-beam git commit: [BEAM-592] Fix SparkRunner Dependency Problem in WordCount This closes #892

2016-08-27 Thread amitsela
[BEAM-592] Fix SparkRunner Dependency Problem in WordCount
This closes #892


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/baf5e416
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/baf5e416
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/baf5e416

Branch: refs/heads/master
Commit: baf5e416dc67a650e04770f67496ff9f6fb7bc0b
Parents: a17a99f ef828de
Author: Sela 
Authored: Sat Aug 27 12:31:33 2016 +0300
Committer: Sela 
Committed: Sat Aug 27 12:31:33 2016 +0300

--
 examples/java/pom.xml  | 6 ++
 examples/java8/pom.xml | 6 ++
 2 files changed, 12 insertions(+)
--



