[GitHub] incubator-beam pull request #1654: [BEAM-1177] Input DStream "bundles" shoul...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1654 [BEAM-1177] Input DStream "bundles" should be in serialized form and include relevant metadata.

Be sure to do all of the following to help us incorporate your contribution quickly and easily:
- [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request`
- [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes).
- [ ] Replace `` in the title with the actual Jira issue number, if there is one.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/amitsela/incubator-beam read-unbounded-bytes

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1654.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1654

commit 975dec257364d68b5ada3bced7f139e88853722a
Author: Sela <ans...@paypal.com>
Date: 2016-12-18T12:36:53Z

    SparkUnboundedSource mapWithStateDStream input data should be in serialized form for shuffle and checkpointing. Emit read count and watermark per microbatch.

commit 53bd915b8ccacf18b71da16a0a434013ef41
Author: Sela <ans...@paypal.com>
Date: 2016-12-18T13:16:23Z

    Report the input global watermark for batch to the UI.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] incubator-beam pull request #1500: [BEAM-1074] Set default-partitioner in So...
Github user amitsela closed the pull request at: https://github.com/apache/incubator-beam/pull/1500
[1/2] incubator-beam git commit: [BEAM-362] Port runners to runners-core AggregatorFactory
Repository: incubator-beam
Updated Branches: refs/heads/master d624d3b6b -> 5ebbd500c

[BEAM-362] Port runners to runners-core AggregatorFactory

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55f04955
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55f04955
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55f04955
Branch: refs/heads/master
Commit: 55f0495583312c9c0dea620d6a4e85193e97f255
Parents: d624d3b
Author: Kenneth Knowles
Authored: Thu Dec 15 21:06:14 2016 -0800
Committer: Sela
Committed: Fri Dec 16 11:46:18 2016 +0200
--
 .../runners/apex/translation/operators/ApexParDoOperator.java | 2 +-
 .../src/main/java/org/apache/beam/runners/core/DoFnRunners.java | 1 -
 .../java/org/apache/beam/runners/core/SimpleDoFnRunner.java | 1 -
 .../java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java | 1 -
 .../org/apache/beam/runners/direct/AggregatorContainer.java | 2 +-
 .../flink/translation/wrappers/streaming/DoFnOperator.java | 3 ++-
 .../apache/beam/runners/spark/aggregators/SparkAggregators.java | 5 +++--
 7 files changed, 7 insertions(+), 8 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java -- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 1e76949..4538fb5 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -37,6 +37,7 @@ import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translation.utils.NoOpStepContext; import 
org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable; +import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -46,7 +47,6 @@ import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index da16573..0e4bf75 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -22,7 +22,6 @@ import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java -- diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 041cdde..d504b40 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -30,7 +30,6 @@ import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import
[2/2] incubator-beam git commit: This closes #1644
This closes #1644

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5ebbd500
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5ebbd500
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5ebbd500
Branch: refs/heads/master
Commit: 5ebbd500c1548a6a53a5fbce8b0b3dd67d735d1e
Parents: d624d3b 55f0495
Author: Sela
Authored: Fri Dec 16 11:47:06 2016 +0200
Committer: Sela
Committed: Fri Dec 16 11:47:06 2016 +0200
--
 .../runners/apex/translation/operators/ApexParDoOperator.java | 2 +-
 .../src/main/java/org/apache/beam/runners/core/DoFnRunners.java | 1 -
 .../java/org/apache/beam/runners/core/SimpleDoFnRunner.java | 1 -
 .../java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java | 1 -
 .../org/apache/beam/runners/direct/AggregatorContainer.java | 2 +-
 .../flink/translation/wrappers/streaming/DoFnOperator.java | 3 ++-
 .../apache/beam/runners/spark/aggregators/SparkAggregators.java | 5 +++--
 7 files changed, 7 insertions(+), 8 deletions(-)
--
[1/2] incubator-beam git commit: [BEAM-932] Enable findbugs validation (and fix existing issues)
Repository: incubator-beam
Updated Branches: refs/heads/master 1ad638e51 -> 4323247a3

[BEAM-932] Enable findbugs validation (and fix existing issues)

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bba3700a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bba3700a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bba3700a
Branch: refs/heads/master
Commit: bba3700aebd9dcaf88402b6845008cf7e5950cfe
Parents: 1ad638e
Author: Ismaël Mejía
Authored: Thu Dec 15 14:34:20 2016 +0100
Committer: Ismaël Mejía
Committed: Thu Dec 15 14:34:20 2016 +0100
--
 runners/spark/pom.xml | 10
 .../metrics/WithNamedAggregatorsSupport.java | 4 +-
 .../spark/translation/WindowingHelpers.java | 3 +-
 .../src/main/resources/beam/findbugs-filter.xml | 49 +---
 4 files changed, 47 insertions(+), 19 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bba3700a/runners/spark/pom.xml -- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index e8fffa2..5a2fe87 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -146,7 +146,6 @@ com.google.code.findbugs jsr305 - 1.3.9 com.google.guava @@ -317,15 +316,6 @@ - - - org.codehaus.mojo - findbugs-maven-plugin - -true - - - org.apache.maven.plugins maven-surefire-plugin http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bba3700a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java index 6932ae6..5e71280 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java +++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java @@ -121,8 +121,8 @@ public class WithNamedAggregatorsSupport extends MetricRegistry { final String parentName = entry.getKey(); final Map gaugeMap = Maps.transformEntries(agg.renderAll(), toGauge()); final Map fullNameGaugeMap = Maps.newLinkedHashMap(); -for (String shortName : gaugeMap.keySet()) { - fullNameGaugeMap.put(parentName + "." + shortName, gaugeMap.get(shortName)); +for (Map.Entry gaugeEntry : gaugeMap.entrySet()) { + fullNameGaugeMap.put(parentName + "." + gaugeEntry.getKey(), gaugeEntry.getValue()); } return Maps.filterValues(fullNameGaugeMap, Predicates.notNull()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bba3700a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java index ec94f3e..0acff71 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.spark.translation; +import javax.annotation.Nonnull; import org.apache.beam.sdk.util.WindowedValue; import org.apache.spark.api.java.function.Function; @@ -84,7 +85,7 @@ public final class WindowingHelpers { public static com.google.common.base.Function unwindowValueFunction() { return new com.google.common.base.Function () { @Override - public T apply(WindowedValue t) { + public T apply(@Nonnull WindowedValue t) { return t.getValue(); } }; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bba3700a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml -- diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml 
b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index a696aeb..bfb4988 100644 ---
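[Editor's note] The WithNamedAggregatorsSupport change in the findbugs commit above replaces `keySet()` iteration plus a per-key `get()` with a single `entrySet()` pass — the pattern findbugs' WMI_WRONG_MAP_ITERATOR detector asks for. A minimal standalone sketch of the same pattern, with hypothetical class and method names (not the Beam code itself):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EntrySetExample {

    // Prefix every key with a parent name, as the aggregator gauge map does.
    // Iterating entrySet() reads key and value in one pass, avoiding the extra
    // map.get(key) lookup per iteration that findbugs flags on keySet() loops.
    public static Map<String, Integer> prefixKeys(String parent, Map<String, Integer> in) {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : in.entrySet()) {
            out.put(parent + "." + e.getKey(), e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> gauges = new LinkedHashMap<>();
        gauges.put("elements", 42);
        gauges.put("bytes", 1024);
        System.out.println(prefixKeys("aggregators", gauges));
        // prints {aggregators.elements=42, aggregators.bytes=1024}
    }
}
```

Besides satisfying the detector, the entrySet form is also correct for maps whose `get()` is expensive or recomputed, such as the transformed view returned by Guava's `Maps.transformEntries` in the original code.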
[2/2] incubator-beam git commit: This closes #1463
This closes #1463

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4323247a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4323247a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4323247a
Branch: refs/heads/master
Commit: 4323247a3a8ea6fc06b99f66f0001f6956b494c9
Parents: 1ad638e bba3700
Author: Sela
Authored: Thu Dec 15 16:16:57 2016 +0200
Committer: Sela
Committed: Thu Dec 15 16:16:57 2016 +0200
--
 runners/spark/pom.xml | 10
 .../metrics/WithNamedAggregatorsSupport.java | 4 +-
 .../spark/translation/WindowingHelpers.java | 3 +-
 .../src/main/resources/beam/findbugs-filter.xml | 49 +---
 4 files changed, 47 insertions(+), 19 deletions(-)
--
[GitHub] incubator-beam pull request #1614: [BEAM-853] Force streaming execution on b...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1614 [BEAM-853] Force streaming execution on batch pipelines for testing.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/amitsela/incubator-beam BEAM-853

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1614.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1614

commit 7b7fed35cc3119ba11cca076e60e8cb1d13836a1
Author: Sela <ans...@paypal.com>
Date: 2016-12-14T10:20:08Z

    Expose the adapted source.

commit e729db1a84c5c554a61b7a0a2db89fb0cf45f191
Author: Sela <ans...@paypal.com>
Date: 2016-12-14T10:20:42Z

    Force streaming execution, if set in PipelineOptions.

commit e2f9419810a581e82b703d90b00db71e040f806a
Author: Sela <ans...@paypal.com>
Date: 2016-12-14T14:28:24Z

    Added test.
[GitHub] incubator-beam pull request #1579: [BEAM-1130, BEAM-1133] Allow users to def...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1579 [BEAM-1130, BEAM-1133] Allow users to define max records per batch, update tests accordingly.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/amitsela/incubator-beam readunbounded-minread-in-microbatch

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1579.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1579

commit d5382819a3c9c3975ab5aac81cde01d9ba79da93
Author: Sela <ans...@paypal.com>
Date: 2016-12-12T10:37:32Z

    [BEAM-1133] Add maxNumRecords per micro-batch for Spark runner options.

commit 84fc4da6484baf023015b4745dc9be12ec4d35ab
Author: Sela <ans...@paypal.com>
Date: 2016-12-12T10:41:34Z

    [BEAM-1130] SparkRunner ResumeFromCheckpointStreamingTest Failing.
[GitHub] incubator-beam pull request #1578: [BEAM-757, BEAM-807]
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1578 [BEAM-757, BEAM-807]

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/amitsela/incubator-beam new-do-fn

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1578.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1578

commit 5748fe9cfd9c36d92b2870acde5e071ea696ac78
Author: Sela <ans...@paypal.com>
Date: 2016-12-11T12:30:24Z

    Use DoFnRunner in the implementation of FlatMapFunction. Migrate to new DoFn.

commit 7f73c991426e0602d318a47cd9a38a3eebd979bf
Author: Sela <ans...@paypal.com>
Date: 2016-12-11T12:31:59Z

    Implement AggregatorFactory for Spark runner, to be used by DoFnRunner.

commit b2b0d463b0cf2105248d02411a190f6406f26c69
Author: Sela <ans...@paypal.com>
Date: 2016-12-11T12:32:49Z

    Migrate to new DoFn.

commit 00219617355e1d77d0bba6c272c0a2b3595eac61
Author: Sela <ans...@paypal.com>
Date: 2016-12-11T19:09:47Z

    Add a custom AssignWindows implementation.

commit cd574d62825d636bb30ab0fd13172d2f8bb5cbb7
Author: Sela <ans...@paypal.com>
Date: 2016-12-12T09:33:58Z

    Setup and teardown DoFn.
commit 9355b1ef4943b29e7d26735484aad2e63bb1d1eb
Author: Sela <ans...@paypal.com>
Date: 2016-12-12T09:34:17Z

    Add implementation for GroupAlsoByWindow via flatMap.
[1/2] incubator-beam git commit: [BEAM-921] spark-runner: register sources and coders to serialize with java serializer
Repository: incubator-beam
Updated Branches: refs/heads/master e841b1a21 -> bf8a3cb3a

[BEAM-921] spark-runner: register sources and coders to serialize with java serializer

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aba40e2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aba40e2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aba40e2d
Branch: refs/heads/master
Commit: aba40e2de9ba058f33086eb6a913fa583a82b058
Parents: e841b1a
Author: Aviem Zur
Authored: Thu Dec 8 15:07:06 2016 +0200
Committer: Sela
Committed: Sun Dec 11 15:18:51 2016 +0200
--
 runners/spark/pom.xml | 35 +---
 .../coders/BeamSparkRunnerRegistrator.java | 60 +++-
 .../coders/BeamSparkRunnerRegistratorTest.java | 57 +++
 3 files changed, 118 insertions(+), 34 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aba40e2d/runners/spark/pom.xml -- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index d1ef225..86e9039 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -54,7 +54,7 @@ - local-runnable-on-service-tests false @@ -134,28 +134,14 @@ ${hadoop.version} provided + com.esotericsoftware.kryo kryo - 2.21 - provided - - - de.javakaffee - kryo-serializers - 0.39 - - - - com.esotericsoftware - kryo - - - - com.google.protobuf - protobuf-java - - + 2.21.1 com.google.code.findbugs @@ -264,6 +250,11 @@ metrics-core ${dropwizard.metrics.version} + + org.reflections + reflections + 0.9.10 + @@ -405,6 +396,10 @@ com.google.thirdparty org.apache.beam.spark.relocated.com.google.thirdparty + +com.esotericsoftware.kryo + org.apache.beam.spark.relocated.com.esotericsoftware.kryo + true spark-app http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aba40e2d/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java -- diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java index 0e62781..41b0a01 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java @@ -19,28 +19,60 @@ package org.apache.beam.runners.spark.coders; import com.esotericsoftware.kryo.Kryo; -import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer; -import de.javakaffee.kryoserializers.guava.ImmutableListSerializer; -import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer; -import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer; -import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer; -import de.javakaffee.kryoserializers.guava.ReverseListSerializer; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.util.Arrays; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.Source; import org.apache.spark.serializer.KryoRegistrator; +import org.reflections.Reflections; /** - * Custom {@link com.esotericsoftware.kryo.Serializer}s for Beam's Spark runner needs. + * Custom {@link KryoRegistrator}s for Beam's Spark runner needs. 
*/ public class BeamSparkRunnerRegistrator implements KryoRegistrator { @Override public void registerClasses(Kryo kryo) { -UnmodifiableCollectionsSerializer.registerSerializers(kryo); -// Guava -ImmutableListSerializer.registerSerializers(kryo); -ImmutableSetSerializer.registerSerializers(kryo); -ImmutableMapSerializer.registerSerializers(kryo); -ImmutableMultimapSerializer.registerSerializers(kryo); -ReverseListSerializer.registerSerializers(kryo); +for (Class clazz : ClassesForJavaSerialization.getClasses()) { + kryo.register(clazz, new JavaSerializer()); +
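[Editor's note] The registrator above falls back to kryo's `JavaSerializer` for source and coder classes, and that serializer simply delegates to standard java.io object serialization. A pure-JDK sketch of the round-trip that delegation performs — the `SourceStub` class and method names are hypothetical, for illustration only:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class JavaSerializationRoundTrip {

    // A stand-in for a Source/Coder class with no Kryo-friendly construction
    // path; plain Java serialization handles it as long as it is Serializable.
    public static class SourceStub implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;
        public SourceStub(String name) { this.name = name; }
    }

    // Serialize then deserialize via ObjectOutputStream/ObjectInputStream,
    // the mechanism kryo's JavaSerializer uses under the hood.
    public static SourceStub roundTrip(SourceStub in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream oin =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SourceStub) oin.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        SourceStub copy = roundTrip(new SourceStub("my-source"));
        System.out.println(copy.name);
        // prints my-source
    }
}
```

Java serialization is slower and bulkier than native Kryo serialization, which is why the commit restricts it to the classes (sources, coders) that Kryo cannot reliably reconstruct on its own.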
[2/2] incubator-beam git commit: This closes #1552
This closes #1552

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf8a3cb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf8a3cb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf8a3cb3
Branch: refs/heads/master
Commit: bf8a3cb3a5948bd1ef7f7e5cef230ecd4e8f1c84
Parents: e841b1a aba40e2
Author: Sela
Authored: Sun Dec 11 15:19:24 2016 +0200
Committer: Sela
Committed: Sun Dec 11 15:19:24 2016 +0200
--
 runners/spark/pom.xml | 35 +---
 .../coders/BeamSparkRunnerRegistrator.java | 60 +++-
 .../coders/BeamSparkRunnerRegistratorTest.java | 57 +++
 3 files changed, 118 insertions(+), 34 deletions(-)
--
[GitHub] incubator-beam-site pull request #106: [BEAM-900] Spark quickstart instructi...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam-site/pull/106 [BEAM-900] Spark quickstart instructions.

R: @davorbonaci

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/amitsela/incubator-beam-site BEAM-900

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam-site/pull/106.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #106

commit c10d2c52084dd01f03e6df86e4d2ab47ccab4e36
Author: Sela <ans...@paypal.com>
Date: 2016-12-10T20:43:24Z

    fixup! wrong generated directory name.

commit 8dcc50e023f9070a60de72afc80db42bbff0b654
Author: Sela <ans...@paypal.com>
Date: 2016-12-10T20:43:49Z

    [BEAM-900] Spark quickstart instructions.
[2/2] incubator-beam git commit: This closes #1561
This closes #1561

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9a63117
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9a63117
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9a63117
Branch: refs/heads/master
Commit: d9a6311734064b1c7171b943eeb511c4d648187a
Parents: 63d197c e48b0e6
Author: Sela
Authored: Fri Dec 9 18:01:12 2016 +0200
Committer: Sela
Committed: Fri Dec 9 18:01:12 2016 +0200
--
 .../game/utils/WriteWindowedToBigQuery.java | 5 +
 .../runners/dataflow/internal/AssignWindows.java | 8
 .../dataflow/DataflowPipelineTranslatorTest.java | 17 +
 3 files changed, 14 insertions(+), 16 deletions(-)
--
[1/2] incubator-beam git commit: Remove misc uses of OldDoFn
Repository: incubator-beam
Updated Branches: refs/heads/master 63d197cd0 -> d9a631173

Remove misc uses of OldDoFn

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e48b0e6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e48b0e6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e48b0e6b
Branch: refs/heads/master
Commit: e48b0e6bc20d8eba2968decf7ac2b4ee7503a4df
Parents: 63d197c
Author: Kenneth Knowles
Authored: Thu Dec 8 23:33:40 2016 -0800
Committer: Sela
Committed: Fri Dec 9 18:00:39 2016 +0200
--
 .../game/utils/WriteWindowedToBigQuery.java | 5 +
 .../runners/dataflow/internal/AssignWindows.java | 8
 .../dataflow/DataflowPipelineTranslatorTest.java | 17 +
 3 files changed, 14 insertions(+), 16 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e48b0e6b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java -- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java index c32289f..7a4fb2c 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java @@ -23,7 +23,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollection; @@ -43,9 -43,7 @@ 
public class WriteWindowedToBigQuery } /** Convert each key/score pair into a BigQuery TableRow. */ - protected class BuildRowFn extends DoFn - implements RequiresWindowAccess { - + protected class BuildRowFn extends DoFn { @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e48b0e6b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java index 68ee7bc..27fe13d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.dataflow.internal; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.Window; @@ -63,9 +63,9 @@ public class AssignWindows extends PTransform } else { // If the windowFn didn't change, we just run a pass-through transform and then set the // new windowing strategy. 
- return input.apply("Identity", ParDo.of(new OldDoFn () { -@Override -public void processElement(OldDoFn .ProcessContext c) throws Exception { + return input.apply("Identity", ParDo.of(new DoFn () { +@ProcessElement +public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } })).setWindowingStrategyInternal(outputStrategy); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e48b0e6b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java -- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index ac4f2df..8d0b83a 100644
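[Editor's note] The migration above swaps OldDoFn's overridden `processElement` for the new DoFn's annotation-driven `@ProcessElement` method. A pure-JDK sketch of the reflective dispatch idea behind that design — the annotation, classes, and methods here are hypothetical illustrations, not Beam's actual DoFn invoker:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AnnotatedInvokeExample {

    // Stand-in for Beam's @ProcessElement marker annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface ProcessElement {}

    // An identity "DoFn": the runner locates the annotated method by
    // reflection instead of requiring a fixed override, which is what lets
    // the new DoFn accept flexible parameter lists (windows, contexts, ...).
    public static class IdentityFn {
        @ProcessElement
        public void process(List<String> output, String element) {
            output.add(element);
        }
    }

    // Find the @ProcessElement method and invoke it once per input element.
    public static List<String> apply(Object fn, List<String> input) {
        for (Method m : fn.getClass().getMethods()) {
            if (m.isAnnotationPresent(ProcessElement.class)) {
                List<String> output = new ArrayList<>();
                try {
                    for (String element : input) {
                        m.invoke(fn, output, element);
                    }
                } catch (ReflectiveOperationException e) {
                    throw new RuntimeException(e);
                }
                return output;
            }
        }
        throw new IllegalStateException("no @ProcessElement method found");
    }

    public static void main(String[] args) {
        System.out.println(apply(new IdentityFn(), Arrays.asList("a", "b")));
        // prints [a, b]
    }
}
```

This mirrors, in spirit, why the diff's identity transform shrinks to a `DoFn` with a single `@ProcessElement` method: the runner, not a rigid class hierarchy, decides how to call it.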
[GitHub] incubator-beam pull request #1553: [BEAM-1111] Reject timers for ParDo in Sp...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1553 [BEAM-1111] Reject timers for ParDo in SparkRunner streaming evaluators.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/amitsela/incubator-beam BEAM-1111

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1553.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1553

commit 0c948c5dce65b9651c576f34f17922184816a703
Author: Sela <ans...@paypal.com>
Date: 2016-12-08T18:29:35Z

    [BEAM-1111] Reject timers for ParDo in SparkRunner streaming evaluators.
[GitHub] incubator-beam-site pull request #103: [BEAM-507] Fill in the documentation/...
GitHub user amitsela opened a pull request:

    https://github.com/apache/incubator-beam-site/pull/103

    [BEAM-507] Fill in the documentation/runners/spark portion of the web… …site.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/amitsela/incubator-beam-site BEAM-507

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam-site/pull/103.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #103

commit 5e5caf73c848797eb7e5bc757d1f64cd8c517de8
Author: Sela <ans...@paypal.com>
Date: 2016-12-08T13:34:54Z

    [BEAM-507] Fill in the documentation/runners/spark portion of the website.

---
[GitHub] incubator-beam pull request #1532: [BEAM-329] Spark runner README should hav...
GitHub user amitsela opened a pull request:

    https://github.com/apache/incubator-beam/pull/1532

    [BEAM-329] Spark runner README should have a proper batch example.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/amitsela/incubator-beam BEAM-329

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/1532.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1532

commit c50fe89a071f59740b6f5bd90e1984ca3159162f
Author: Sela <ans...@paypal.com>
Date: 2016-12-07T09:20:07Z

    [BEAM-329] Spark runner README should have a proper batch example.

---
[2/2] incubator-beam git commit: This closes #1531
This closes #1531

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9ccf6dbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9ccf6dbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9ccf6dbe
Branch: refs/heads/master
Commit: 9ccf6dbea0d3807fef6a7c0432906fffd2b8ec3f
Parents: b41a46e baf5e6b
Author: Sela
Authored: Wed Dec 7 10:31:38 2016 +0200
Committer: Sela
Committed: Wed Dec 7 10:31:38 2016 +0200
--
 runners/spark/pom.xml | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--
[1/2] incubator-beam git commit: [BEAM-1094] Set test scope for Kafka IO and junit
Repository: incubator-beam
Updated Branches:
  refs/heads/master b41a46e86 -> 9ccf6dbea

[BEAM-1094] Set test scope for Kafka IO and junit

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/baf5e6bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/baf5e6bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/baf5e6bd
Branch: refs/heads/master
Commit: baf5e6bd9b1011f4c5c3974aa46393471b340c15
Parents: b41a46e
Author: Jean-Baptiste Onofré
Authored: Wed Dec 7 08:37:33 2016 +0100
Committer: Sela
Committed: Wed Dec 7 10:30:44 2016 +0200
--
 runners/spark/pom.xml | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/baf5e6bd/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index e34af15..9a3adf6 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -268,18 +268,20 @@
     org.apache.beam
     beam-sdks-java-io-kafka
+    test

     org.apache.kafka
     kafka-clients
     0.9.0.1
+    test

     junit
     junit
-    provided
+    test

     hamcrest-core
[3/3] incubator-beam git commit: This closes #1466
This closes #1466

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6893a727
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6893a727
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6893a727
Branch: refs/heads/master
Commit: 6893a7270b728ec72c6e8749008e6a583edf5921
Parents: ef9871c 158378f
Author: Sela
Authored: Mon Dec 5 12:57:25 2016 +0200
Committer: Sela
Committed: Mon Dec 5 12:57:25 2016 +0200
--
 .../beam/runners/spark/EvaluationResult.java        |  67 --
 .../beam/runners/spark/SparkPipelineResult.java     | 193 +
 .../apache/beam/runners/spark/SparkRunner.java      | 113 ++
 .../beam/runners/spark/TestSparkRunner.java         |  11 +-
 .../spark/aggregators/AccumulatorSingleton.java     |   6 +-
 .../spark/aggregators/SparkAggregators.java         |  97 +
 .../beam/runners/spark/examples/WordCount.java      |   2 +-
 .../spark/translation/EvaluationContext.java        | 131 ++-
 .../spark/translation/SparkContextFactory.java      |   2 +-
 .../spark/translation/SparkRuntimeContext.java      |  62 +-
 .../spark/translation/TransformTranslator.java      |  10 +-
 .../streaming/StreamingTransformTranslator.java     |  10 +-
 .../runners/spark/ProvidedSparkContextTest.java     |   6 +-
 .../runners/spark/SparkPipelineStateTest.java       | 217 +++
 .../spark/aggregators/ClearAggregatorsRule.java     |  37 
 .../metrics/sink/ClearAggregatorsRule.java          |  33 ---
 .../metrics/sink/NamedAggregatorsTest.java          |   3 +-
 .../beam/runners/spark/io/AvroPipelineTest.java     |   2 +-
 .../beam/runners/spark/io/NumShardsTest.java        |   2 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java     |   2 +-
 .../spark/translation/SideEffectsTest.java          |  59 -
 .../streaming/EmptyStreamAssertionTest.java         |   4 +
 .../ResumeFromCheckpointStreamingTest.java          |  15 +-
 .../streaming/utils/PAssertStreaming.java           |   9 +-
 24 files changed, 680 insertions(+), 413 deletions(-)
--
[1/3] incubator-beam git commit: [BEAM-1000, BEAM-1050] Fixed PipelineResult.State Failed for streaming, support non-blocking cancel/waituntilfinish in batch. Added a SparkPipelineResult class to addr
Repository: incubator-beam
Updated Branches:
  refs/heads/master ef9871c36 -> 6893a7270

[BEAM-1000, BEAM-1050] Fixed PipelineResult.State Failed for streaming, support non-blocking cancel/waituntilfinish in batch.

Added a SparkPipelineResult class to address PipelineResult#waitUntilFinish() and SparkRunner#run() semantics.

* Simplified beamExceptionFrom() to abstract away SparkExceptions.
* Reordered methods according to access level.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1a67934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1a67934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1a67934
Branch: refs/heads/master
Commit: b1a67934d1496e221718599301635c38f8e3b7ec
Parents: ef9871c
Author: Stas Levin
Authored: Mon Nov 28 11:11:10 2016 +0200
Committer: Sela
Committed: Mon Dec 5 12:56:39 2016 +0200
--
 .../beam/runners/spark/EvaluationResult.java        |  67 --
 .../beam/runners/spark/SparkPipelineResult.java     | 179 +++
 .../apache/beam/runners/spark/SparkRunner.java      |  98 +
 .../beam/runners/spark/TestSparkRunner.java         |  10 +-
 .../beam/runners/spark/examples/WordCount.java      |   2 +-
 .../spark/translation/EvaluationContext.java        | 119 ++
 .../spark/translation/SparkContextFactory.java      |   2 +-
 .../runners/spark/ProvidedSparkContextTest.java     |   6 +-
 .../runners/spark/SparkPipelineStateTest.java       | 219 +++
 .../metrics/sink/ClearAggregatorsRule.java          |   2 +-
 .../metrics/sink/NamedAggregatorsTest.java          |   2 +-
 .../beam/runners/spark/io/AvroPipelineTest.java     |   2 +-
 .../beam/runners/spark/io/NumShardsTest.java        |   2 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java     |   2 +-
 .../spark/translation/SideEffectsTest.java          |  59 -
 .../streaming/EmptyStreamAssertionTest.java         |   4 +
 .../ResumeFromCheckpointStreamingTest.java          |   8 +-
 .../streaming/utils/PAssertStreaming.java           |   9 +-
 18 files changed, 500 insertions(+), 292 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
deleted file mode 100644
index 52606a3..000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
-
-/**
- * Interface for retrieving the result(s) of running a pipeline. Allows us to translate between
- * {@code PObject}s or {@code PCollection}s and Ts or collections of Ts.
- */
-public interface EvaluationResult extends PipelineResult {
-  /**
-   * Retrieves an iterable of results associated with the PCollection passed in.
-   *
-   * @param pcollection Collection we wish to translate.
-   * @param Type of elements contained in collection.
-   * @return Natively types result associated with collection.
-   */
-  Iterable get(PCollection pcollection);
-
-  /**
-   * Retrieve an object of Type T associated with the PValue passed in.
-   *
-   * @param pval PValue to retrieve associated data for.
-   * @param Type of object to return.
-   * @return Native object.
-   */
-  T get(PValue pval);
-
-  /**
-   * Retrieves the final value of the aggregator.
-   *
-   * @param aggName    name of aggregator.
-   * @param resultType Class of final result of aggregation.
-   * @param Type of final result of aggregation.
-   * @return Result of aggregation associated with specified name.
-   */
-  T
[2/3] incubator-beam git commit: Redistributed some responsibilities in order to remove getAggregatorValues() from EvaluationContext.
Redistributed some responsibilities in order to remove getAggregatorValues() from EvaluationContext.

Inferred expected exception handling according to existing codebase and tests.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/158378f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/158378f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/158378f0
Branch: refs/heads/master
Commit: 158378f0f682b80462b917002b895ddbf782d06d
Parents: b1a6793
Author: Stas Levin
Authored: Sat Dec 3 00:47:39 2016 +0200
Committer: Sela
Committed: Mon Dec 5 12:56:41 2016 +0200
--
 .../beam/runners/spark/SparkPipelineResult.java     | 76 ---
 .../apache/beam/runners/spark/SparkRunner.java      | 35 +--
 .../beam/runners/spark/TestSparkRunner.java         |  1 +
 .../spark/aggregators/AccumulatorSingleton.java     |  6 +-
 .../spark/aggregators/SparkAggregators.java         | 97 
 .../spark/translation/EvaluationContext.java        | 20 +---
 .../spark/translation/SparkRuntimeContext.java      | 62 +
 .../spark/translation/TransformTranslator.java      | 10 +-
 .../streaming/StreamingTransformTranslator.java     | 10 +-
 .../runners/spark/SparkPipelineStateTest.java       | 36 
 .../spark/aggregators/ClearAggregatorsRule.java     | 37 
 .../metrics/sink/ClearAggregatorsRule.java          | 33 ---
 .../metrics/sink/NamedAggregatorsTest.java          |  1 +
 .../streaming/EmptyStreamAssertionTest.java         |  2 +-
 .../ResumeFromCheckpointStreamingTest.java          |  9 +-
 15 files changed, 247 insertions(+), 188 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/158378f0/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index ec0610c..b1027a6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -23,7 +23,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
@@ -31,7 +31,10 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.joda.time.Duration;

 /**
@@ -40,29 +43,37 @@ import org.joda.time.Duration;
 public abstract class SparkPipelineResult implements PipelineResult {

   protected final Future pipelineExecution;
-  protected final EvaluationContext context;
+  protected JavaSparkContext javaSparkContext;
   protected PipelineResult.State state;

   SparkPipelineResult(final Future pipelineExecution,
-                      final EvaluationContext evaluationContext) {
+                      final JavaSparkContext javaSparkContext) {
     this.pipelineExecution = pipelineExecution;
-    this.context = evaluationContext;
+    this.javaSparkContext = javaSparkContext;
     // pipelineExecution is expected to have started executing eagerly.
     state = State.RUNNING;
   }

-  private RuntimeException runtimeExceptionFrom(Throwable e) {
+  private RuntimeException runtimeExceptionFrom(final Throwable e) {
     return (e instanceof RuntimeException) ?
        (RuntimeException) e : new RuntimeException(e);
   }

-  private RuntimeException beamExceptionFrom(Throwable e) {
+  private RuntimeException beamExceptionFrom(final Throwable e) {
     // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
     // won't let you catch something that is not declared, so we can't catch
     // SparkException directly, instead we do an instanceof check.
-    return (e instanceof SparkException)
-        ? new Pipeline.PipelineExecutionException(e.getCause() != null ? e.getCause() : e)
-        : runtimeExceptionFrom(e);
+
+    if (e instanceof SparkException) {
+      if (e.getCause() !=
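The comment in `beamExceptionFrom()` above explains why the runner uses an `instanceof` check rather than a `catch` clause: Spark's Scala code throws `SparkException` without declaring it. The unwrapping idea can be reduced to a small self-contained sketch; note the `SparkException` class below is a local stand-in for `org.apache.spark.SparkException`, and a plain `RuntimeException` stands in for `Pipeline.PipelineExecutionException`, so this is illustrative rather than the actual Beam code.

```java
// Local stand-in for org.apache.spark.SparkException (assumption for this sketch).
class SparkException extends RuntimeException {
  SparkException(String msg, Throwable cause) {
    super(msg, cause);
  }
}

class ExceptionUnwrapping {
  // Mirrors the shape of beamExceptionFrom(): prefer the user-code cause buried
  // inside Spark's wrapper; otherwise pass RuntimeExceptions through unchanged.
  static RuntimeException beamExceptionFrom(final Throwable e) {
    if (e instanceof SparkException) {
      Throwable cause = (e.getCause() != null) ? e.getCause() : e;
      // The real code wraps this in Pipeline.PipelineExecutionException.
      return new RuntimeException(cause.getMessage(), cause);
    }
    return (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
  }
}
```

The effect is that callers see the user's original failure as the cause, instead of Spark's scheduler-level wrapper.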
[GitHub] incubator-beam pull request #1500: [BEAM-1074] Set default-partitioner in So...
GitHub user amitsela opened a pull request:

    https://github.com/apache/incubator-beam/pull/1500

    [BEAM-1074] Set default-partitioner in SourceRDD.Unbounded.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/amitsela/incubator-beam BEAM-1074

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/1500.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1500

commit 2cb3c0c2fc6e09e670f21561da5c83083a7308ad
Author: Sela <ans...@paypal.com>
Date: 2016-12-02T17:08:33Z

    [BEAM-1074] Set default-partitioner in SourceRDD.Unbounded.

commit dcad79534f4bf3262bb872d12c7315e583836df8
Author: Sela <ans...@paypal.com>
Date: 2016-12-03T16:38:02Z

    Make SourceRDD partitions number equal the HashPartitioner, which might create empty partitions.

---
[2/2] incubator-beam git commit: This closes #1450
This closes #1450

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3a8b9b52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3a8b9b52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3a8b9b52
Branch: refs/heads/master
Commit: 3a8b9b5212972f0128099251884473d06758e2aa
Parents: 3f16f26 a1a4ac0
Author: Sela
Authored: Tue Nov 29 11:50:51 2016 +0200
Committer: Sela
Committed: Tue Nov 29 11:50:51 2016 +0200
--
 .../beam/runners/spark/io/MicrobatchSource.java | 20 ++--
 .../beam/runners/spark/io/SourceDStream.java    |  3 ++-
 .../spark/stateful/StateSpecFunctions.java      |  2 +-
 3 files changed, 17 insertions(+), 8 deletions(-)
--
[1/2] incubator-beam git commit: [BEAM-1052] Add InputDStream id to MicrobatchSource hashcode.
Repository: incubator-beam
Updated Branches:
  refs/heads/master 3f16f2660 -> 3a8b9b521

[BEAM-1052] Add InputDStream id to MicrobatchSource hashcode.

Done to avoid collisions between splits of different sources.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1a4ac0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1a4ac0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1a4ac0f
Branch: refs/heads/master
Commit: a1a4ac0fc0376aa4c43a4357f3acc930e2b53c94
Parents: 3f16f26
Author: Aviem Zur
Authored: Tue Nov 29 09:51:12 2016 +0200
Committer: Sela
Committed: Tue Nov 29 11:49:31 2016 +0200
--
 .../beam/runners/spark/io/MicrobatchSource.java | 20 ++--
 .../beam/runners/spark/io/SourceDStream.java    |  3 ++-
 .../spark/stateful/StateSpecFunctions.java      |  2 +-
 3 files changed, 17 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1a4ac0f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 4a174aa..5656375 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -54,6 +54,7 @@ public class MicrobatchSource
      (splits.get(i), maxReadTime, 1, numRecords[i], i));
+      result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId));
     }
     return result;
   }
@@ -137,8 +140,8 @@ public class MicrobatchSource
 >> compute(Time validTime) {
     MicrobatchSource microbatchSource = new MicrobatchSource<>(
-        unboundedSource, boundReadDuration, initialParallelism, rateControlledMaxRecords(), -1);
+
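The motivation stated in the commit message — avoiding collisions between splits of different sources — can be illustrated with a hypothetical sketch: if identity is derived only from the split index, split 0 of source A and split 0 of source B are indistinguishable, while a (splitIndex, sourceId) pair is not. The class below is illustrative only, not the actual `MicrobatchSource` code; its field names are assumptions.

```java
import java.util.Objects;

// Hypothetical key showing why the InputDStream id belongs in hashCode()/equals():
// without sourceId, splits with the same index from different sources collide.
class MicrobatchSplitKey {
  final int splitIndex;
  final int sourceId; // id of the owning InputDStream, the value BEAM-1052 adds

  MicrobatchSplitKey(int splitIndex, int sourceId) {
    this.splitIndex = splitIndex;
    this.sourceId = sourceId;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof MicrobatchSplitKey)) {
      return false;
    }
    MicrobatchSplitKey other = (MicrobatchSplitKey) o;
    return splitIndex == other.splitIndex && sourceId == other.sourceId;
  }

  @Override
  public int hashCode() {
    // Combining both fields keeps split 0 of source 1 distinct from split 0 of source 2.
    return Objects.hash(splitIndex, sourceId);
  }
}
```

With only `splitIndex` in the hash, any cache or map keyed on the source (as the Spark runner's microbatch machinery is) would conflate splits across sources.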
[1/2] incubator-beam git commit: [BEAM-851] Determine if the pipeline must be translated into streaming mode (if not set)
Repository: incubator-beam
Updated Branches:
  refs/heads/master 3ad767750 -> 8cc43aa70

[BEAM-851] Determine if the pipeline must be translated into streaming mode (if not set)

Now an Evaluator (visitor) detects if there are Unbounded.Read transforms. This approach is based on Flink's PipelineTranslationOptimizer.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cc96b138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cc96b138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cc96b138
Branch: refs/heads/master
Commit: cc96b1381b6db849adf69daddecf30b9c61acf73
Parents: 3ad7677
Author: Ismaël Mejía
Authored: Fri Nov 25 14:52:26 2016 +0100
Committer: Ismaël Mejía
Committed: Sun Nov 27 11:18:12 2016 +0100
--
 .../apache/beam/runners/spark/SparkRunner.java  | 61 +++-
 .../streaming/StreamingTransformTranslator.java |  2 +-
 .../streaming/EmptyStreamAssertionTest.java     |  2 +
 .../streaming/FlattenStreamingTest.java         |  2 +
 .../streaming/SimpleStreamingWordCountTest.java |  1 +
 .../SparkTestPipelineOptionsForStreaming.java   |  6 --
 6 files changed, 65 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc96b138/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index e800071..49e0113 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
 import org.apache.beam.sdk.Pipeline;
+import
org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
@@ -120,12 +121,12 @@ public final class SparkRunner extends PipelineRunner {
     mOptions = options;
   }

-
   @Override
   public EvaluationResult run(Pipeline pipeline) {
     try {
       LOG.info("Executing pipeline using the SparkRunner.");

+      detectTranslationMode(pipeline);
       if (mOptions.isStreaming()) {
         SparkRunnerStreamingContextFactory contextFactory =
             new SparkRunnerStreamingContextFactory(pipeline, mOptions);
@@ -136,7 +137,7 @@ public final class SparkRunner extends PipelineRunner {
         jssc.start();

         // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance.
-        return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sc(),
+        return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sparkContext(),
             pipeline, jssc) : contextFactory.getCtxt();
       } else {
         JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
@@ -168,6 +169,62 @@ public final class SparkRunner extends PipelineRunner {
   }

   /**
+   * Detect the translation mode for the pipeline and change options in case streaming
+   * translation is needed.
+   * @param pipeline
+   */
+  private void detectTranslationMode(Pipeline pipeline) {
+    TranslationModeDetector detector = new TranslationModeDetector();
+    pipeline.traverseTopologically(detector);
+    if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
+      // set streaming mode if it's a streaming pipeline
+      this.mOptions.setStreaming(true);
+    }
+  }
+
+  /**
+   * The translation mode of the Beam Pipeline.
+   */
+  enum TranslationMode {
+    /** Uses the batch mode. */
+    BATCH,
+    /** Uses the streaming mode. */
+    STREAMING
+  }
+
+  /**
+   * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline.
+   */
+  static class TranslationModeDetector extends Pipeline.PipelineVisitor.Defaults {
+    private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class);
+
+    private TranslationMode translationMode;
+
+    TranslationModeDetector(TranslationMode defaultMode) {
+      this.translationMode = defaultMode;
+    }
+
+    TranslationModeDetector() {
+      this(TranslationMode.BATCH);
+    }
+
+    TranslationMode getTranslationMode() {
+      return translationMode;
+    }
+
+    @Override
+    public void visitPrimitiveTransform(TransformTreeNode node) {
+
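Beam's real detector is a pipeline visitor, as shown in the diff above. Its core logic — default to `BATCH`, flip to `STREAMING` when an unbounded read is seen — can be sketched in a self-contained form. In this sketch, a plain list of transform names stands in for the pipeline graph, and a string comparison stands in for the real `instanceof Read.Unbounded` check; it is an illustration of the idea, not the runner's code.

```java
import java.util.List;

// Simplified stand-in for SparkRunner.TranslationModeDetector: scan the
// pipeline's transforms and switch to streaming mode if any read is unbounded.
class TranslationModeDetector {
  enum TranslationMode { BATCH, STREAMING }

  // BATCH is the default, matching the zero-arg constructor in the commit.
  private TranslationMode translationMode = TranslationMode.BATCH;

  // The real visitor checks node.getTransform() instanceof Read.Unbounded;
  // here the transform's name is used as a stand-in for that check.
  void visit(String transformName) {
    if ("Read.Unbounded".equals(transformName)) {
      translationMode = TranslationMode.STREAMING;
    }
  }

  TranslationMode detect(List<String> transformNames) {
    for (String name : transformNames) {
      visit(name);
    }
    return translationMode;
  }
}
```

Once a single unbounded source is found, the whole pipeline is translated in streaming mode; the mode never flips back to batch.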
[2/2] incubator-beam git commit: This closes #1436
This closes #1436

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8cc43aa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8cc43aa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8cc43aa7
Branch: refs/heads/master
Commit: 8cc43aa701807009ec826e752b2f1bb95442450f
Parents: 3ad7677 cc96b13
Author: Sela
Authored: Sun Nov 27 13:19:59 2016 +0200
Committer: Sela
Committed: Sun Nov 27 13:19:59 2016 +0200
--
 .../apache/beam/runners/spark/SparkRunner.java  | 61 +++-
 .../streaming/StreamingTransformTranslator.java |  2 +-
 .../streaming/EmptyStreamAssertionTest.java     |  2 +
 .../streaming/FlattenStreamingTest.java         |  2 +
 .../streaming/SimpleStreamingWordCountTest.java |  1 +
 .../SparkTestPipelineOptionsForStreaming.java   |  6 --
 6 files changed, 65 insertions(+), 9 deletions(-)
--
[1/3] incubator-beam git commit: Remove unused WindowingInternals.writePCollectionViewData
Repository: incubator-beam
Updated Branches:
  refs/heads/master 632576b5b -> 3ad767750

Remove unused WindowingInternals.writePCollectionViewData

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/803bbe2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/803bbe2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/803bbe2a
Branch: refs/heads/master
Commit: 803bbe2a3026424f509e13809a8eecb79990e5fe
Parents: 07544ef
Author: Kenneth Knowles
Authored: Wed Nov 23 11:23:07 2016 -0800
Committer: Sela
Committed: Sat Nov 26 12:47:14 2016 +0200
--
 .../operators/ApexGroupByKeyOperator.java        | 10 --
 .../beam/runners/core/SimpleDoFnRunner.java      | 18 --
 .../beam/runners/core/SimpleOldDoFnRunner.java   | 16 
 .../functions/FlinkProcessContextBase.java       |  8 
 .../spark/translation/SparkProcessContext.java   |  9 -
 .../apache/beam/sdk/transforms/DoFnTester.java   | 10 --
 .../apache/beam/sdk/util/WindowingInternals.java | 10 --
 7 files changed, 81 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/803bbe2a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index eca4308..3b0e4f2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -30,8 +30,6 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.base.Throwables;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
@@ -40,7 +38,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
@@ -392,13 +389,6 @@ public class ApexGroupByKeyOperator implements Operator {
     }

     @Override
-    public void writePCollectionViewData(
-        TupleTag tag, Iterable data, Coder elemCoder)
-        throws IOException {
-      throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
-    }
-
-    @Override
     public T sideInput(PCollectionView view, BoundedWindow mainInputWindow) {
       throw new RuntimeException("sideInput() is not available in Streaming mode.");
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/803bbe2a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
--
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 841e412..f611c0a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -21,14 +21,11 @@ import static com.google.common.base.Preconditions.checkNotNull;

 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
@@ -595,21 +592,6 @@ public class SimpleDoFnRunner implements DoFnRunner
-    public void writePCollectionViewData(
-        TupleTag tag, Iterable data, Coder elemCoder)
-        throws IOException {
-      @SuppressWarnings("unchecked")
-      Coder windowCoder = (Coder)
[3/3] incubator-beam git commit: [BEAM-498] Remove obsolete WindowingInternals#writePCollectionViewData This closes #1430
[BEAM-498] Remove obsolete WindowingInternals#writePCollectionViewData

This closes #1430

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3ad76775
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3ad76775
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3ad76775
Branch: refs/heads/master
Commit: 3ad7677503977108b5a67c315bd1cc6ead3ee998
Parents: 632576b 803bbe2
Author: Sela
Authored: Sat Nov 26 12:50:01 2016 +0200
Committer: Sela
Committed: Sat Nov 26 12:50:01 2016 +0200
--
 .../operators/ApexGroupByKeyOperator.java        | 10 --
 .../beam/runners/core/SimpleDoFnRunner.java      | 18 --
 .../beam/runners/core/SimpleOldDoFnRunner.java   | 16 
 .../functions/FlinkProcessContextBase.java       |  8 
 .../beam/runners/dataflow/DataflowRunner.java    | 14 +-
 .../spark/translation/SparkProcessContext.java   |  9 -
 .../apache/beam/sdk/transforms/DoFnTester.java   | 10 --
 .../apache/beam/sdk/util/WindowingInternals.java | 10 --
 8 files changed, 5 insertions(+), 90 deletions(-)
--
[2/3] incubator-beam git commit: Remove unused body of StreamingPCollectionViewWriterFn
Remove unused body of StreamingPCollectionViewWriterFn Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/07544ef3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/07544ef3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/07544ef3 Branch: refs/heads/master Commit: 07544ef3a47bbdfacc00c75af875c3533a5fe477 Parents: 632576b Author: Kenneth KnowlesAuthored: Wed Nov 23 11:22:08 2016 -0800 Committer: Sela Committed: Sat Nov 26 12:47:14 2016 +0200 -- .../apache/beam/runners/dataflow/DataflowRunner.java | 14 +- 1 file changed, 5 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07544ef3/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 36328e9..f1d41f2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -2360,8 +2360,8 @@ public class DataflowRunner extends PipelineRunner { } /** - * A specialized {@link DoFn} for writing the contents of a {@link PCollection} - * to a streaming {@link PCollectionView} backend implementation. + * A marker {@link DoFn} for writing the contents of a {@link PCollection} to a streaming + * {@link PCollectionView} backend implementation. 
*/ @Deprecated public static class StreamingPCollectionViewWriterFn @@ -2389,13 +2389,9 @@ public class DataflowRunner extends PipelineRunner { @Override public void processElement(ProcessContext c) throws Exception { - List output = new ArrayList<>(); - for (T elem : c.element()) { -output.add(WindowedValue.of(elem, c.timestamp(), c.window(), c.pane())); - } - - c.windowingInternals().writePCollectionViewData( - view.getTagInternal(), output, dataCoder); + throw new UnsupportedOperationException( + String.format( + "%s is a marker class only and should never be executed.", getClass().getName())); } }
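The commit above reduces StreamingPCollectionViewWriterFn to a marker: the body is replaced by a thrown UnsupportedOperationException, presumably because the runner recognizes the class by type during pipeline translation and never invokes it directly. A minimal sketch of this marker-class pattern follows; all names are illustrative stand-ins, not Beam's actual API:

```java
// Marker-class pattern: the class carries translation-time metadata
// (here, just a view name) but must never be executed at runtime.
public class MarkerFnDemo {

    public static class StreamingViewWriterFn {
        private final String viewName; // metadata read at translation time

        public StreamingViewWriterFn(String viewName) {
            this.viewName = viewName;
        }

        public String getViewName() {
            return viewName;
        }

        // A real implementation is substituted during translation,
        // so reaching this method is always a bug.
        public void processElement(Object element) {
            throw new UnsupportedOperationException(
                String.format(
                    "%s is a marker class only and should never be executed.",
                    getClass().getName()));
        }
    }

    // Helper: returns true iff the marker refuses to execute, as expected.
    public static boolean refusesToExecute(StreamingViewWriterFn fn) {
        try {
            fn.processElement("element");
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        StreamingViewWriterFn fn = new StreamingViewWriterFn("sideInputView");
        System.out.println(fn.getViewName());
        System.out.println(refusesToExecute(fn));
    }
}
```

The advantage over deleting the class outright is that existing type checks in the translator keep working while the dead runtime path is made loudly unusable.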
[2/2] incubator-beam git commit: This closes #1393
This closes #1393 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/875631f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/875631f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/875631f0 Branch: refs/heads/master Commit: 875631f07b1e4556afec28dc850bd7fe2d07444b Parents: d93e9a8 dafd5be Author: Sela Authored: Sun Nov 20 19:26:52 2016 +0200 Committer: Sela Committed: Sun Nov 20 19:26:52 2016 +0200 -- .../runners/spark/SparkPipelineOptions.java | 6 -- .../apache/beam/runners/spark/SparkRunner.java | 5 +- .../spark/translation/EvaluationContext.java| 59 +--- .../SparkRunnerStreamingContextFactory.java | 3 +- .../streaming/EmptyStreamAssertionTest.java | 3 +- .../streaming/FlattenStreamingTest.java | 4 +- .../streaming/KafkaStreamingTest.java | 11 ++-- .../ResumeFromCheckpointStreamingTest.java | 3 +- .../streaming/SimpleStreamingWordCountTest.java | 4 +- .../streaming/utils/PAssertStreaming.java | 8 ++- .../SparkTestPipelineOptionsForStreaming.java | 1 - 11 files changed, 60 insertions(+), 47 deletions(-) --
[1/2] incubator-beam git commit: [BEAM-1001] Add non-blocking cancel() and waitUntilFinish() for streaming applications. Remove the timeout parameter from the Spark pipeline options.
Repository: incubator-beam Updated Branches: refs/heads/master d93e9a88b -> 875631f07 [BEAM-1001] Add non-blocking cancel() and waitUntilFinish() for streaming applications. Remove the timeout parameter from the Spark pipeline options. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dafd5be7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dafd5be7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dafd5be7 Branch: refs/heads/master Commit: dafd5be7f69f191fc9edb8b9f9aec010ca368f50 Parents: d93e9a8 Author: ksalant Authored: Sun Nov 20 11:57:16 2016 +0200 Committer: Sela Committed: Sun Nov 20 19:25:52 2016 +0200 -- .../runners/spark/SparkPipelineOptions.java | 6 -- .../apache/beam/runners/spark/SparkRunner.java | 5 +- .../spark/translation/EvaluationContext.java| 59 +--- .../SparkRunnerStreamingContextFactory.java | 3 +- .../streaming/EmptyStreamAssertionTest.java | 3 +- .../streaming/FlattenStreamingTest.java | 4 +- .../streaming/KafkaStreamingTest.java | 11 ++-- .../ResumeFromCheckpointStreamingTest.java | 3 +- .../streaming/SimpleStreamingWordCountTest.java | 4 +- .../streaming/utils/PAssertStreaming.java | 8 ++- .../SparkTestPipelineOptionsForStreaming.java | 1 - 11 files changed, 60 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index b1ebde9..0fd790e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -39,12 +39,6 @@ public interface SparkPipelineOptions String getSparkMaster(); void 
setSparkMaster(String master); - @Description("Timeout to wait (in msec) for a streaming execution to stop, -1 runs until " - + "execution is stopped") - @Default.Long(-1) - Long getTimeout(); - void setTimeout(Long timeoutMillis); - @Description("Batch interval for Spark streaming in milliseconds.") @Default.Long(1000) Long getBatchIntervalMillis(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 6bbef39..e800071 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -137,11 +137,8 @@ public final class SparkRunner extends PipelineRunner { // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance. return contextFactory.getCtxt() == null ? 
new EvaluationContext(jssc.sc(), -pipeline, jssc, mOptions.getTimeout()) : contextFactory.getCtxt(); +pipeline, jssc) : contextFactory.getCtxt(); } else { -if (mOptions.getTimeout() > 0) { - LOG.info("Timeout is ignored by the SparkRunner in batch."); -} JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions); EvaluationContext ctxt = new EvaluationContext(jsc, pipeline); SparkPipelineTranslator translator = new TransformTranslator.Translator(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index aaf7573..1183fbb 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.streaming.StreamingContextState; import
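The commit replaces the fixed timeout option with non-blocking cancel() and waitUntilFinish(), and the diff imports Spark's StreamingContextState, which suggests the implementation polls the streaming context's state rather than blocking for a configured duration. A rough sketch of that polling idea, using stand-in types rather than Spark's or Beam's real API:

```java
// Illustrative sketch only: a non-blocking waitUntilFinish() that checks
// a streaming context's state instead of blocking on a timeout option.
public class PollingWaitDemo {

    public enum State { INITIALIZED, ACTIVE, STOPPED }

    // Simulated streaming context that reports STOPPED on the third query.
    public static class FakeStreamingContext {
        private int polls = 0;

        public State getState() {
            return ++polls >= 3 ? State.STOPPED : State.ACTIVE;
        }
    }

    // Returns true if the context stopped within maxPolls checks;
    // false means "still running", so the caller may poll again or cancel().
    public static boolean waitUntilFinish(FakeStreamingContext ctx, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            if (ctx.getState() == State.STOPPED) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(waitUntilFinish(new FakeStreamingContext(), 10)); // true
        System.out.println(waitUntilFinish(new FakeStreamingContext(), 2));  // false
    }
}
```

The design benefit mirrors the commit message: the caller decides how long to wait and when to cancel, instead of the runner baking a shutdown timeout into its options.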
[1/2] incubator-beam git commit: [BEAM-983] Fix a bunch of precommit errors from #1332
Repository: incubator-beam Updated Branches: refs/heads/master 201110222 -> dbbd5e448 [BEAM-983] Fix a bunch of precommit errors from #1332 Renames TestPipelineOptions to SparkTestPipelineOptions to avoid confusion with sdk.testing.TestPipelineOptions. Also, a couple of other minor fixes. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd740ee1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd740ee1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd740ee1 Branch: refs/heads/master Commit: dd740ee1b20ab6921db3620ac28499dc66511482 Parents: 2011102 Author: Eugene Kirpichov Authored: Tue Nov 15 14:25:51 2016 -0800 Committer: Sela Committed: Wed Nov 16 01:49:59 2016 +0200 -- .../runners/spark/ProvidedSparkContextTest.java | 2 - .../metrics/sink/NamedAggregatorsTest.java | 4 +- .../beam/runners/spark/io/AvroPipelineTest.java | 5 +-- .../beam/runners/spark/io/NumShardsTest.java| 5 +-- .../io/hadoop/HadoopFileFormatPipelineTest.java | 5 +-- .../spark/translation/SideEffectsTest.java | 34 ++- .../streaming/EmptyStreamAssertionTest.java | 5 ++- .../streaming/FlattenStreamingTest.java | 5 ++- .../streaming/KafkaStreamingTest.java | 5 ++- .../ResumeFromCheckpointStreamingTest.java | 5 ++- .../streaming/SimpleStreamingWordCountTest.java | 5 ++- .../utils/SparkTestPipelineOptions.java | 42 +++ .../SparkTestPipelineOptionsForStreaming.java | 44 .../streaming/utils/TestPipelineOptions.java| 25 --- .../utils/TestPipelineOptionsForStreaming.java | 44 15 files changed, 121 insertions(+), 114 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java -- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java index 
bc337c7..fe73aba 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java @@ -25,7 +25,6 @@ import java.util.Arrays; import java.util.List; import java.util.Set; import org.apache.beam.runners.spark.examples.WordCount; -import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -34,7 +33,6 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; import org.apache.spark.api.java.JavaSparkContext; -import org.junit.Rule; import org.junit.Test; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java -- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java index c220f2b..c16574c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java @@ -28,7 +28,7 @@ import java.util.List; import java.util.Set; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.examples.WordCount; -import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions; +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import 
org.apache.beam.sdk.testing.PAssert; @@ -52,7 +52,7 @@ public class NamedAggregatorsTest { public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule(); @Rule - public final TestPipelineOptions pipelineOptions = new TestPipelineOptions(); + public final SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions(); private Pipeline createSparkPipeline() { SparkPipelineOptions options = pipelineOptions.getOptions();
[2/2] incubator-beam git commit: This closes #1364
This closes #1364 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dbbd5e44 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dbbd5e44 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dbbd5e44 Branch: refs/heads/master Commit: dbbd5e44800586c3f1a2efc58b91d79126c843d3 Parents: 2011102 dd740ee Author: Sela Authored: Wed Nov 16 01:51:51 2016 +0200 Committer: Sela Committed: Wed Nov 16 01:51:51 2016 +0200 -- .../runners/spark/ProvidedSparkContextTest.java | 2 - .../metrics/sink/NamedAggregatorsTest.java | 4 +- .../beam/runners/spark/io/AvroPipelineTest.java | 5 +-- .../beam/runners/spark/io/NumShardsTest.java| 5 +-- .../io/hadoop/HadoopFileFormatPipelineTest.java | 5 +-- .../spark/translation/SideEffectsTest.java | 34 ++- .../streaming/EmptyStreamAssertionTest.java | 5 ++- .../streaming/FlattenStreamingTest.java | 5 ++- .../streaming/KafkaStreamingTest.java | 5 ++- .../ResumeFromCheckpointStreamingTest.java | 5 ++- .../streaming/SimpleStreamingWordCountTest.java | 5 ++- .../utils/SparkTestPipelineOptions.java | 42 +++ .../SparkTestPipelineOptionsForStreaming.java | 44 .../streaming/utils/TestPipelineOptions.java| 25 --- .../utils/TestPipelineOptionsForStreaming.java | 44 15 files changed, 121 insertions(+), 114 deletions(-) --
[GitHub] incubator-beam pull request #1362: [BEAM-983] runners/spark/translation/stre...
Github user amitsela closed the pull request at: https://github.com/apache/incubator-beam/pull/1362 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #1362: [BEAM-983] add missing license.
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1362 [BEAM-983] add missing license. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-983 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1362.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1362 commit 19d180594a7cce258fc78632521a6536bde85935 Author: Sela <ans...@paypal.com> Date: 2016-11-15T18:53:38Z [BEAM-983] add missing license.
[1/2] incubator-beam git commit: [BEAM-891] Fix build occasionally failing on IndexOutOfBoundsException.
Repository: incubator-beam Updated Branches: refs/heads/master 47646d641 -> 503f26f44 [BEAM-891] Fix build occasionally failing on IndexOutOfBoundsException. Moved "TestPipelineOptions#withTmpCheckpointDir" to TestPipelineOptionsForStreaming. Removed an unused member in ProvidedSparkContextTest. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0331dd1c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0331dd1c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0331dd1c Branch: refs/heads/master Commit: 0331dd1cd75e60484f0b15723e4e7edc280a4d12 Parents: 47646d6 Author: Stas Levin Authored: Thu Nov 10 13:32:51 2016 +0200 Committer: Sela Committed: Tue Nov 15 19:52:12 2016 +0200 -- runners/spark/pom.xml | 3 +- .../runners/spark/SparkPipelineOptions.java | 4 +- .../spark/translation/SparkRuntimeContext.java | 4 +- .../runners/spark/ProvidedSparkContextTest.java | 26 - .../metrics/sink/NamedAggregatorsTest.java | 13 +++-- .../beam/runners/spark/io/AvroPipelineTest.java | 12 ++--- .../beam/runners/spark/io/NumShardsTest.java| 10 ++-- .../io/hadoop/HadoopFileFormatPipelineTest.java | 12 ++--- .../spark/translation/SideEffectsTest.java | 11 ++-- .../streaming/EmptyStreamAssertionTest.java | 4 +- .../streaming/FlattenStreamingTest.java | 4 +- .../streaming/KafkaStreamingTest.java | 4 +- .../ResumeFromCheckpointStreamingTest.java | 4 +- .../streaming/SimpleStreamingWordCountTest.java | 6 +-- .../utils/TestOptionsForStreaming.java | 55 .../streaming/utils/TestPipelineOptions.java| 25 + .../utils/TestPipelineOptionsForStreaming.java | 44 17 files changed, 132 insertions(+), 109 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/pom.xml -- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 1e4a720..4c5b3f5 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -82,7 +82,8 @@ [ 
"--runner=TestSparkRunner", -"--streaming=false" +"--streaming=false", +"--enableSparkMetricSinks=false" ] true http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index 5168c6c..b1ebde9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -88,8 +88,8 @@ public interface SparkPipelineOptions @Description("Enable/disable sending aggregator values to Spark's metric sinks") @Default.Boolean(true) - Boolean getEnableSparkSinks(); - void setEnableSparkSinks(Boolean enableSparkSinks); + Boolean getEnableSparkMetricSinks(); + void setEnableSparkMetricSinks(Boolean enableSparkMetricSinks); @Description("If the spark runner will be initialized with a provided Spark Context. 
" + "The Spark Context should be provided with SparkContextOptions.") http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index 181a111..564db39 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -86,11 +86,11 @@ public class SparkRuntimeContext implements Serializable { final Accumulator accum = AccumulatorSingleton.getInstance(jsc); final NamedAggregators initialValue = accum.value(); -if (opts.getEnableSparkSinks()) { +if (opts.getEnableSparkMetricSinks()) { final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem(); final AggregatorMetricSource aggregatorMetricSource =
[2/2] incubator-beam git commit: This closes #1332
This closes #1332 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/503f26f4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/503f26f4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/503f26f4 Branch: refs/heads/master Commit: 503f26f448ea9f46fcfcdd46e60cba80e4844e28 Parents: 47646d6 0331dd1 Author: Sela Authored: Tue Nov 15 19:53:52 2016 +0200 Committer: Sela Committed: Tue Nov 15 19:53:52 2016 +0200 -- runners/spark/pom.xml | 3 +- .../runners/spark/SparkPipelineOptions.java | 4 +- .../spark/translation/SparkRuntimeContext.java | 4 +- .../runners/spark/ProvidedSparkContextTest.java | 26 - .../metrics/sink/NamedAggregatorsTest.java | 13 +++-- .../beam/runners/spark/io/AvroPipelineTest.java | 12 ++--- .../beam/runners/spark/io/NumShardsTest.java| 10 ++-- .../io/hadoop/HadoopFileFormatPipelineTest.java | 12 ++--- .../spark/translation/SideEffectsTest.java | 11 ++-- .../streaming/EmptyStreamAssertionTest.java | 4 +- .../streaming/FlattenStreamingTest.java | 4 +- .../streaming/KafkaStreamingTest.java | 4 +- .../ResumeFromCheckpointStreamingTest.java | 4 +- .../streaming/SimpleStreamingWordCountTest.java | 6 +-- .../utils/TestOptionsForStreaming.java | 55 .../streaming/utils/TestPipelineOptions.java| 25 + .../utils/TestPipelineOptionsForStreaming.java | 44 17 files changed, 132 insertions(+), 109 deletions(-) --
[2/2] incubator-beam git commit: This closes #1291
This closes #1291 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2bc66f90 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2bc66f90 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2bc66f90 Branch: refs/heads/master Commit: 2bc66f903cdfa328c4bb3546befbaa0f58bdd6fa Parents: 9c300cd 1bef01f Author: Sela Authored: Tue Nov 15 13:37:20 2016 +0200 Committer: Sela Committed: Tue Nov 15 13:37:20 2016 +0200 -- .../apache/beam/runners/spark/SparkRunner.java | 4 +- .../spark/translation/BoundedDataset.java | 114 .../beam/runners/spark/translation/Dataset.java | 34 +++ .../spark/translation/EvaluationContext.java| 230 +++- .../spark/translation/TransformTranslator.java | 99 +++ .../SparkRunnerStreamingContextFactory.java | 7 +- .../streaming/StreamingEvaluationContext.java | 272 --- .../streaming/StreamingTransformTranslator.java | 135 + .../translation/streaming/UnboundedDataset.java | 103 +++ 9 files changed, 464 insertions(+), 534 deletions(-) --
[1/2] incubator-beam git commit: [BEAM-762] Unify spark-runner EvaluationContext and StreamingEvaluationContext
Repository: incubator-beam Updated Branches: refs/heads/master 9c300cde8 -> 2bc66f903 [BEAM-762] Unify spark-runner EvaluationContext and StreamingEvaluationContext PR 1291 review changes. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1bef01fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1bef01fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1bef01fe Branch: refs/heads/master Commit: 1bef01fef5ff5ff9a960c85b00c2cc4aa504ce4d Parents: 9c300cd Author: Aviem Zur Authored: Sun Nov 13 13:57:07 2016 +0200 Committer: Sela Committed: Tue Nov 15 13:35:49 2016 +0200 -- .../apache/beam/runners/spark/SparkRunner.java | 4 +- .../spark/translation/BoundedDataset.java | 114 .../beam/runners/spark/translation/Dataset.java | 34 +++ .../spark/translation/EvaluationContext.java| 230 +++- .../spark/translation/TransformTranslator.java | 99 +++ .../SparkRunnerStreamingContextFactory.java | 7 +- .../streaming/StreamingEvaluationContext.java | 272 --- .../streaming/StreamingTransformTranslator.java | 135 + .../translation/streaming/UnboundedDataset.java | 103 +++ 9 files changed, 464 insertions(+), 534 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 45c7f55..6bbef39 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -26,7 +26,6 @@ import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformEvaluator; import org.apache.beam.runners.spark.translation.TransformTranslator; import 
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory; -import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -49,6 +48,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * The SparkRunner translate operations defined on a pipeline to a representation * executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run @@ -136,7 +136,7 @@ public final class SparkRunner extends PipelineRunner { jssc.start(); // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance. -return contextFactory.getCtxt() == null ? new StreamingEvaluationContext(jssc.sc(), +return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sc(), pipeline, jssc, mOptions.getTimeout()) : contextFactory.getCtxt(); } else { if (mOptions.getTimeout() > 0) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java new file mode 100644 index 000..774efb9 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.translation; +
[1/2] incubator-beam git commit: [BEAM-944] Spark runner causes an exception when creating pipeline options. Create a SparkContextOptions for context-aware options.
Repository: incubator-beam Updated Branches: refs/heads/master cd3f61cf8 -> e43a38355 [BEAM-944] Spark runner causes an exception when creating pipeline options. Create a SparkContextOptions for context-aware options. Move UsesProvidedSparkContext property to SparkPipelineOptions so it's available from command-line as well. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/121bff46 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/121bff46 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/121bff46 Branch: refs/heads/master Commit: 121bff46d950e319eebf10e3a42bdd890edfb0c5 Parents: cd3f61c Author: Sela Authored: Tue Nov 8 23:05:13 2016 +0200 Committer: Sela Committed: Thu Nov 10 23:27:17 2016 +0200 -- .../beam/runners/spark/SparkContextOptions.java | 64 .../runners/spark/SparkPipelineOptions.java | 36 +++ .../spark/translation/SparkContextFactory.java | 19 +++--- .../SparkRunnerStreamingContextFactory.java | 3 +- .../runners/spark/ProvidedSparkContextTest.java | 6 +- .../streaming/KafkaStreamingTest.java | 4 +- 6 files changed, 91 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java new file mode 100644 index 000..98f7492 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.streaming.api.java.JavaStreamingListener; + + + +/** + * A custom {@link PipelineOptions} to work with properties related to {@link JavaSparkContext}. + * + * This can only be used programmatically (as opposed to passing command line arguments), + * since the properties here are context-aware and should not be propagated to workers. + * + * Separating this from {@link SparkPipelineOptions} is needed so the context-aware properties, + * which link to Spark dependencies, won't be scanned by {@link PipelineOptions} + * reflective instantiation. + * Note that {@link SparkContextOptions} is not registered with {@link SparkRunnerRegistrar}. 
+ */ +public interface SparkContextOptions extends SparkPipelineOptions { + + @Description("Provided Java Spark Context") + @JsonIgnore + JavaSparkContext getProvidedSparkContext(); + void setProvidedSparkContext(JavaSparkContext jsc); + + @Description("Spark streaming listeners") + @Default.InstanceFactory(EmptyListenersList.class) + @JsonIgnore + List getListeners(); + void setListeners(List listeners); + + /** Returns an empty list, to avoid handling null. */ + class EmptyListenersList implements DefaultValueFactory { +@Override +public List create(PipelineOptions options) { + return new ArrayList<>(); +} + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index
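The SparkContextOptions interface above wires getListeners() to a DefaultValueFactory so that an unset option yields an empty list rather than null. The pattern can be sketched with stand-in types (note that Beam's real DefaultValueFactory.create receives the PipelineOptions as an argument; the simplified interface below is illustrative only):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the default-value-factory idea: callers never see null,
// because an unset option falls back to a factory-produced default.
public class DefaultFactoryDemo {

    // Stand-in for Beam's DefaultValueFactory<T>.
    public interface ValueFactory<T> {
        T create();
    }

    public static class EmptyListenersList implements ValueFactory<List<String>> {
        @Override
        public List<String> create() {
            return new ArrayList<>(); // empty, never null
        }
    }

    // Use the configured value when present, else the factory's default.
    public static <T> T getOrDefault(T configured, ValueFactory<T> factory) {
        return configured != null ? configured : factory.create();
    }

    public static void main(String[] args) {
        List<String> listeners = getOrDefault(null, new EmptyListenersList());
        // Safe to iterate immediately; no null check needed at call sites.
        System.out.println(listeners.size());
    }
}
```

This is why the javadoc in the diff stresses "to avoid handling null": every accessor of the option gets a usable (possibly empty) list, so streaming-listener registration code needs no defensive branches.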
[2/2] incubator-beam git commit: This closes #1316
This closes #1316 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e43a3835 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e43a3835 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e43a3835 Branch: refs/heads/master Commit: e43a383559cb825a498c7427d58ce0a56b3f5245 Parents: cd3f61c 121bff4 Author: Sela Authored: Thu Nov 10 23:27:55 2016 +0200 Committer: Sela Committed: Thu Nov 10 23:27:55 2016 +0200 -- .../beam/runners/spark/SparkContextOptions.java | 64 .../runners/spark/SparkPipelineOptions.java | 36 +++ .../spark/translation/SparkContextFactory.java | 19 +++--- .../SparkRunnerStreamingContextFactory.java | 3 +- .../runners/spark/ProvidedSparkContextTest.java | 6 +- .../streaming/KafkaStreamingTest.java | 4 +- 6 files changed, 91 insertions(+), 41 deletions(-) --
[GitHub] incubator-beam pull request #1316: [BEAM-944] Spark runner causes an excepti...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1316 [BEAM-944] Spark runner causes an exception when creating pipeline op… Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- …tions. Create a SparkContextOptions for context-aware options. You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-944 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1316.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1316 commit 45823481d1356caad138ca5c8504276045425411 Author: Sela <ans...@paypal.com> Date: 2016-11-08T21:05:13Z [BEAM-944] Spark runner causes an exception when creating pipeline options. Create a SparkContextOptions for context-aware options.
[GitHub] incubator-beam pull request #1257: [BEAM-880] Avoid emitting default empty i...
Github user amitsela closed the pull request at: https://github.com/apache/incubator-beam/pull/1257 ---
[1/2] incubator-beam git commit: [BEAM-889] Let Spark handle the user-provided checkpointDir, but warn if not a reliable fs.
Repository: incubator-beam Updated Branches: refs/heads/master 46fbfe06b -> 14e093a0a [BEAM-889] Let Spark handle the user-provided checkpointDir, but warn if not a reliable fs. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90a75d1f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90a75d1f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90a75d1f Branch: refs/heads/master Commit: 90a75d1fb0706ec4cc25a9eeeca8ade1b3b7de28 Parents: 46fbfe0 Author: Sela Authored: Thu Nov 3 18:22:20 2016 +0200 Committer: Sela Committed: Fri Nov 4 23:59:40 2016 +0200 -- .../runners/spark/SparkPipelineOptions.java | 3 +-- .../SparkRunnerStreamingContextFactory.java | 23 +--- .../streaming/EmptyStreamAssertionTest.java | 3 +-- .../streaming/FlattenStreamingTest.java | 6 ++--- .../streaming/KafkaStreamingTest.java | 6 ++--- .../ResumeFromCheckpointStreamingTest.java | 3 +-- .../streaming/SimpleStreamingWordCountTest.java | 3 +-- .../utils/TestOptionsForStreaming.java | 12 +- 8 files changed, 19 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index 08e14fe..4eada35 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -77,8 +77,7 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions, class TmpCheckpointDirFactory implements DefaultValueFactory { @Override public String create(PipelineOptions options) { - SparkPipelineOptions sparkPipelineOptions = options.as(SparkPipelineOptions.class); - 
return "file:///tmp/" + sparkPipelineOptions.getJobName(); + return "/tmp/" + options.as(SparkPipelineOptions.class).getJobName(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index 2378788..a670f61 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -20,11 +20,6 @@ package org.apache.beam.runners.spark.translation.streaming; import static com.google.common.base.Preconditions.checkArgument; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Arrays; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.translation.SparkContextFactory; @@ -48,7 +43,7 @@ import org.slf4j.LoggerFactory; public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory { private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class); - private static final Iterable KNOWN_RELIABLE_FS = Arrays.asList("hdfs", "s3", "gs"); + private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)"; private final Pipeline pipeline; private final SparkPipelineOptions options; @@ -83,19 +78,11 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF // set checkpoint dir. 
String checkpointDir = options.getCheckpointDir(); -LOG.info("Checkpoint dir set to: {}", checkpointDir); -try { - // validate checkpoint dir and warn if not of a known durable filesystem. - URL checkpointDirUrl = new URL(checkpointDir); - if (!Iterables.any(KNOWN_RELIABLE_FS, Predicates.equalTo(checkpointDirUrl.getProtocol()))) { -LOG.warn("Checkpoint dir URL {} does not match a reliable filesystem, in case of failures " -+ "this job may not recover properly or even at all.",
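The patch above replaces URL parsing with a simple scheme-prefix regex (`^(hdfs|s3|gs)`). As a minimal, self-contained sketch of that check — the pattern string is copied from the diff, while the class and method names here are ours for illustration, not Beam's actual API:

```java
import java.util.regex.Pattern;

public class CheckpointDirValidator {

    // Scheme prefixes considered durable, mirroring the patch's
    // KNOWN_RELIABLE_FS_PATTERN constant.
    static final Pattern KNOWN_RELIABLE_FS_PATTERN = Pattern.compile("^(hdfs|s3|gs)");

    // True when the checkpoint dir starts with a known reliable filesystem
    // scheme; the runner would log a warning (not fail) otherwise.
    static boolean isReliableFs(String checkpointDir) {
        return KNOWN_RELIABLE_FS_PATTERN.matcher(checkpointDir).find();
    }

    public static void main(String[] args) {
        System.out.println(isReliableFs("hdfs://namenode:8020/checkpoints")); // true
        System.out.println(isReliableFs("file:///tmp/my-job"));               // false
    }
}
```

Note that a prefix match also accepts schemes like `s3a` or `s3n`, which is consistent with the pattern as written in the diff.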
[2/2] incubator-beam git commit: This closes #1272
This closes #1272 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/14e093a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/14e093a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/14e093a0 Branch: refs/heads/master Commit: 14e093a0a574c8c3920a83c38e411a06b29bf44b Parents: 46fbfe0 90a75d1 Author: Sela Authored: Sat Nov 5 00:02:22 2016 +0200 Committer: Sela Committed: Sat Nov 5 00:02:22 2016 +0200 -- .../runners/spark/SparkPipelineOptions.java | 3 +-- .../SparkRunnerStreamingContextFactory.java | 23 +--- .../streaming/EmptyStreamAssertionTest.java | 3 +-- .../streaming/FlattenStreamingTest.java | 6 ++--- .../streaming/KafkaStreamingTest.java | 6 ++--- .../ResumeFromCheckpointStreamingTest.java | 3 +-- .../streaming/SimpleStreamingWordCountTest.java | 3 +-- .../utils/TestOptionsForStreaming.java | 12 +- 8 files changed, 19 insertions(+), 40 deletions(-) --
[GitHub] incubator-beam-site pull request #65: [BEAM-890] Update compatibility matrix...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam-site/pull/65 [BEAM-890] Update compatibility matrix for Spark. You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam-site BEAM-890 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam-site/pull/65.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #65 commit 97c147fb3dbd4b860f441aa13d22c723e31021e4 Author: Sela <ans...@paypal.com> Date: 2016-11-04T08:57:54Z [BEAM-890] Update compatibility matrix for Spark. ---
[GitHub] incubator-beam pull request #1271: [BEAM-889] CheckpointDir option does not ...
Github user amitsela closed the pull request at: https://github.com/apache/incubator-beam/pull/1271 ---
[GitHub] incubator-beam pull request #1272: [BEAM-889] CheckpointDir option does not ...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1272 [BEAM-889] CheckpointDir option does not accept relative path and req… …uires protocol. You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-889 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1272.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1272 commit b4c9e19ffadae3ed9c64e22aeb11dd3dc7ac0872 Author: Sela <ans...@paypal.com> Date: 2016-11-03T16:22:20Z [BEAM-889] CheckpointDir option does not accept relative path and requires protocol. ---
[GitHub] incubator-beam pull request #1271: [BEAM-889] CheckpointDir option does not ...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1271 [BEAM-889] CheckpointDir option does not accept relative path and req… …uires protocol. You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1271.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1271 commit 2bf8163290bae3273ad068daf8f3776a8a8ad9cd Author: Sela <ans...@paypal.com> Date: 2016-11-03T14:56:08Z [BEAM-889] CheckpointDir option does not accept relative path and requires protocol. ---
[1/2] incubator-beam git commit: Revert "[BEAM-808] Increase "spark.port.maxRetries" to avoid BindException in ROS."
Repository: incubator-beam Updated Branches: refs/heads/master d75d8b2bb -> 6a05cf4a9 Revert "[BEAM-808] Increase "spark.port.maxRetries" to avoid BindException in ROS." This reverts commit ff5409f15a9f741f437c489b7de763cfa3c68278. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f233d858 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f233d858 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f233d858 Branch: refs/heads/master Commit: f233d858810cacace21b05e525131f05b26e6380 Parents: d75d8b2 Author: Sela Authored: Thu Nov 3 14:13:33 2016 +0200 Committer: Sela Committed: Thu Nov 3 14:13:33 2016 +0200 -- runners/spark/pom.xml | 1 - 1 file changed, 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f233d858/runners/spark/pom.xml -- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index e83aedf..71a3ac2 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -87,7 +87,6 @@ true false -64
[2/2] incubator-beam git commit: This closes #1270
This closes #1270 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6a05cf4a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6a05cf4a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6a05cf4a Branch: refs/heads/master Commit: 6a05cf4a94728eb5e73a96e260eb5ef0bdc5fc9b Parents: d75d8b2 f233d85 Author: Sela Authored: Thu Nov 3 15:35:10 2016 +0200 Committer: Sela Committed: Thu Nov 3 15:35:10 2016 +0200 -- runners/spark/pom.xml | 1 - 1 file changed, 1 deletion(-) --
[GitHub] incubator-beam pull request #1270: Revert "[BEAM-808] Increase "spark.port.m...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1270 Revert "[BEAM-808] Increase "spark.port.maxRetries" to avoid BindExce… …ption in ROS." This reverts commit ff5409f15a9f741f437c489b7de763cfa3c68278. You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-888 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1270.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1270 commit f233d858810cacace21b05e525131f05b26e6380 Author: Sela <ans...@paypal.com> Date: 2016-11-03T12:13:33Z Revert "[BEAM-808] Increase "spark.port.maxRetries" to avoid BindException in ROS." This reverts commit ff5409f15a9f741f437c489b7de763cfa3c68278. ---
[1/2] incubator-beam git commit: [BEAM-808] Increase "spark.port.maxRetries" to avoid BindException in ROS.
Repository: incubator-beam Updated Branches: refs/heads/master 2c0d0f476 -> 529f266ae [BEAM-808] Increase "spark.port.maxRetries" to avoid BindException in ROS. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ff5409f1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ff5409f1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ff5409f1 Branch: refs/heads/master Commit: ff5409f15a9f741f437c489b7de763cfa3c68278 Parents: 2c0d0f4 Author: Sela Authored: Mon Oct 24 21:52:03 2016 +0300 Committer: Sela Committed: Wed Nov 2 14:53:16 2016 +0200 -- runners/spark/pom.xml | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ff5409f1/runners/spark/pom.xml -- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 71a3ac2..e83aedf 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -87,6 +87,7 @@ true false +64
[2/2] incubator-beam git commit: This closes #1170
This closes #1170 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/529f266a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/529f266a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/529f266a Branch: refs/heads/master Commit: 529f266ae91167997a17a952ee98b1721bba47a8 Parents: 2c0d0f4 ff5409f Author: Sela Authored: Wed Nov 2 14:54:21 2016 +0200 Committer: Sela Committed: Wed Nov 2 14:54:21 2016 +0200 -- runners/spark/pom.xml | 1 + 1 file changed, 1 insertion(+) --
[GitHub] incubator-beam pull request #1257: [BEAM-880] Avoid emitting default empty i...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1257 [BEAM-880] Avoid emitting default empty iterable in PAssert.GroupedGl… …obally. You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam passert-remove-empty-dummy Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1257.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1257 commit 99a79e3d19a6584a19c628e71a21400f97b4754a Author: Sela <ans...@paypal.com> Date: 2016-11-02T12:17:12Z [BEAM-880] Avoid emitting default empty iterable in PAssert.GroupedGlobally. ---
[1/2] incubator-beam git commit: Directly implement ReifyTimestampsAndWindows in SparkRunner
Repository: incubator-beam Updated Branches: refs/heads/master 215980ad3 -> 9c3e3e7a3 Directly implement ReifyTimestampsAndWindows in SparkRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/597e3955 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/597e3955 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/597e3955 Branch: refs/heads/master Commit: 597e3955c219a7c50df124a0689b99b98dfbbbc9 Parents: 215980a Author: Kenneth Knowles Authored: Thu Oct 27 22:18:19 2016 -0700 Committer: Sela Committed: Fri Oct 28 10:56:44 2016 +0300 -- .../translation/GroupCombineFunctions.java | 5 +-- .../ReifyTimestampsAndWindowsFunction.java | 47 2 files changed, 48 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/597e3955/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java index e2a0f87..421b1b0 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.ReifyTimestampAndWindowsDoFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -48,7 +47,6 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import 
org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; - import scala.Tuple2; @@ -77,8 +75,7 @@ public class GroupCombineFunctions { // Use coders to convert objects in the PCollection to byte arrays, so they // can be transferred over the network for the shuffle. JavaRDD , KV >(null, -new ReifyTimestampAndWindowsDoFn (), runtimeContext, null, null)) +rdd.map(new ReifyTimestampsAndWindowsFunction ()) .map(WindowingHelpers. >unwindowFunction()) .mapToPair(TranslationUtils. toPairFunction()) .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/597e3955/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java new file mode 100644 index 000..8281c17 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.translation; + +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.spark.api.java.function.Function; + +/** + * Simple {@link Function} to bring the windowing information into the value
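The archive has stripped the generic type parameters from the diff above, but the idea behind `ReifyTimestampsAndWindowsFunction` is recoverable: instead of running a `DoFn` to pair each value with its metadata, a plain map function copies the timestamp/window information from the element's envelope into the value itself, so it survives the value-only steps that follow (unwindowing, coder-based byte conversion, the shuffle). A toy sketch of that idea in plain Java — `Windowed` and `reify` are hypothetical stand-ins for Beam's `WindowedValue` machinery, not its actual API:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class ReifySketch {

    // Toy envelope standing in for Beam's WindowedValue: a value plus its
    // event-time timestamp (window and pane info omitted for brevity).
    record Windowed<T>(T value, long timestampMillis) { }

    // "Reify" the metadata: rebuild the KV so the inner value carries its
    // own copy of the timestamp; value-only transforms downstream keep it.
    static <K, V> Windowed<Map.Entry<K, Windowed<V>>> reify(Windowed<Map.Entry<K, V>> in) {
        Windowed<V> innerValue = new Windowed<>(in.value().getValue(), in.timestampMillis());
        return new Windowed<>(new SimpleEntry<>(in.value().getKey(), innerValue),
                in.timestampMillis());
    }

    public static void main(String[] args) {
        Windowed<Map.Entry<String, Integer>> in =
                new Windowed<>(new SimpleEntry<>("user-1", 42), 1000L);
        Windowed<Map.Entry<String, Windowed<Integer>>> out = reify(in);
        // The inner value now carries the timestamp on its own:
        System.out.println(out.value().getValue().timestampMillis()); // 1000
    }
}
```

Even if a later step strips the outer envelope, the timestamp travels inside the value, which is exactly why the runner reifies before shuffling.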
[1/2] incubator-beam git commit: [BEAM-809] Create a KryoRegistrator for the SparkRunner.
Repository: incubator-beam Updated Branches: refs/heads/master 53fe3ee42 -> 78e2c0387 [BEAM-809] Create a KryoRegistrator for the SparkRunner. Use Class#getName() instead of canonicalName(). Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/13b83858 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/13b83858 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/13b83858 Branch: refs/heads/master Commit: 13b83858746356068a6d618e04da6839e837d28c Parents: 53fe3ee Author: Sela Authored: Mon Oct 24 22:35:39 2016 +0300 Committer: Sela Committed: Wed Oct 26 18:53:28 2016 +0300 -- runners/spark/pom.xml | 23 ++ .../coders/BeamSparkRunnerRegistrator.java | 46 .../spark/translation/SparkContextFactory.java | 5 ++- 3 files changed, 73 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/pom.xml -- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index ccec3c6..458205a 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -147,6 +147,29 @@ provided + com.esotericsoftware.kryo + kryo + 2.21 + provided + + + de.javakaffee + kryo-serializers + 0.39 + + + + com.esotericsoftware + kryo + + + + com.google.protobuf + protobuf-java + + + + com.google.code.findbugs jsr305 1.3.9 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java new file mode 100644 index 000..0e62781 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.coders; + +import com.esotericsoftware.kryo.Kryo; +import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer; +import de.javakaffee.kryoserializers.guava.ImmutableListSerializer; +import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer; +import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer; +import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer; +import de.javakaffee.kryoserializers.guava.ReverseListSerializer; +import org.apache.spark.serializer.KryoRegistrator; + + +/** + * Custom {@link com.esotericsoftware.kryo.Serializer}s for Beam's Spark runner needs. 
+ */ +public class BeamSparkRunnerRegistrator implements KryoRegistrator { + + @Override + public void registerClasses(Kryo kryo) { +UnmodifiableCollectionsSerializer.registerSerializers(kryo); +// Guava +ImmutableListSerializer.registerSerializers(kryo); +ImmutableSetSerializer.registerSerializers(kryo); +ImmutableMapSerializer.registerSerializers(kryo); +ImmutableMultimapSerializer.registerSerializers(kryo); +ReverseListSerializer.registerSerializers(kryo); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java index 4877f6e..ee2104a 100644 ---
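A `KryoRegistrator` like the one above only takes effect if Spark is told to serialize with Kryo and pointed at the class by name. The truncated `SparkContextFactory` diff presumably does this wiring; the standard Spark configuration looks roughly like the following sketch (ordinary `SparkConf` usage with Spark's documented `spark.serializer` and `spark.kryo.registrator` keys, not code from this patch; the app name is hypothetical):

```java
SparkConf conf = new SparkConf()
    .setAppName("beam-on-spark") // hypothetical app name
    // Use Kryo instead of Java serialization for shuffle and caching...
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // ...and have it pick up the custom Guava/collection serializers.
    .set("spark.kryo.registrator",
        "org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator");
```

Passing the registrator by fully-qualified class name is also why the commit message notes switching from `canonicalName()` to `Class#getName()`: the latter produces the binary name Kryo/Spark need for loading the class.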
[2/2] incubator-beam git commit: This closes #1162
This closes #1162 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/53fe3ee4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/53fe3ee4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/53fe3ee4 Branch: refs/heads/master Commit: 53fe3ee425163a76b69d0830449d222d925eb9cd Parents: f2fe1ae a54ded3 Author: Sela Authored: Wed Oct 26 10:01:51 2016 +0300 Committer: Sela Committed: Wed Oct 26 10:01:51 2016 +0300 -- .../apache/beam/runners/spark/SparkRunner.java | 19 -- .../translation/GroupCombineFunctions.java | 66 +--- .../spark/translation/TransformTranslator.java | 43 +++-- .../streaming/StreamingTransformTranslator.java | 65 +-- .../spark/util/SparkSideInputReader.java| 2 +- 5 files changed, 55 insertions(+), 140 deletions(-) --
[1/2] incubator-beam git commit: [BEAM-799] Support GroupByKey directly.
Repository: incubator-beam Updated Branches: refs/heads/master f2fe1ae46 -> 53fe3ee42 [BEAM-799] Support GroupByKey directly. Remove runner override for GroupByKey. Avoid NPE if no sideInputs are available in reader. Handle CombineFn with or without context. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a54ded37 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a54ded37 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a54ded37 Branch: refs/heads/master Commit: a54ded373fa7f6508fb46eea1a1d6f9bc405114b Parents: f2fe1ae Author: Sela Authored: Sat Oct 22 14:51:50 2016 +0300 Committer: Sela Committed: Wed Oct 26 10:00:45 2016 +0300 -- .../apache/beam/runners/spark/SparkRunner.java | 19 -- .../translation/GroupCombineFunctions.java | 66 +--- .../spark/translation/TransformTranslator.java | 43 +++-- .../streaming/StreamingTransformTranslator.java | 65 +-- .../spark/util/SparkSideInputReader.java| 2 +- 5 files changed, 55 insertions(+), 140 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index b17c38c..45c7f55 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.spark; import java.util.Collection; import java.util.List; -import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; @@ -36,7 +35,6 @@ import 
org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.PBegin; @@ -115,23 +113,6 @@ public final class SparkRunner extends PipelineRunner { } /** - * Overrides for this runner. - */ - @SuppressWarnings("rawtypes") - @Override - public OutputT apply( - PTransform transform, InputT input) { - -if (transform instanceof GroupByKey) { - return (OutputT) ((PCollection) input).apply( - new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform)); -} else { - return super.apply(transform, input); -} - } - - - /** * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single * thread. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java index de02b26..e2a0f87 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java @@ -20,11 +20,9 @@ package org.apache.beam.runners.spark.translation; import com.google.common.collect.Lists; - import java.util.Collections; import java.util.Map; import org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn; -import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import 
org.apache.beam.runners.spark.coders.CoderHelpers; @@ -38,6 +36,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.ReifyTimestampAndWindowsDoFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import
[GitHub] incubator-beam pull request #1171: [BEAM-809] Create a KryoRegistrator for t...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1171 [BEAM-809] Create a KryoRegistrator for the SparkRunner. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-809 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1171.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1171 commit 4e00575a4a8e25ed3618f2f7672d3c24c662a149 Author: Sela <ans...@paypal.com> Date: 2016-10-24T19:35:39Z [BEAM-809] Create a KryoRegistrator for the SparkRunner. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #1170: [BEAM-808] Increase "spark.port.maxRetrie...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1170 [BEAM-808] Increase "spark.port.maxRetries" to avoid BindException in… ROS. You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-808 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1170.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1170 commit 8f930b2fb97a377c6c168a33ca415a3eec7d41f4 Author: Sela <ans...@paypal.com> Date: 2016-10-24T18:52:03Z [BEAM-808] Increase "spark.port.maxRetries" to avoid BindException in ROS. ---
[GitHub] incubator-beam pull request #1162: [BEAM-799] Support GroupByKey directly
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1162 [BEAM-799] Support GroupByKey directly You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-799 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1162.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1162 commit 52ed2eb3fee7e3af166f015851845fc4ff9f57e9 Author: Sela <ans...@paypal.com> Date: 2016-10-22T11:51:50Z Remove runner override for GroupByKey. [BEAM-799] Support GroupByKey directly. commit 93eaf607c26585e5157eb17812025df41bbd5072 Author: Sela <ans...@paypal.com> Date: 2016-10-22T12:44:32Z Avoid NPE if no sideInputs are available in reader. commit 62c7f0e30a0246cc835f46e75046c72be14a1ba1 Author: Sela <ans...@paypal.com> Date: 2016-10-22T12:45:40Z Handle CombineFn with or without context. ---
[GitHub] incubator-beam pull request #1161: [BEAM-769] Spark streaming tests fail on ...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1161 [BEAM-769] Spark streaming tests fail on "nothing processed" if runtime env. is slow because timeout is hit before processing is done. Make graceful stop the default. Keep "pumping-in" the last batch in a mocked stream to handle overflowing batches in case of a graceful stop. Change tests accordingly. You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-769 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1161.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1161 commit 43c9e57ad8c7af022ba2f46ce4b8d20731f0766e Author: Sela <ans...@paypal.com> Date: 2016-10-20T22:20:33Z [BEAM-769] Spark streaming tests fail on "nothing processed" if runtime env. is slow because timeout is hit before processing is done. Make graceful stop the default. Keep "pumping-in" the last batch in a mocked stream to handle overflowing batches in case of a graceful stop. Change tests accordingly. ---
[2/2] incubator-beam git commit: This closes #1153
This closes #1153 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a9a41eb9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a9a41eb9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a9a41eb9 Branch: refs/heads/master Commit: a9a41eb94bee1200c93a735bbe54e80a5d776e3e Parents: 4c90582 a7cc820 Author: Sela Authored: Sat Oct 22 12:24:21 2016 +0300 Committer: Sela Committed: Sat Oct 22 12:24:21 2016 +0300 -- .../apache/beam/runners/spark/SparkRunner.java | 33 +++- 1 file changed, 32 insertions(+), 1 deletion(-) --
[1/2] incubator-beam git commit: [BEAM-794] Defer combining in case of merging windows with sideInputs.
Repository: incubator-beam Updated Branches: refs/heads/master 4c9058236 -> a9a41eb94 [BEAM-794] Defer combining in case of merging windows with sideInputs. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a7cc8206 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a7cc8206 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a7cc8206 Branch: refs/heads/master Commit: a7cc8206cbbc6ac10e71a0563da2fea4c708277b Parents: 4c90582 Author: Sela Authored: Fri Oct 21 16:00:57 2016 +0300 Committer: Sela Committed: Sat Oct 22 12:23:33 2016 +0300 -- .../apache/beam/runners/spark/SparkRunner.java | 33 +++- 1 file changed, 32 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7cc8206/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index cad53be..b17c38c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.spark; import java.util.Collection; +import java.util.List; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkContextFactory; @@ -34,11 +35,13 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import 
org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; @@ -206,7 +209,7 @@ public final class SparkRunner extends PipelineRunner { @SuppressWarnings("unchecked") Class transformClass = (Class ) node.getTransform().getClass(); -if (translator.hasTranslation(transformClass)) { +if (translator.hasTranslation(transformClass) && !shouldDefer(node)) { LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName()); LOG.debug("Composite transform class: '{}'", transformClass); doVisitTransform(node); @@ -216,6 +219,34 @@ public final class SparkRunner extends PipelineRunner { return CompositeBehavior.ENTER_TRANSFORM; } +private boolean shouldDefer(TransformTreeNode node) { + PInput input = node.getInput(); + // if the input is not a PCollection, or it is but with non merging windows, don't defer. + if (!(input instanceof PCollection) + || ((PCollection) input).getWindowingStrategy().getWindowFn().isNonMerging()) { +return false; + } + // so far we know that the input is a PCollection with merging windows. + // check for sideInput in case of a Combine transform. + PTransform transform = node.getTransform(); + boolean hasSideInput = false; + if (transform instanceof Combine.PerKey) { +List sideInputs = ((Combine.PerKey) transform).getSideInputs(); +hasSideInput = sideInputs != null && !sideInputs.isEmpty(); + } else if (transform instanceof Combine.Globally) { +List sideInputs = ((Combine.Globally) transform).getSideInputs(); +hasSideInput = sideInputs != null && !sideInputs.isEmpty(); + } + // defer if sideInputs are defined. + if (hasSideInput) { +LOG.info("Deferring combine transformation {} for job {}", transform, +ctxt.getPipeline().getOptions().getJobName()); +return true; + } + // default. 
+ return false; +} + @Override public void visitPrimitiveTransform(TransformTreeNode node) { doVisitTransform(node);
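The `shouldDefer(...)` logic in the diff above boils down to a three-step decision: an input that is not a PCollection, or one with non-merging windows, is translated directly; merging windows alone are still translated directly; only merging windows combined with side inputs cause the combine to be deferred to the generic translation. A minimal stand-alone sketch of that decision, using hypothetical boolean stand-ins instead of Beam's PCollection, WindowFn, and Combine types:

```java
// Sketch of the deferral decision in SparkRunner.shouldDefer().
// The boolean parameters are hypothetical stand-ins for the checks the real
// code performs against Beam's PCollection, WindowFn and Combine types.
class DeferSketch {

    static boolean shouldDefer(boolean inputIsPCollection,
                               boolean windowFnIsNonMerging,
                               boolean hasSideInput) {
        // If the input is not a PCollection, or its windows never merge,
        // the transform can be translated directly: don't defer.
        if (!inputIsPCollection || windowFnIsNonMerging) {
            return false;
        }
        // Merging windows: defer only when the combine declares side inputs.
        return hasSideInput;
    }

    public static void main(String[] args) {
        // Merging windows + side inputs -> deferred.
        System.out.println(shouldDefer(true, false, true));   // prints true
        // Merging windows, no side inputs -> translated directly.
        System.out.println(shouldDefer(true, false, false));  // prints false
    }
}
```

The ordering matters: the cheap structural checks short-circuit first, so the side-input inspection only runs for inputs that actually have merging windows.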
[GitHub] incubator-beam pull request #1153: [BEAM-794] Defer combining in case of me...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1153 [BEAM-794] Defer combining in case of merging windows with sideInputs. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [x] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [x] Replace `` in the title with the actual Jira issue number, if there is one. - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-794 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1153.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1153 commit ee5bcecb3c6ce51a2531d2c6ff36a6a99038d32f Author: Sela <ans...@paypal.com> Date: 2016-10-21T13:00:57Z [BEAM-794] Defer combining in case of merging windows with sideInputs. ---
[GitHub] incubator-beam pull request #1144: [BEAM-781] Remove Spark's batch unit test...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1144 [BEAM-781] Remove Spark's batch unit tests and rely on ROS tests instead. The runner does not support Spark before 1.6 anymore. Remove tests that can be trusted to ROS. You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-781 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1144.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1144 commit b3c9502636d48a1e7cf6f2078c5130d7921f Author: Sela <ans...@paypal.com> Date: 2016-10-20T13:34:06Z [BEAM-781] Remove Spark's batch unit tests and rely on ROS tests instead. The runner does not support Spark before 1.6 anymore. Remove tests that can be trusted to ROS. ---
[GitHub] incubator-beam pull request #1143: [BEAM-658] Support Read.Unbounded primiti...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1143 [BEAM-658] Support Read.Unbounded primitive. Changed mapSourceFunction to use Scala's native Option. Upgrade to kryo-serializers 0.39 that provides support for ReverseList (used by Top). Better logging. Allow a longer read time-frame for read in tests. Assert initial parallelism is greater than zero. Add OnBatchCompleted listener that writes to Kafka. Test latest. You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-658 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1143.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1143 commit 71ffb6b6aca9c997326d3b15951d1f295aaf7505 Author: Sela <ans...@paypal.com> Date: 2016-09-29T13:30:32Z [BEAM-658] Support Read.Unbounded primitive. Changed mapSourceFunction to use Scala's native Option. Upgrade to kryo-serializers 0.39 that provides support for ReverseList (used by Top). Better logging. Allow a longer read time-frame for read in tests. Assert initial parallelism is greater than zero. Add OnBatchCompleted listener that writes to Kafka. Test latest. ---
[2/3] incubator-beam git commit: [BEAM-259] Execute selected RunnableOnService tests with Spark runner.
[BEAM-259] Execute selected RunnableOnService tests with Spark runner. Handle empty Flatten for bounded. Spark will bubble out a SparkException for user code failure, so this won't catch. Asserting on the error message should be good enough. outputWithTimestamp should handle start/finishBundle as well. Explode WindowedValues before processing. sideOutputWithTimestamp to address start/finishBundle. SideInput with windows. Unused for now, remove. Take sideInput window strategy into account, for combine as well. Reduce code duplication. Spark combine support. Reuse code where possible. Expose sideInputs and insertDefault in Combine.Globally for direct translation. Direct translation of Combine.Globally into Spark's aggregate function. Run with 4 cores by default - makes tests run with multiple threads, but not too many. SideInputReader for the Spark runner. A common abstraction for Keyed and Global implementation. Implement Combine.Globally via Spark's aggregate. runnable-on-service profile doesn't need pluginManagement. Removing test as it does not follow a deterministic combine implementation. Context reuse is mostly for testing. To avoid a test failure that will stop the context and fail all following tests we need to recreate the context if it's stopped as well. WindowFn is used, no need to pass the entire WindowingStrategy. Explode elements for processing only when necessary. The SparkRunner should use Beam's UserCodeException instead of its own custom SparkProcessException. 
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7eecd7ee Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7eecd7ee Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7eecd7ee Branch: refs/heads/master Commit: 7eecd7ee73cdd2b41e785b4540852530deead429 Parents: b0cb2e8 Author: SelaAuthored: Fri Sep 23 13:32:28 2016 +0300 Committer: Sela Committed: Thu Oct 20 00:51:48 2016 +0300 -- runners/spark/pom.xml | 73 ++--- .../runners/spark/SparkPipelineOptions.java | 2 +- .../apache/beam/runners/spark/SparkRunner.java | 15 +- .../beam/runners/spark/TestSparkRunner.java | 4 +- .../runners/spark/translation/DoFnFunction.java | 74 ++--- .../translation/GroupCombineFunctions.java | 235 +--- .../spark/translation/MultiDoFnFunction.java| 85 +++--- .../translation/SparkAbstractCombineFn.java | 134 + .../spark/translation/SparkContextFactory.java | 3 +- .../spark/translation/SparkGlobalCombineFn.java | 260 ++ .../spark/translation/SparkKeyedCombineFn.java | 273 +++ .../spark/translation/SparkProcessContext.java | 160 +-- .../spark/translation/TransformTranslator.java | 143 +- .../spark/translation/TranslationUtils.java | 28 +- .../streaming/StreamingTransformTranslator.java | 126 + .../runners/spark/util/BroadcastHelper.java | 26 -- .../spark/util/SparkSideInputReader.java| 95 +++ .../spark/translation/CombineGloballyTest.java | 101 --- .../translation/SparkPipelineOptionsTest.java | 2 +- .../org/apache/beam/sdk/transforms/Combine.java | 14 + 20 files changed, 1318 insertions(+), 535 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/pom.xml -- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 60b2de9..a246c19 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -59,40 +59,40 @@ local-runnable-on-service-tests false - - - - org.apache.maven.plugins - maven-surefire-plugin - - - 
runnable-on-service-tests - - org.apache.beam.sdk.testing.RunnableOnService -none -true - - org.apache.beam:java-sdk-all - - - org.apache.beam.sdk.io.BoundedReadFromUnboundedSourceTest - - - -[ - "--runner=TestSparkRunner", - "--streaming=false" -] - - true - false - - - - - - - + + +org.apache.maven.plugins +maven-surefire-plugin + +
[1/3] incubator-beam git commit: [BEAM-259] Execute selected RunnableOnService tests with Spark runner.
Repository: incubator-beam Updated Branches: refs/heads/master b0cb2e87b -> c472e1227 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index fbaf5b8..2135170 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -18,26 +18,37 @@ package org.apache.beam.runners.spark.translation; +import static com.google.common.base.Preconditions.checkState; + import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import java.io.IOException; import java.util.Collection; import java.util.Iterator; import java.util.Map; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.util.BroadcastHelper; +import org.apache.beam.runners.spark.util.SparkSideInputReader; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; 
import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.InMemoryStateInternals; import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.spark.Accumulator; @@ -45,33 +56,62 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + + /** * Spark runner process context. */ public abstract class SparkProcessContextextends OldDoFn .ProcessContext { - private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class); private final OldDoFn fn; private final SparkRuntimeContext mRuntimeContext; - private final Map mSideInputs; + private final SideInputReader sideInputReader; + private final WindowFn
[3/3] incubator-beam git commit: This closes #1055
This closes #1055 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c472e122 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c472e122 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c472e122 Branch: refs/heads/master Commit: c472e1227d4cc265152afd9afc072ddecc934dc4 Parents: b0cb2e8 7eecd7e Author: SelaAuthored: Thu Oct 20 01:24:34 2016 +0300 Committer: Sela Committed: Thu Oct 20 01:24:34 2016 +0300 -- runners/spark/pom.xml | 73 ++--- .../runners/spark/SparkPipelineOptions.java | 2 +- .../apache/beam/runners/spark/SparkRunner.java | 15 +- .../beam/runners/spark/TestSparkRunner.java | 4 +- .../runners/spark/translation/DoFnFunction.java | 74 ++--- .../translation/GroupCombineFunctions.java | 235 +--- .../spark/translation/MultiDoFnFunction.java| 85 +++--- .../translation/SparkAbstractCombineFn.java | 134 + .../spark/translation/SparkContextFactory.java | 3 +- .../spark/translation/SparkGlobalCombineFn.java | 260 ++ .../spark/translation/SparkKeyedCombineFn.java | 273 +++ .../spark/translation/SparkProcessContext.java | 160 +-- .../spark/translation/TransformTranslator.java | 143 +- .../spark/translation/TranslationUtils.java | 28 +- .../streaming/StreamingTransformTranslator.java | 126 + .../runners/spark/util/BroadcastHelper.java | 26 -- .../spark/util/SparkSideInputReader.java| 95 +++ .../spark/translation/CombineGloballyTest.java | 101 --- .../translation/SparkPipelineOptionsTest.java | 2 +- .../org/apache/beam/sdk/transforms/Combine.java | 14 + 20 files changed, 1318 insertions(+), 535 deletions(-) --
[2/2] incubator-beam git commit: This closes #1125
This closes #1125 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b0cb2e87 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b0cb2e87 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b0cb2e87 Branch: refs/heads/master Commit: b0cb2e87b14182c9950974204a345a17181ff55c Parents: ea04e61 84c6649 Author: Sela Authored: Wed Oct 19 21:35:20 2016 +0300 Committer: Sela Committed: Wed Oct 19 21:35:20 2016 +0300 -- .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 14 +- 1 file changed, 5 insertions(+), 9 deletions(-) --
[1/2] incubator-beam git commit: [BEAM-744] UnboundedKafkaReader should return as soon as it can.
Repository: incubator-beam Updated Branches: refs/heads/master ea04e618e -> b0cb2e87b [BEAM-744] UnboundedKafkaReader should return as soon as it can. Use timeout directly in nextBatch() Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84c6649c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84c6649c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84c6649c Branch: refs/heads/master Commit: 84c6649cd63c33ca79ad43e8973dbf765e27a5d0 Parents: ea04e61 Author: SelaAuthored: Tue Oct 18 22:03:25 2016 +0300 Committer: Sela Committed: Wed Oct 19 21:34:07 2016 +0300 -- .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java| 14 +- 1 file changed, 5 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84c6649c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java -- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 2030789..834104e 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -756,9 +756,6 @@ public class KafkaIO { private Iterator curBatch = Collections.emptyIterator(); private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000); -// how long to wait for new records from kafka consumer inside start() -private static final Duration START_NEW_RECORDS_POLL_TIMEOUT = Duration.standardSeconds(5); -// how long to wait for new records from kafka consumer inside advance() private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10); // Use a separate thread to read Kafka messages. 
Kafka Consumer does all its work including @@ -888,12 +885,13 @@ public class KafkaIO { LOG.info("{}: Returning from consumer pool loop", this); } -private void nextBatch(Duration timeout) { +private void nextBatch() { curBatch = Collections.emptyIterator(); ConsumerRecords records; try { -records = availableRecordsQueue.poll(timeout.getMillis(), +// poll available records, wait (if necessary) up to the specified timeout. +records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -966,9 +964,7 @@ public class KafkaIO { } }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS); - // Wait for longer than normal when fetching a batch to improve chances a record is available - // when start() returns. - nextBatch(START_NEW_RECORDS_POLL_TIMEOUT); + nextBatch(); return advance(); } @@ -1032,7 +1028,7 @@ public class KafkaIO { return true; } else { // -- (b) - nextBatch(NEW_RECORDS_POLL_TIMEOUT); + nextBatch(); if (!curBatch.hasNext()) { return false;
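The patch above makes `nextBatch()` always poll with the short `NEW_RECORDS_POLL_TIMEOUT` instead of a longer start()-time timeout. The underlying mechanism (a background consumer thread feeding a bounded queue that the reader drains with a timed poll) can be sketched with JDK types only; the class and field names below are hypothetical, not KafkaIO's:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the reader-side pattern: a separate thread produces batches into
// a bounded queue; the reader polls with a short timeout and treats "nothing
// arrived in time" as an empty batch rather than blocking indefinitely.
class PollSketch {
    static final long NEW_RECORDS_POLL_TIMEOUT_MS = 10;
    final BlockingQueue<String> availableBatches = new ArrayBlockingQueue<>(4);

    // Returns the next batch, or null if none arrived within the timeout
    // (the real nextBatch() falls back to an empty iterator instead of null).
    String nextBatch() throws InterruptedException {
        return availableBatches.poll(NEW_RECORDS_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        PollSketch reader = new PollSketch();
        // No producer yet: the timed poll returns quickly with nothing.
        System.out.println(reader.nextBatch()); // prints null
        // Once the consumer thread enqueues a batch, the poll returns it.
        reader.availableBatches.put("batch-1");
        System.out.println(reader.nextBatch()); // prints batch-1
    }
}
```

Using one uniform short timeout, as the patch does, trades a slightly higher chance of an empty first `advance()` for a `start()` that returns promptly, which is what BEAM-744 asked for.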
[2/2] incubator-beam git commit: This closes #1133
This closes #1133 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ea04e618 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ea04e618 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ea04e618 Branch: refs/heads/master Commit: ea04e618eae8e20e21a1db5b8367cf972446d9b6 Parents: dde8e35 2c8ade8 Author: Sela Authored: Wed Oct 19 10:08:05 2016 +0300 Committer: Sela Committed: Wed Oct 19 10:08:05 2016 +0300 -- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 26 1 file changed, 10 insertions(+), 16 deletions(-) --
[1/2] incubator-beam git commit: [BEAM-777] KafkaIO Test should handle reader.start() better.
Repository: incubator-beam Updated Branches: refs/heads/master dde8e35ca -> ea04e618e [BEAM-777] KafkaIO Test should handle reader.start() better. KafkaIOTest : start() can return false Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c8ade83 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c8ade83 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c8ade83 Branch: refs/heads/master Commit: 2c8ade83b2104ecd7f8098b18dd45a0fd8b6cc9f Parents: dde8e35 Author: Raghu AngadiAuthored: Tue Oct 18 21:46:18 2016 -0700 Committer: Sela Committed: Wed Oct 19 10:06:45 2016 +0300 -- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 26 1 file changed, 10 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8ade83/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java -- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 67aa675..2f3c524 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -389,7 +389,10 @@ public class KafkaIOTest { // Kafka records are read in a separate thread inside the reader. As a result advance() might not // read any records even from the mock consumer, especially for the first record. // This is a helper method to loop until we read a record. - private static void advanceOnce(UnboundedReader reader) throws IOException { + private static void advanceOnce(UnboundedReader reader, boolean isStarted) throws IOException { +if (!isStarted && reader.start()) { + return; +} while (!reader.advance()) { // very rarely will there be more than one attempts. 
// In case of a bug we might end up looping forever, and the test will fail with a timeout. @@ -418,9 +421,8 @@ public class KafkaIOTest { final int numToSkip = 20; // one from each partition. // advance numToSkip elements -reader.start(); -for (int l = 1; l < numToSkip; ++l) { - advanceOnce(reader); +for (int i = 0; i < numToSkip; ++i) { + advanceOnce(reader, i > 0); } // Confirm that we get the expected element in sequence before checkpointing. @@ -435,13 +437,10 @@ public class KafkaIOTest { // Confirm that we get the next elements in sequence. // This also confirms that the Reader interleaves records from each partition. -reader.start(); for (int i = numToSkip; i < numElements; i++) { + advanceOnce(reader, i > numToSkip); assertEquals(i, (long) reader.getCurrent().getKV().getValue()); assertEquals(i, reader.getCurrentTimestamp().getMillis()); - if ((i + 1) < numElements) { -advanceOnce(reader); - } } } @@ -460,9 +459,8 @@ public class KafkaIOTest { UnboundedReader > reader = source.createReader(null, null); -reader.start(); -for (int l = 1; l < initialNumElements; ++l) { - advanceOnce(reader); +for (int l = 0; l < initialNumElements; ++l) { + advanceOnce(reader, l > 0); } // Checkpoint and restart, and confirm that the source continues correctly. @@ -490,19 +488,15 @@ public class KafkaIOTest { reader = source.createReader(null, mark); -reader.start(); - // Verify in any order. As the partitions are unevenly read, the returned records are not in a // simple order. Note that testUnboundedSourceCheckpointMark() verifies round-robin order. List expected = new ArrayList<>(); List actual = new ArrayList<>(); for (long i = initialNumElements; i < numElements; i++) { + advanceOnce(reader, i > initialNumElements); expected.add(i); actual.add(reader.getCurrent().getKV().getValue()); - if ((i + 1) < numElements) { -advanceOnce(reader); - } } assertThat(actual, IsIterableContainingInAnyOrder.containsInAnyOrder(expected.toArray())); }
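The patch above replaces unconditional `reader.start()` calls with an `advanceOnce(reader, isStarted)` helper that honors `start()`'s return value. A minimal, self-contained sketch of that pattern, with a hypothetical `FakeReader` standing in for Beam's `UnboundedReader` (whose methods also throw `IOException`, omitted here): both `start()` and `advance()` may return false until the background fetch thread has produced a record.

```java
// Sketch of the advanceOnce(reader, isStarted) pattern from the patch.
// Reader/FakeReader are illustrative stand-ins, not Beam's actual API.
public class AdvanceOnceSketch {
  interface Reader {
    boolean start();   // first call only; true if a record is already available
    boolean advance(); // every subsequent attempt to move to the next record
  }

  static class FakeReader implements Reader {
    private int attemptsUntilReady = 3; // simulate the asynchronous fetch lag
    private boolean started = false;
    public boolean start() { started = true; return tryRead(); }
    public boolean advance() {
      if (!started) throw new IllegalStateException("start() was never called");
      return tryRead();
    }
    private boolean tryRead() { return --attemptsUntilReady <= 0; }
  }

  // Loop until a record is read; only the very first call may use start().
  static void advanceOnce(Reader reader, boolean isStarted) {
    if (!isStarted && reader.start()) {
      return; // the first record was already available on start()
    }
    while (!reader.advance()) {
      // very rarely more than one attempt; a bug here would loop forever
    }
  }

  public static void main(String[] args) {
    Reader reader = new FakeReader();
    for (int i = 0; i < 5; i++) {
      advanceOnce(reader, i > 0); // i > 0 means start() already happened
    }
    System.out.println("read 5 records");
  }
}
```

The `i > 0` argument is exactly what the revised test loops pass, so the `start()`/`advance()` distinction lives in one place instead of being repeated before every loop.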
[GitHub] incubator-beam pull request #1125: [BEAM-744] A runner should be able to ove...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1125 [BEAM-744] A runner should be able to override KafkaIO max wait prope… Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- …rties. Add KafkaOptions for the UnboundedKafkaReader. You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-744 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1125.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1125 commit 627f50cc510783117b0642d4f699d4b4d9b342c7 Author: Sela <ans...@paypal.com> Date: 2016-10-18T11:36:04Z [BEAM-744] A runner should be able to override KafkaIO max wait properties. Add KafkaOptions for the UnboundedKafkaReader. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: This closes #1073
This closes #1073 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/49f94443 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/49f94443 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/49f94443 Branch: refs/heads/master Commit: 49f94443004a48c0c1524f3c431b73b0f94d53a2 Parents: d790dfe 44225cf Author: Sela Authored: Fri Oct 14 16:54:54 2016 +0300 Committer: Sela Committed: Fri Oct 14 16:54:54 2016 +0300 -- .../beam/runners/spark/EvaluationResult.java| 4 +- .../spark/translation/EvaluationContext.java| 3 +- .../streaming/StreamingEvaluationContext.java | 6 +- .../apache/beam/runners/spark/DeDupTest.java| 3 +- .../beam/runners/spark/EmptyInputTest.java | 1 - .../beam/runners/spark/SimpleWordCountTest.java | 6 +- .../apache/beam/runners/spark/TfIdfTest.java| 3 +- .../beam/runners/spark/io/AvroPipelineTest.java | 4 +- .../beam/runners/spark/io/NumShardsTest.java| 4 +- .../io/hadoop/HadoopFileFormatPipelineTest.java | 4 +- .../spark/translation/CombineGloballyTest.java | 1 - .../spark/translation/CombinePerKeyTest.java| 1 - .../spark/translation/DoFnOutputTest.java | 4 +- .../translation/MultiOutputWordCountTest.java | 2 - .../spark/translation/SerializationTest.java| 4 +- .../translation/WindowedWordCountTest.java | 10 +- .../streaming/EmptyStreamAssertionTest.java | 76 .../streaming/FlattenStreamingTest.java | 11 +- .../streaming/KafkaStreamingTest.java | 6 +- .../RecoverFromCheckpointStreamingTest.java | 179 -- .../ResumeFromCheckpointStreamingTest.java | 182 +++ .../streaming/SimpleStreamingWordCountTest.java | 6 +- .../streaming/utils/PAssertStreaming.java | 87 ++--- 23 files changed, 346 insertions(+), 261 deletions(-) --
[2/2] incubator-beam git commit: This closes #1072
This closes #1072 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d790dfe1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d790dfe1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d790dfe1 Branch: refs/heads/master Commit: d790dfe1ba7da5387944e47389cd7b35061e2782 Parents: a0f649e 4b49abc Author: Sela Authored: Fri Oct 14 13:32:14 2016 +0300 Committer: Sela Committed: Fri Oct 14 13:32:14 2016 +0300 -- .../beam/runners/spark/SparkPipelineOptions.java | 18 ++ .../SparkRunnerStreamingContextFactory.java | 8 2 files changed, 26 insertions(+) --
[GitHub] incubator-beam pull request #1073: [BEAM-735] PAssertStreaming should make s...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1073 [BEAM-735] PAssertStreaming should make sure the assertion happened. --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-735 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1073.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1073 commit ca3aa78d9651d8d8fc981234c9f4707414dc9738 Author: Sela <ans...@paypal.com> Date: 2016-10-09T18:38:14Z PAssertStreaming should check that an assertion happened. commit 4adc3827443a6d8490f6b1799fce5c820c9484a5 Author: Sela <ans...@paypal.com> Date: 2016-10-09T18:39:00Z Test assert for skipped assertion. commit 948fc9d0e894c06e555cf47fc3db48e6aad55008 Author: Sela <ans...@paypal.com> Date: 2016-10-09T18:39:53Z This name is more true to the nature of this test. commit 346e85df4436a07a8dd30d793d7fa0ca4bf23806 Author: Sela <ans...@paypal.com> Date: 2016-10-09T18:40:46Z Fix according to new PAssertStreaming. ---
[GitHub] incubator-beam pull request #1072: [BEAM-734] Add StreamingListeners via Spa...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1072 [BEAM-734] Add StreamingListeners via SparkPipelineOptions. --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-734 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1072.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1072 commit 139cdbf0007305901cabaddfb3b387b979719d58 Author: Sela <ans...@paypal.com> Date: 2016-10-09T10:44:58Z Add StreamingListeners via SparkPipelineOptions. ---
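The idea in BEAM-734 is to let pipeline options name listeners that the runner builds and registers on the streaming context. A hedged sketch of that shape, using a name-to-supplier registry in place of the reflective `Class.forName` instantiation a real runner would use; all names here are illustrative, not the actual `SparkPipelineOptions` API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class ListenerOptionsSketch {
  // Stand-in for Spark's StreamingListener callback surface.
  interface StreamingListener { void onBatchCompleted(String batchId); }

  // Registry standing in for reflective instantiation of option-supplied
  // class names.
  static final Map<String, Supplier<StreamingListener>> REGISTRY = new LinkedHashMap<>();
  static {
    REGISTRY.put("LoggingListener",
        () -> batchId -> System.out.println("batch completed: " + batchId));
  }

  // What a "listeners" pipeline option would drive: build each configured
  // listener so the runner can register it on the streaming context.
  static List<StreamingListener> fromOptions(List<String> listenerNames) {
    List<StreamingListener> listeners = new ArrayList<>();
    for (String name : listenerNames) {
      listeners.add(REGISTRY.get(name).get());
    }
    return listeners;
  }

  public static void main(String[] args) {
    for (StreamingListener l : fromOptions(List.of("LoggingListener"))) {
      l.onBatchCompleted("batch-1");
    }
  }
}
```

Driving listener construction from options keeps the runner core free of hard-coded monitoring hooks: users opt in per pipeline.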
[GitHub] incubator-beam pull request #1055: [BEAM-259] Enable RunnableOnService for b...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1055 [BEAM-259] Enable RunnableOnService for batch. --- Handle empty Flatten for bounded. Spark will bubble out a SparkException for user code failure, so this won't catch. Asserting on the error message should be good enough. outputWithTimestamp should handle start/finishBundle as well. Explode WindowedValues before processing. sideOutputWithTimestamp to address start/finishBundle. SideInput with windows. Unused for now, remove. Take sideInput window strategy into account, for combine as well. Reduce code duplication. Spark combine support. Reuse code where possible. Expose sideInputs and insertDefault in Combine.Globally for direct translation. Direct translation of Combine.Globally into Spark's aggregate function. Make runs use 4 cores by default - makes tests run with multiple threads, but not too many. SideInputReader for the Spark runner. A common abstraction for keyed and global implementations. Implement Combine.Globally via Spark's aggregate. The runnable-on-service profile doesn't need pluginManagement. Removing a test as it does not follow a deterministic combine implementation. Context reuse is mostly for testing. To avoid a test failure that will stop the context and fail all following tests, we need to recreate the context if it's stopped as well.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-259 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1055.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1055 commit 53ed9951203da7f7e2cff0ef4d9ec64f37a0f29d Author: Sela <ans...@paypal.com> Date: 2016-09-23T10:32:28Z Enable RunnableOnService for batch. ---
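The "Combine.Globally via Spark's aggregate" item above maps Spark's `rdd.aggregate(zero, seqOp, combOp)` onto the Beam-style combine triple of createAccumulator / addInput / mergeAccumulators. A self-contained sketch of that correspondence, with plain Java lists standing in for RDD partitions (the method names mirror Beam's `CombineFn` shape, not the runner's actual code):

```java
import java.util.Arrays;
import java.util.List;

public class AggregateSketch {
  // A Beam-style CombineFn computing a sum.
  static long createAccumulator() { return 0L; }
  static long addInput(long acc, long input) { return acc + input; }          // Spark's seqOp
  static long mergeAccumulators(long acc1, long acc2) { return acc1 + acc2; } // Spark's combOp

  // What rdd.aggregate does: fold each partition with seqOp, then merge
  // the per-partition accumulators with combOp.
  static long combineGlobally(List<List<Long>> partitions) {
    long merged = createAccumulator();
    for (List<Long> partition : partitions) {
      long acc = createAccumulator();
      for (long v : partition) {
        acc = addInput(acc, v);       // per-partition fold
      }
      merged = mergeAccumulators(merged, acc); // cross-partition merge
    }
    return merged;
  }

  public static void main(String[] args) {
    List<List<Long>> partitions = Arrays.asList(
        Arrays.asList(1L, 2L, 3L), Arrays.asList(4L, 5L));
    System.out.println(combineGlobally(partitions)); // 15
  }
}
```

The split into a per-partition fold plus a commutative merge is what makes the translation "direct": both APIs already factor a global combine the same way.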
[GitHub] incubator-beam pull request #1040: [BEAM-703] SingletonViewFn might exhaust ...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/1040 [BEAM-703] SingletonViewFn might exhaust defaultValue if it's serialized after being used. --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-703 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1040.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1040 commit 0bd97efc0d764af17cdd8abdf43bff33bb21be2b Author: Sela <ans...@paypal.com> Date: 2016-10-04T15:12:38Z Avoid losing the encoded defaultValue. ---
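The commit title "Avoid losing the encoded defaultValue" suggests the failure mode: if the encoded default is held in a form that can only be consumed once (such as a live `InputStream`), the first decode drains it and a later use, e.g. after the view fn is serialized and reused, finds nothing. A hedged sketch of the byte-caching fix, with illustrative names rather than the actual `SingletonViewFn` internals:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class DefaultValueSketch {
  private final byte[] encodedDefault; // cache the bytes, never a live stream

  DefaultValueSketch(String defaultValue) {
    this.encodedDefault = defaultValue.getBytes(StandardCharsets.UTF_8);
  }

  // Safe to call any number of times: each call decodes from a fresh stream
  // over the cached bytes, so nothing is ever exhausted.
  String decodeDefault() throws IOException {
    try (InputStream in = new ByteArrayInputStream(encodedDefault)) {
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) throws IOException {
    DefaultValueSketch fn = new DefaultValueSketch("fallback");
    System.out.println(fn.decodeDefault());
    System.out.println(fn.decodeDefault()); // still "fallback", not exhausted
  }
}
```

Keeping immutable bytes also makes the field trivially serializable, which matters precisely because the bug surfaces after serialization.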
[1/2] incubator-beam git commit: [BEAM-657] Support Read.Bounded primitive.
Repository: incubator-beam Updated Branches: refs/heads/master 4872bde8f -> a00d2f810 [BEAM-657] Support Read.Bounded primitive. Support Read.Bounded primitive. Avro requires this for snappy. Create is supported by Read.Bounded now. Read.Bounded support should solve gs issues now. Remove unused direct translations. Addressed by BEAM-668. Assert default parallelism, close reader on exception, and other improvements. Addressed more comments. Extra line. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5848 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5848 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5848 Branch: refs/heads/master Commit: 58489bc8cbe4d702ff9ae07f932fb96141a1 Parents: 4872bde Author: Sela Authored: Wed Sep 21 15:13:02 2016 +0300 Committer: Sela Committed: Fri Sep 23 01:31:16 2016 +0300 -- examples/java/pom.xml | 2 +- .../org/apache/beam/examples/WordCountIT.java | 6 - runners/spark/pom.xml | 6 + .../apache/beam/runners/spark/SparkRunner.java | 5 - .../apache/beam/runners/spark/io/SourceRDD.java | 198 +++ .../spark/translation/TransformTranslator.java | 22 ++- 6 files changed, 223 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5848/examples/java/pom.xml -- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 6a39f64..9a48ec6 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -184,7 +184,7 @@ [ "--project=apache-beam-testing", - "--tempRoot=/tmp", + "--tempRoot=gs://temp-storage-for-end-to-end-tests", "--runner=org.apache.beam.runners.spark.TestSparkRunner" ] http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5848/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java -- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java index 
b0e0fe0..2f2ea46 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java @@ -18,7 +18,6 @@ package org.apache.beam.examples; -import com.google.common.io.Resources; import java.util.Date; import org.apache.beam.examples.WordCount.WordCountOptions; import org.apache.beam.sdk.options.Default; @@ -63,11 +62,6 @@ public class WordCountIT { new FileChecksumMatcher(options.getOutputChecksum(), options.getOutput() + "*")); String e2eTestInputPath = "gs://apache-beam-samples/apache/LICENSE"; -// Spark runner currently doesn't support GCS I/O, change default input to: -// .../src/test/resources/LICENSE -if (options.getRunner().getName().contains("SparkRunner")) { - e2eTestInputPath = Resources.getResource("LICENSE").getPath(); -} options.setInputFile(e2eTestInputPath); WordCount.main(TestPipeline.convertToArgs(options)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5848/runners/spark/pom.xml -- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 228a90b..60b2de9 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -180,6 +180,12 @@ joda-time + org.apache.commons + commons-compress + 1.9 + provided + + commons-io commons-io 2.4 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5848/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 63dfe0d..3888ec2 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -28,7 +28,6 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator; import org.apache.beam.runners.spark.translation.TransformTranslator; import 
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory; import
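The new `SourceRDD` in this commit reads a bounded source directly: the source is split into bundles (one per partition), and each partition reads its bundle with its own reader. A self-contained sketch of that contract, where `RangeSource` and `readBundle` are toy stand-ins mirroring a thin slice of Beam's `BoundedSource`/`BoundedReader` shape, not the real API:

```java
import java.util.ArrayList;
import java.util.List;

public class SourceRddSketch {
  // Thin stand-ins for BoundedSource/BoundedReader.
  interface BoundedReader<T> { boolean start(); boolean advance(); T getCurrent(); }
  interface BoundedSource<T> {
    List<? extends BoundedSource<T>> split(int desiredBundles);
    BoundedReader<T> createReader();
  }

  // A toy source over an int range, splittable into contiguous sub-ranges.
  static class RangeSource implements BoundedSource<Integer> {
    final int from, to;
    RangeSource(int from, int to) { this.from = from; this.to = to; }
    public List<RangeSource> split(int desiredBundles) {
      List<RangeSource> bundles = new ArrayList<>();
      int step = Math.max(1, (to - from) / desiredBundles);
      for (int lo = from; lo < to; lo += step) {
        bundles.add(new RangeSource(lo, Math.min(lo + step, to)));
      }
      return bundles;
    }
    public BoundedReader<Integer> createReader() {
      return new BoundedReader<Integer>() {
        int current = from - 1;
        public boolean start() { return advance(); }
        public boolean advance() { return ++current < to; }
        public Integer getCurrent() { return current; }
      };
    }
  }

  // What a source-backed partition compute does: start(), then advance()
  // until the bundle is exhausted, collecting each current element.
  static <T> List<T> readBundle(BoundedSource<T> bundle) {
    List<T> out = new ArrayList<>();
    BoundedReader<T> reader = bundle.createReader();
    for (boolean more = reader.start(); more; more = reader.advance()) {
      out.add(reader.getCurrent());
    }
    return out;
  }

  public static void main(String[] args) {
    int total = 0;
    for (BoundedSource<Integer> bundle : new RangeSource(0, 10).split(4)) {
      total += readBundle(bundle).size();
    }
    System.out.println(total); // 10: every element read exactly once across bundles
  }
}
```

Splitting at the source level is what lets the runner assert on default parallelism: the bundle count, not a post-hoc repartition, determines how many partitions the RDD gets.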
[2/2] incubator-beam git commit: This closes #983
This closes #983 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a00d2f81 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a00d2f81 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a00d2f81 Branch: refs/heads/master Commit: a00d2f810992c450c14eab2bb5e3aa3ad3f80f74 Parents: 4872bde 584 Author: Sela Authored: Fri Sep 23 01:32:26 2016 +0300 Committer: Sela Committed: Fri Sep 23 01:32:26 2016 +0300 -- examples/java/pom.xml | 2 +- .../org/apache/beam/examples/WordCountIT.java | 6 - runners/spark/pom.xml | 6 + .../apache/beam/runners/spark/SparkRunner.java | 5 - .../apache/beam/runners/spark/io/SourceRDD.java | 198 +++ .../spark/translation/TransformTranslator.java | 22 ++- 6 files changed, 223 insertions(+), 16 deletions(-) --
[2/2] incubator-beam git commit: This closes #982
This closes #982 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6082ebcc Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6082ebcc Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6082ebcc Branch: refs/heads/master Commit: 6082ebccedec076140720aefdb8f35e263847082 Parents: 8432752 b1474a1 Author: Sela Authored: Thu Sep 22 18:19:43 2016 +0300 Committer: Sela Committed: Thu Sep 22 18:19:43 2016 +0300 -- .../streaming/StreamingTransformTranslator.java | 28 +-- .../streaming/SimpleStreamingWordCountTest.java | 49 +--- 2 files changed, 46 insertions(+), 31 deletions(-) --
[1/2] incubator-beam git commit: [BEAM-613] Revised SimpleStreamingWordCountTest to better test fixed windows.
Repository: incubator-beam Updated Branches: refs/heads/master 843275210 -> 6082ebcce [BEAM-613] Revised SimpleStreamingWordCountTest to better test fixed windows. Revised the test to cover multiple batches. Set the timeout to 1 ms since it essentially plays no role here. Removed blank lines between imports. Refactored the timeout-related code to make it more natural from the Beam model's perspective. Fix windowing bug. Expected result is for the entire window. Renamed the test to better reflect the use case it's testing. Fixed a typo. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1474a18 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1474a18 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1474a18 Branch: refs/heads/master Commit: b1474a18c4fe3b3aefdb6cd364fce9dfc227b6df Parents: 8432752 Author: Stas Levin Authored: Mon Sep 5 18:22:59 2016 +0300 Committer: Sela Committed: Thu Sep 22 18:18:19 2016 +0300 -- .../streaming/StreamingTransformTranslator.java | 28 +-- .../streaming/SimpleStreamingWordCountTest.java | 49 +--- 2 files changed, 46 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1474a18/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 64ddc57..9cb377d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -191,27 +191,29 @@ public final class StreamingTransformTranslator { 
@SuppressWarnings("unchecked") JavaDStream dStream = (JavaDStream ) sec.getStream(transform); +// get the right window durations. +Duration windowDuration; +Duration slideDuration; if (windowFn instanceof FixedWindows) { - Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize() - .getMillis()); - sec.setStream(transform, dStream.window(windowDuration)); + windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize().getMillis()); + slideDuration = windowDuration; } else if (windowFn instanceof SlidingWindows) { - Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize() - .getMillis()); - Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod() - .getMillis()); - sec.setStream(transform, dStream.window(windowDuration, slideDuration)); + SlidingWindows slidingWindows = (SlidingWindows) windowFn; + windowDuration = Durations.milliseconds(slidingWindows.getSize().getMillis()); + slideDuration = Durations.milliseconds(slidingWindows.getPeriod().getMillis()); +} else { + throw new UnsupportedOperationException(String.format("WindowFn %s is not supported.", + windowFn.getClass().getCanonicalName())); } +JavaDStream windowedDStream = +dStream.window(windowDuration, slideDuration); //--- then we apply windowing to the elements -@SuppressWarnings("unchecked") -JavaDStream dStream2 = -(JavaDStream ) sec.getStream(transform); if (TranslationUtils.skipAssignWindows(transform, context)) { - sec.setStream(transform, dStream2); + sec.setStream(transform, windowedDStream); } else { final OldDoFn addWindowsDoFn = new AssignWindowsDoFn<>(windowFn); final SparkRuntimeContext runtimeContext = sec.getRuntimeContext(); - JavaDStream outStream = dStream2.transform( + JavaDStream outStream = windowedDStream.transform( new Function , JavaRDD >() { @Override public JavaRDD call(JavaRDD rdd) throws Exception { 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1474a18/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
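The refactored duration selection in the diff above reduces both window types to a single `(windowDuration, slideDuration)` pair for `dstream.window(...)`: fixed windows slide by their own size, sliding windows carry distinct size and period, and anything else is rejected. A self-contained sketch of that logic, with simple value classes standing in for Beam's `FixedWindows`/`SlidingWindows`:

```java
public class WindowDurationSketch {
  static class Fixed { final long sizeMillis; Fixed(long s) { sizeMillis = s; } }
  static class Sliding {
    final long sizeMillis, periodMillis;
    Sliding(long s, long p) { sizeMillis = s; periodMillis = p; }
  }

  // Returns {windowDurationMillis, slideDurationMillis} for dstream.window(...).
  static long[] durations(Object windowFn) {
    if (windowFn instanceof Fixed) {
      long d = ((Fixed) windowFn).sizeMillis;
      return new long[] {d, d};            // slide == size for fixed windows
    } else if (windowFn instanceof Sliding) {
      Sliding s = (Sliding) windowFn;
      return new long[] {s.sizeMillis, s.periodMillis};
    }
    throw new UnsupportedOperationException(
        "WindowFn " + windowFn.getClass().getCanonicalName() + " is not supported.");
  }

  public static void main(String[] args) {
    long[] fixed = durations(new Fixed(1000));
    long[] sliding = durations(new Sliding(1000, 500));
    System.out.println(fixed[0] + "/" + fixed[1] + " " + sliding[0] + "/" + sliding[1]);
    // prints "1000/1000 1000/500"
  }
}
```

Collapsing both cases to one pair is what lets the translator call `dStream.window(windowDuration, slideDuration)` once instead of branching twice, which is exactly the duplication the patch removes.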
[GitHub] incubator-beam pull request #983: [BEAM-657] Support Read.Bounded primitive
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/983 [BEAM-657] Support Read.Bounded primitive --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-657 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/983.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #983 ---
[2/4] incubator-beam git commit: [BEAM-610] Enable spark's checkpointing mechanism for driver-failure recovery in streaming.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 8341c6d..1a0511f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -19,39 +19,32 @@ package org.apache.beam.runners.spark.translation; +import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory; import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix; import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate; import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.io.IOException; -import java.io.Serializable; -import java.lang.reflect.Field; -import java.util.Arrays; import java.util.Collections; -import java.util.List; import java.util.Map; import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapreduce.AvroJob; import org.apache.avro.mapreduce.AvroKeyInputFormat; import org.apache.beam.runners.core.AssignWindowsDoFn; -import org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; -import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; +import 
org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.hadoop.HadoopIO; import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper; import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat; import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat; import org.apache.beam.runners.spark.util.BroadcastHelper; -import org.apache.beam.runners.spark.util.ByteArray; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.TextIO; @@ -63,36 +56,30 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.InMemoryStateInternals; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import 
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDDLike; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; + import scala.Tuple2; + /** * Supports translation between a Beam transform, and Spark's operations on RDDs. */ @@ -101,31 +88,6 @@ public final class TransformTranslator { private TransformTranslator() { } - /** - * Getter of the field. - */ - public static class FieldGetter { -private final Mapfields; - -public FieldGetter(Class clazz) { -
[1/4] incubator-beam git commit: [BEAM-610] Enable spark's checkpointing mechanism for driver-failure recovery in streaming.
Repository: incubator-beam Updated Branches: refs/heads/master 5c23f4954 -> 1ceb12aeb http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java -- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java new file mode 100644 index 000..beaae13 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; +import org.junit.rules.ExternalResource; + +/** + * A rule that clears the {@link org.apache.beam.runners.spark.aggregators.AccumulatorSingleton} + * which represents the Beam {@link org.apache.beam.sdk.transforms.Aggregator}s. 
+ */ +class ClearAggregatorsRule extends ExternalResource { + @Override + protected void before() throws Throwable { +AccumulatorSingleton.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java -- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index 8b7762f..238d7ba 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -29,6 +29,7 @@ import java.io.File; import java.util.Arrays; import java.util.List; import java.util.Set; + import org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics; import org.apache.beam.runners.spark.examples.WordCount; import org.apache.beam.sdk.Pipeline; @@ -53,6 +54,9 @@ public class SimpleWordCountTest { @Rule public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule(); + @Rule + public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule(); + private static final String[] WORDS_ARRAY = { "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"}; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java -- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java index 0d15d12..f85baab 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java @@ -67,8 +67,7 @@ public class SideEffectsTest implements Serializable { // TODO: remove the version check 
(and the setup and teardown methods) when we no // longer support Spark 1.3 or 1.4 - String version = SparkContextFactory.getSparkContext(options.getSparkMaster(), - options.getAppName()).version(); + String version = SparkContextFactory.getSparkContext(options).version(); if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) { assertTrue(e.getCause() instanceof UserException); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java -- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java index a6fe755..8210b0d 100644 ---
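The ClearAggregatorsRule added above is a JUnit ExternalResource whose before() hook wipes singleton accumulator state ahead of each test, so aggregator values from one pipeline run cannot leak into the next. A dependency-free sketch of that idea (AccumulatorStandIn is hypothetical, standing in for Beam's AccumulatorSingleton, and plain static methods stand in for the JUnit rule lifecycle):

```java
/**
 * Sketch of the ClearAggregatorsRule pattern without JUnit on the classpath:
 * reset shared singleton state before each test body runs.
 */
public class ClearRuleSketch {

    // Stand-in for AccumulatorSingleton: mutable static state shared across tests.
    static final class AccumulatorStandIn {
        static long value = 0;
        static void add(long n) { value += n; }
        static void clear() { value = 0; }
    }

    // Mirrors ExternalResource.before(): invoked ahead of each test body.
    static void before() {
        AccumulatorStandIn.clear();
    }

    public static void main(String[] args) {
        AccumulatorStandIn.add(42);   // a previous test leaves state behind
        before();                     // the rule fires before the next test
        System.out.println(AccumulatorStandIn.value); // prints 0
    }
}
```

The real rule does the same thing with `AccumulatorSingleton.clear()` inside `ExternalResource.before()`.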
[4/4] incubator-beam git commit: This closes #909
This closes #909 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1ceb12ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1ceb12ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1ceb12ae Branch: refs/heads/master Commit: 1ceb12aebd0ffa63bd28d31cbe830230713705ec Parents: 5c23f49 0feb649 Author: Sela Authored: Wed Sep 21 20:17:38 2016 +0300 Committer: Sela Committed: Wed Sep 21 20:17:38 2016 +0300 -- .../runners/spark/SparkPipelineOptions.java | 28 +- .../apache/beam/runners/spark/SparkRunner.java | 121 ++-- .../spark/aggregators/AccumulatorSingleton.java | 53 ++ .../runners/spark/translation/DoFnFunction.java | 35 +- .../spark/translation/EvaluationContext.java| 17 +- .../translation/GroupCombineFunctions.java | 262 + .../spark/translation/MultiDoFnFunction.java| 44 +- .../spark/translation/SparkContextFactory.java | 48 +- .../translation/SparkPipelineEvaluator.java | 57 -- .../translation/SparkPipelineTranslator.java| 5 +- .../spark/translation/SparkProcessContext.java | 10 +- .../spark/translation/SparkRuntimeContext.java | 44 +- .../spark/translation/TransformTranslator.java | 473 +++- .../spark/translation/TranslationUtils.java | 195 +++ .../SparkRunnerStreamingContextFactory.java | 98 .../streaming/StreamingEvaluationContext.java | 44 +- .../streaming/StreamingTransformTranslator.java | 549 --- .../runners/spark/util/BroadcastHelper.java | 26 + .../runners/spark/ClearAggregatorsRule.java | 33 ++ .../beam/runners/spark/SimpleWordCountTest.java | 4 + .../spark/translation/SideEffectsTest.java | 3 +- .../streaming/FlattenStreamingTest.java | 54 +- .../streaming/KafkaStreamingTest.java | 26 +- .../RecoverFromCheckpointStreamingTest.java | 179 ++ .../streaming/SimpleStreamingWordCountTest.java | 25 +- .../utils/TestOptionsForStreaming.java | 55 ++ .../org/apache/beam/sdk/transforms/Combine.java | 7 + 27 files changed, 1682 insertions(+), 813
deletions(-) --
[3/4] incubator-beam git commit: [BEAM-610] Enable spark's checkpointing mechanism for driver-failure recovery in streaming.
[BEAM-610] Enable spark's checkpointing mechanism for driver-failure recovery in streaming. Refactor translation mechanism to support checkpointing of DStream. Support basic functionality with GroupByKey and ParDo. Added support for grouping operations. Added checkpointDir option, using it before execution. Support Accumulator recovery from checkpoint. Streaming tests should use JUnit's TemporaryFolder Rule for checkpoint directory. Support combine optimizations. Support durable sideInput via Broadcast. Branches in the pipeline are either Bounded or Unbounded and should be handled as such. Handle flatten/union of Bounded/Unbounded RDD/DStream. JavaDoc. Rebased on master. Reuse functionality between batch and streaming translators. Better implementation of streaming/batch pipeline-branch translation. Move group/combine functions to their own wrapping class. Fixed missing licenses. Use VisibleForTesting annotation instead of comment. Remove Broadcast failure recovery, to be handled separately. Stop streaming gracefully, so any checkpointing will finish first. Typo fixes + better documentation. Validate checkpointDir durability. Add checkpoint duration option. A more compact streaming tests init with Rules. A more accurate test; removed broadcast from test as it will be handled separately. Bounded/Unbounded translation to be handled by the SparkPipelineTranslator implementation. Evaluator decides whether to translateBounded or translateUnbounded according to the visited node's boundedness.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0feb6499 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0feb6499 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0feb6499 Branch: refs/heads/master Commit: 0feb64994a05de4fe6b1ba178a38d03743b89b7a Parents: 5c23f49 Author: Sela Authored: Thu Aug 25 23:49:01 2016 +0300 Committer: Sela Committed: Wed Sep 21 20:15:27 2016 +0300 -- .../runners/spark/SparkPipelineOptions.java | 28 +- .../apache/beam/runners/spark/SparkRunner.java | 121 ++-- .../spark/aggregators/AccumulatorSingleton.java | 53 ++ .../runners/spark/translation/DoFnFunction.java | 35 +- .../spark/translation/EvaluationContext.java| 17 +- .../translation/GroupCombineFunctions.java | 262 + .../spark/translation/MultiDoFnFunction.java| 44 +- .../spark/translation/SparkContextFactory.java | 48 +- .../translation/SparkPipelineEvaluator.java | 57 -- .../translation/SparkPipelineTranslator.java| 5 +- .../spark/translation/SparkProcessContext.java | 10 +- .../spark/translation/SparkRuntimeContext.java | 44 +- .../spark/translation/TransformTranslator.java | 473 +++- .../spark/translation/TranslationUtils.java | 195 +++ .../SparkRunnerStreamingContextFactory.java | 98 .../streaming/StreamingEvaluationContext.java | 44 +- .../streaming/StreamingTransformTranslator.java | 549 --- .../runners/spark/util/BroadcastHelper.java | 26 + .../runners/spark/ClearAggregatorsRule.java | 33 ++ .../beam/runners/spark/SimpleWordCountTest.java | 4 + .../spark/translation/SideEffectsTest.java | 3 +- .../streaming/FlattenStreamingTest.java | 54 +- .../streaming/KafkaStreamingTest.java | 26 +- .../RecoverFromCheckpointStreamingTest.java | 179 ++ .../streaming/SimpleStreamingWordCountTest.java | 25 +- .../utils/TestOptionsForStreaming.java | 55 ++ .../org/apache/beam/sdk/transforms/Combine.java | 7 + 27 files changed, 1682 insertions(+), 813 deletions(-) -- 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index db6b75c..7afb68c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -19,9 +19,9 @@ package org.apache.beam.runners.spark; import com.fasterxml.jackson.annotation.JsonIgnore; - import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; @@ -48,6 +48,32 @@ public interface SparkPipelineOptions extends
[2/2] incubator-beam git commit: [BEAM-628] Fixed the Graphite metrics sink configuration for spark-submit on yarn.
[BEAM-628] Fixed the Graphite metrics sink configuration for spark-submit on yarn. This closes #945 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/643cf63d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/643cf63d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/643cf63d Branch: refs/heads/master Commit: 643cf63d500b8ebe3295bad6d39030ed3a73ff12 Parents: f81b9a0 8451b31 Author: Sela Authored: Mon Sep 12 13:22:57 2016 +0300 Committer: Sela Committed: Mon Sep 12 13:22:57 2016 +0300 -- .../spark/src/test/resources/metrics.properties | 61 1 file changed, 50 insertions(+), 11 deletions(-) --
[1/2] incubator-beam git commit: Fixed the Graphite metrics sink configuration so it actually works when submitting using spark-submit on yarn.
Repository: incubator-beam Updated Branches: refs/heads/master f81b9a041 -> 643cf63d5 Fixed the Graphite metrics sink configuration so it actually works when submitting using spark-submit on yarn. Made the metrics configuration info a bit clearer. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8451b312 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8451b312 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8451b312 Branch: refs/heads/master Commit: 8451b31200ff47d99f40217afc743ff85ee10351 Parents: f81b9a0 Author: Stas Levin Authored: Mon Sep 12 11:27:22 2016 +0300 Committer: Sela Committed: Mon Sep 12 13:18:22 2016 +0300 -- .../spark/src/test/resources/metrics.properties | 61 1 file changed, 50 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8451b312/runners/spark/src/test/resources/metrics.properties -- diff --git a/runners/spark/src/test/resources/metrics.properties b/runners/spark/src/test/resources/metrics.properties index 4aa01d2..143532d 100644 --- a/runners/spark/src/test/resources/metrics.properties +++ b/runners/spark/src/test/resources/metrics.properties @@ -14,16 +14,55 @@ # See the License for the specific language governing permissions and # limitations under the License. + +# The "org.apache.beam.runners.spark.aggregators.metrics.sink.XSink" +# (a.k.a Beam.XSink) is only configured for the driver, the executors are set with a Spark native +# implementation "org.apache.spark.metrics.sink.XSink" (a.k.a Spark.XSink). +# This is due to sink class loading behavior, which is different on the driver and executor nodes. +# Since Beam aggregator metrics are reported via Spark accumulators and thus make their way to the +# driver, we only need the "Beam.XSink" on the driver side. Executor nodes can keep +# reporting Spark native metrics using the traditional Spark.XSink. 
+# +# The current sink configuration pattern is therefore: +# +# driver.**.class = Beam.XSink +# executor.**.class = Spark.XSink + + +# * A metrics sink for tests * *.sink.memory.class=org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics +# * End of InMemoryMetrics sink configuration section * + + +# * A sample configuration for outputting metrics to Graphite * + +#driver.sink.graphite.class=org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink +#driver.sink.graphite.host=YOUR_HOST +#driver.sink.graphite.port=2003 +#driver.sink.graphite.prefix=spark +#driver.sink.graphite.period=1 +#driver.sink.graphite.unit=SECONDS + +#executor.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink +#executor.sink.graphite.host=YOUR_HOST +#executor.sink.graphite.port=2003 +#executor.sink.graphite.prefix=spark +#executor.sink.graphite.period=1 +#executor.sink.graphite.unit=SECONDS + +# * End of Graphite sink configuration section * + + +# * A sample configuration for outputting metrics to a CSV file. * + +#driver.sink.csv.class=org.apache.beam.runners.spark.aggregators.metrics.sink.CsvSink +#driver.sink.csv.directory=/tmp/spark-metrics +#driver.sink.csv.period=1 +#driver.sink.graphite.unit=SECONDS + +#executor.sink.csv.class=org.apache.spark.metrics.sink.CsvSink +#executor.sink.csv.directory=/tmp/spark-metrics +#executor.sink.csv.period=1 +#executor.sink.graphite.unit=SECONDS -#*.sink.csv.class=org.apache.beam.runners.spark.aggregators.metrics.sink.CsvSink -#*.sink.csv.directory=/tmp/spark-metrics -#*.sink.csv.period=1 -#*.sink.graphite.unit=SECONDS - -#*.sink.graphite.class=org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink -#*.sink.graphite.host=YOUR_HOST -#*.sink.graphite.port=2003 -#*.sink.graphite.prefix=spark -#*.sink.graphite.period=1 -#*.sink.graphite.unit=SECONDS +# * End of CSV sink configuration section *
[2/2] incubator-beam git commit: [BEAM-627] Set Spark master only if not set.
[BEAM-627] Set Spark master only if not set. This closes #944 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f81b9a04 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f81b9a04 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f81b9a04 Branch: refs/heads/master Commit: f81b9a04106e7aed484e8016b51c96654e8eb221 Parents: 49208ca c961062 Author: Sela Authored: Mon Sep 12 11:41:53 2016 +0300 Committer: Sela Committed: Mon Sep 12 11:41:53 2016 +0300 -- .../beam/runners/spark/translation/SparkContextFactory.java | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Set master if not already set.
Repository: incubator-beam Updated Branches: refs/heads/master 49208cadd -> f81b9a041 Set master if not already set. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c9610623 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c9610623 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c9610623 Branch: refs/heads/master Commit: c9610623e41f2352182bb8a359244df226902079 Parents: 49208ca Author: Sela Authored: Mon Sep 12 11:00:34 2016 +0300 Committer: Sela Committed: Mon Sep 12 11:00:34 2016 +0300 -- .../beam/runners/spark/translation/SparkContextFactory.java | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9610623/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java index e008448..0e7db9f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java @@ -64,7 +64,10 @@ public final class SparkContextFactory { private static JavaSparkContext createSparkContext(String master, String appName) { SparkConf conf = new SparkConf(); -conf.setMaster(master); +if (!conf.contains("spark.master")) { + // set master if not set. + conf.setMaster(master); +} conf.setAppName(appName); conf.set("spark.serializer", KryoSerializer.class.getCanonicalName()); return new JavaSparkContext(conf);
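The diff above boils down to a "set only if absent" pattern: a master supplied externally (e.g. by spark-submit) must win over the pipeline option. A minimal, Spark-free sketch of that logic, using java.util.Properties in place of SparkConf (the class and method names here are hypothetical, for illustration only):

```java
import java.util.Properties;

/**
 * Sketch of the SparkContextFactory change: only apply the pipeline's
 * master when "spark.master" was not already set externally.
 */
public class MasterResolution {

    static String resolveMaster(Properties conf, String pipelineMaster) {
        // Mirrors conf.contains("spark.master"): an externally supplied
        // value (spark-submit, system property) takes precedence.
        if (!conf.containsKey("spark.master")) {
            conf.setProperty("spark.master", pipelineMaster);
        }
        return conf.getProperty("spark.master");
    }

    public static void main(String[] args) {
        Properties fromSubmit = new Properties();
        fromSubmit.setProperty("spark.master", "yarn"); // set by spark-submit
        System.out.println(resolveMaster(fromSubmit, "local[4]")); // prints yarn

        Properties empty = new Properties();
        System.out.println(resolveMaster(empty, "local[4]")); // prints local[4]
    }
}
```

Unconditionally calling setMaster, as the old code did, silently discarded the master passed on the spark-submit command line; this is the BEAM-627 fix.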
[GitHub] incubator-beam pull request #944: [BEAM-627] Set master if not already set.
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/944 [BEAM-627] Set master if not already set. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-627 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/944.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #944 commit c9610623e41f2352182bb8a359244df226902079 Author: Sela <ans...@paypal.com> Date: 2016-09-12T08:00:34Z Set master if not already set. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] incubator-beam git commit: Support Verifiers in TestSparkRunner
Repository: incubator-beam Updated Branches: refs/heads/master 82ebfd487 -> 49208cadd Support Verifiers in TestSparkRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0f4ef88b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0f4ef88b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0f4ef88b Branch: refs/heads/master Commit: 0f4ef88b58ea8a7851f592dc4bb42702fdde9c0a Parents: 82ebfd4 Author: Aviem Zur Authored: Thu Aug 25 17:23:07 2016 +0300 Committer: Sela Committed: Sat Sep 10 11:25:28 2016 +0300 -- runners/spark/pom.xml| 11 +-- .../org/apache/beam/runners/spark/TestSparkRunner.java | 9 - 2 files changed, 17 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f4ef88b/runners/spark/pom.xml -- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index b928b44..14bbd73 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -242,12 +242,19 @@ junit junit - test + provided + + + hamcrest-core + org.hamcrest + + + org.hamcrest hamcrest-all - test + provided http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f4ef88b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index 376b80f..a1e5918 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -18,10 +18,13 @@ package org.apache.beam.runners.spark; +import static org.hamcrest.MatcherAssert.assertThat; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; +import 
org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; @@ -70,6 +73,10 @@ public final class TestSparkRunner extends PipelineRunner { @Override public EvaluationResult run(Pipeline pipeline) { -return delegate.run(pipeline); +TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class); +EvaluationResult result = delegate.run(pipeline); +assertThat(result, testPipelineOptions.getOnCreateMatcher()); +assertThat(result, testPipelineOptions.getOnSuccessMatcher()); +return result; } }
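The TestSparkRunner change above runs the delegate pipeline and then checks the result against user-supplied verifiers (the on-create and on-success matchers from TestPipelineOptions). A dependency-free sketch of that run-then-verify pattern, with java.util.function.Predicate standing in for a Hamcrest Matcher (all names here are hypothetical):

```java
import java.util.function.Predicate;

/**
 * Sketch of the TestSparkRunner verifier pattern: execute the pipeline,
 * assert the result satisfies the caller's matcher, then return it.
 */
public class TestRunnerSketch {

    static String runAndVerify(Predicate<String> onSuccessMatcher) {
        String result = "SUCCESS";              // stand-in for delegate.run(pipeline)
        if (!onSuccessMatcher.test(result)) {   // mirrors assertThat(result, matcher)
            throw new AssertionError("pipeline result did not satisfy matcher");
        }
        return result;                          // result still flows back to the caller
    }

    public static void main(String[] args) {
        System.out.println(runAndVerify(r -> r.equals("SUCCESS"))); // prints SUCCESS
    }
}
```

Running the matchers inside the runner (rather than in each test) is what lets generic integration tests declare their success criteria purely through pipeline options.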
[GitHub] incubator-beam pull request #909: [BEAM-610] Enable spark's checkpointing me...
GitHub user amitsela opened a pull request: https://github.com/apache/incubator-beam/pull/909 [BEAM-610] Enable spark's checkpointing mechanism for driver-failure recovery in streaming Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- Support basic functionality with GroupByKey and ParDo. Added support for grouping operations. Added checkpointDir option, using it before execution. Support Accumulator recovery from checkpoint. Streaming tests should use JUnit's TemporaryFolder Rule for checkpoint directory. Support combine optimizations. Support durable sideInput via Broadcast. Branches in the pipeline are either Bounded or Unbounded and should be handled as such. Handle flatten/union of Bounded/Unbounded RDD/DStream. JavaDoc. Rebased on master. Reuse functionality between batch and streaming translators. Better implementation of streaming/batch pipeline-branch translation. Move group/combine functions to their own wrapping class. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/amitsela/incubator-beam BEAM-610 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/909.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #909 commit ec9cd0805c23afd792dcccf0f8fb268cdbb0e319 Author: Sela <ans...@paypal.com> Date: 2016-08-25T20:49:01Z Refactor translation mechanism to support checkpointing of DStream. Support basic functionality with GroupByKey and ParDo. Added support for grouping operations. Added checkpointDir option, using it before execution. Support Accumulator recovery from checkpoint. Streaming tests should use JUnit's TemporaryFolder Rule for checkpoint directory. Support combine optimizations. Support durable sideInput via Broadcast. Branches in the pipeline are either Bounded or Unbounded and should be handled as such. Handle flatten/union of Bounded/Unbounded RDD/DStream. JavaDoc. Rebased on master. Reuse functionality between batch and streaming translators. Better implementation of streaming/batch pipeline-branch translation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: [BEAM-592] Fix SparkRunner Dependency Problem in WordCount This closes #892
[BEAM-592] Fix SparkRunner Dependency Problem in WordCount This closes #892 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/baf5e416 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/baf5e416 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/baf5e416 Branch: refs/heads/master Commit: baf5e416dc67a650e04770f67496ff9f6fb7bc0b Parents: a17a99f ef828de Author: SelaAuthored: Sat Aug 27 12:31:33 2016 +0300 Committer: Sela Committed: Sat Aug 27 12:31:33 2016 +0300 -- examples/java/pom.xml | 6 ++ examples/java8/pom.xml | 6 ++ 2 files changed, 12 insertions(+) --