[1/2] beam git commit: Add README.md to KafkaIO
Repository: beam Updated Branches: refs/heads/master e33cc24a5 -> f816ad879

Add README.md to KafkaIO

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf1b0a5e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf1b0a5e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf1b0a5e
Branch: refs/heads/master
Commit: bf1b0a5e9e95abf0521e081ba575c789f46ba499
Parents: e33cc24
Author: Joey Baruch
Authored: Tue Aug 29 16:38:24 2017 +0300
Committer: Aviem Zur
Committed: Tue Aug 29 18:33:57 2017 +0300
--
 sdks/java/io/kafka/README.md | 36
 1 file changed, 36 insertions(+)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/bf1b0a5e/sdks/java/io/kafka/README.md
--
diff --git a/sdks/java/io/kafka/README.md b/sdks/java/io/kafka/README.md
new file mode 100644
index 000..07d00a1
--- /dev/null
+++ b/sdks/java/io/kafka/README.md
@@ -0,0 +1,36 @@

KafkaIO contains I/O transforms which allow you to read/write messages from/to [Apache Kafka](http://kafka.apache.org/).

## Dependencies

To use KafkaIO you must first add a dependency on `beam-sdks-java-io-kafka`:

```maven
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>...</version>
</dependency>
```

## Documentation

- [KafkaIO.java](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java)
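For orientation, a minimal read pipeline built on this module might look roughly like the sketch below. It is not part of the commit or the README; the bootstrap servers and topic name are placeholders, and a real pipeline would normally apply further transforms and a sink before running.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read key/value records from a Kafka topic and drop the Kafka metadata,
    // yielding a PCollection of KV<Long, String>.
    PCollection<KV<Long, String>> records =
        pipeline.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker-1:9092")   // placeholder
                .withTopic("my-topic")                   // placeholder
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata());

    // ... further transforms and a sink would go here ...
    pipeline.run().waitUntilFinish();
  }
}
```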
[2/2] beam git commit: This closes #3780
This closes #3780 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f816ad87 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f816ad87 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f816ad87 Branch: refs/heads/master Commit: f816ad87923610bd3fe507a298ac53ea49b4174a Parents: e33cc24 bf1b0a5 Author: Aviem Zur Authored: Tue Aug 29 18:34:33 2017 +0300 Committer: Aviem Zur Committed: Tue Aug 29 18:34:33 2017 +0300 -- sdks/java/io/kafka/README.md | 36 1 file changed, 36 insertions(+) --
[beam-site] branch asf-site updated (1b92d41 -> 3c2d094)
This is an automated email from the ASF dual-hosted git repository. aviemzur pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/beam-site.git. from 1b92d41 This closes #279: Expands the section on Coders in Style Guide new 97b1ec9 Update PMC new 6835c8a Regenerate website new 3c2d094 This closes #287 The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: content/contribute/team/index.html | 2 +- src/_beam_team/team.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-)
[beam-site] 02/03: Regenerate website
This is an automated email from the ASF dual-hosted git repository. aviemzur pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/beam-site.git commit 6835c8a4f015f0cebaf55a09730e9f3938273f0f Author: Aviem Zur AuthorDate: Sun Aug 13 15:32:38 2017 +0300 Regenerate website --- content/contribute/team/index.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/content/contribute/team/index.html b/content/contribute/team/index.html index 76ec22e..ea48ea3 100644 --- a/content/contribute/team/index.html +++ b/content/contribute/team/index.html @@ -379,7 +379,7 @@ aviemzur aviemzur [at] apache [dot] org PayPal - committer + committer, PMC +2
[beam-site] 03/03: This closes #287
This is an automated email from the ASF dual-hosted git repository. aviemzur pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/beam-site.git commit 3c2d09447750cee276a1e1f197b53b7ecb40af8c Merge: 1b92d41 6835c8a Author: Aviem Zur AuthorDate: Sun Aug 13 15:32:46 2017 +0300 This closes #287 content/contribute/team/index.html | 2 +- src/_beam_team/team.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-)
[beam-site] 01/03: Update PMC
This is an automated email from the ASF dual-hosted git repository. aviemzur pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/beam-site.git commit 97b1ec9cbdbb5bd71cde3ec9170f5dd9236e4f78 Author: Aviem Zur AuthorDate: Fri Aug 11 00:51:00 2017 +0300 Update PMC --- src/_beam_team/team.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/_beam_team/team.md b/src/_beam_team/team.md index fd3feb2..8083d22 100644 --- a/src/_beam_team/team.md +++ b/src/_beam_team/team.md @@ -150,6 +150,6 @@ members: apache_id: aviemzur email: aviemzur [at] apache [dot] org organization: PayPal -roles: committer +roles: committer, PMC time_zone: "+2" ---
[1/2] beam git commit: [BEAM-2314] Add ValidatesRunner test for merging custom windows
Repository: beam Updated Branches: refs/heads/master f54072a1b -> 0064fb37a [BEAM-2314] Add ValidatesRunner test for merging custom windows Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dfa983ce Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dfa983ce Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dfa983ce Branch: refs/heads/master Commit: dfa983ce4adb85d211497460254b6a95944ce869 Parents: f54072a Author: Etienne Chauchot Authored: Mon May 29 12:05:51 2017 +0200 Committer: Aviem Zur Committed: Mon Jul 24 14:33:00 2017 +0300 -- runners/spark/pom.xml | 3 +- .../sdk/testing/UsesCustomWindowMerging.java| 23 +++ .../sdk/transforms/windowing/WindowTest.java| 184 +++ 3 files changed, 209 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/dfa983ce/runners/spark/pom.xml -- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 7f70204..35e933b 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -77,7 +77,8 @@ org.apache.beam.sdk.testing.UsesSplittableParDo, org.apache.beam.sdk.testing.UsesCommittedMetrics, -org.apache.beam.sdk.testing.UsesTestStream +org.apache.beam.sdk.testing.UsesTestStream, +org.apache.beam.sdk.testing.UsesCustomWindowMerging none 1 http://git-wip-us.apache.org/repos/asf/beam/blob/dfa983ce/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java new file mode 100644 index 000..fc40e02 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +/** + * Category tag for validation tests which utilize custom window merging. 
+ */ +public interface UsesCustomWindowMerging {} http://git-wip-us.apache.org/repos/asf/beam/blob/dfa983ce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 65af7a1..5b6d046 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -31,19 +31,30 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.when; import com.google.common.collect.Iterables; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesCustomWindowMerging; import org.apache.bea
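The new category is a plain marker interface; a ValidatesRunner test opts into it with a JUnit category annotation, and runners that cannot merge custom windows (such as the Spark runner, per the surefire excludedGroups change above) exclude it from their suites. A minimal sketch of how such a test is tagged follows; the class and method names are illustrative and not taken from WindowTest.

```java
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesCustomWindowMerging;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class CustomWindowMergingExampleTest {

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  // Tagged with the new marker interface so runners lacking custom window
  // merging support can exclude it from their ValidatesRunner runs.
  @Test
  @Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
  public void testMergingCustomWindows() {
    // ... apply Window.into(a custom merging WindowFn) and assert with PAssert ...
    pipeline.run();
  }
}
```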
[2/2] beam git commit: This closes #3286
This closes #3286 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0064fb37 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0064fb37 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0064fb37 Branch: refs/heads/master Commit: 0064fb37ad13a10fc510e567d21873403a42340a Parents: f54072a dfa983c Author: Aviem Zur Authored: Mon Jul 24 18:22:42 2017 +0300 Committer: Aviem Zur Committed: Mon Jul 24 18:22:42 2017 +0300 -- runners/spark/pom.xml | 3 +- .../sdk/testing/UsesCustomWindowMerging.java| 23 +++ .../sdk/transforms/windowing/WindowTest.java| 184 +++ 3 files changed, 209 insertions(+), 1 deletion(-) --
[3/3] beam git commit: This closes #3343
This closes #3343 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e2ee5955 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e2ee5955 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e2ee5955 Branch: refs/heads/master Commit: e2ee59557befd729e24e62a4991a76bab64f5755 Parents: b3099bb 22dbb50 Author: Aviem Zur Authored: Thu Jun 22 15:33:26 2017 +0300 Committer: Aviem Zur Committed: Thu Jun 22 15:33:26 2017 +0300 -- runners/spark/pom.xml | 42 +++--- .../apache/beam/runners/spark/SparkRunner.java | 2 +- .../beam/runners/spark/TestSparkRunner.java | 2 +- .../SparkGroupAlsoByWindowViaWindowSet.java | 6 +- .../spark/stateful/SparkTimerInternals.java | 18 ++- .../spark/util/GlobalWatermarkHolder.java | 127 ++- .../spark/GlobalWatermarkHolderTest.java| 18 +-- 7 files changed, 141 insertions(+), 74 deletions(-) --
[2/3] beam git commit: [BEAM-2359] Fix watermark broadcasting to executors in Spark runner
[BEAM-2359] Fix watermark broadcasting to executors in Spark runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/20820fa5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/20820fa5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/20820fa5 Branch: refs/heads/master Commit: 20820fa5477ffcdd4a9ef2e9340353ed3c5691a9 Parents: b3099bb Author: Aviem Zur Authored: Mon Jun 12 17:04:00 2017 +0300 Committer: Aviem Zur Committed: Thu Jun 22 14:51:02 2017 +0300 -- .../apache/beam/runners/spark/SparkRunner.java | 2 +- .../beam/runners/spark/TestSparkRunner.java | 2 +- .../SparkGroupAlsoByWindowViaWindowSet.java | 6 +- .../spark/stateful/SparkTimerInternals.java | 18 ++- .../spark/util/GlobalWatermarkHolder.java | 127 ++- .../spark/GlobalWatermarkHolderTest.java| 18 +-- 6 files changed, 120 insertions(+), 53 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/20820fa5/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index d008718..595521f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -171,7 +171,7 @@ public final class SparkRunner extends PipelineRunner { } // register Watermarks listener to broadcast the advanced WMs. - jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener(jssc))); + jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener())); // The reason we call initAccumulators here even though it is called in // SparkRunnerStreamingContextFactory is because the factory is not called when resuming http://git-wip-us.apache.org/repos/asf/beam/blob/20820fa5/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index eccee57..a13a3b1 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -169,7 +169,7 @@ public final class TestSparkRunner extends PipelineRunner { result.waitUntilFinish(Duration.millis(batchDurationMillis)); do { SparkTimerInternals sparkTimerInternals = - SparkTimerInternals.global(GlobalWatermarkHolder.get()); + SparkTimerInternals.global(GlobalWatermarkHolder.get(batchDurationMillis)); sparkTimerInternals.advanceWatermark(); globalWatermark = sparkTimerInternals.currentInputWatermarkTime(); // let another batch-interval period of execution, just to reason about WM propagation. 
http://git-wip-us.apache.org/repos/asf/beam/blob/20820fa5/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index be4f3f6..1385e07 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -104,13 +104,15 @@ public class SparkGroupAlsoByWindowViaWindowSet { public static JavaDStream>>> groupAlsoByWindow( - JavaDStream inputDStream, + final JavaDStream inputDStream, final Coder keyCoder, final Coder> wvCoder, final WindowingStrategy windowingStrategy, final SparkRuntimeContext runtimeContext, final List sourceIds) { +final long batchDurationMillis = + runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class).getBatchIntervalMillis(); final IterableCoder> itrWvCoder = IterableCoder.of(wvCoder); final Coder iCoder = ((FullWindowedValueCoder) wvCoder).getValueCoder(); final Coder wCoder = @@ -239,7 +241,7 @@ public class SparkGroupAlsoByWindowViaWindowSet { SparkStateInternals stateInternals; SparkTi
[1/3] beam git commit: Move Spark runner streaming tests to post commit.
Repository: beam Updated Branches: refs/heads/master b3099bba2 -> e2ee59557 Move Spark runner streaming tests to post commit. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/22dbb500 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/22dbb500 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/22dbb500 Branch: refs/heads/master Commit: 22dbb500289675fe95b6d149c8550e09dc26feac Parents: 20820fa Author: Aviem Zur Authored: Wed Jun 21 17:53:21 2017 +0300 Committer: Aviem Zur Committed: Thu Jun 22 14:51:02 2017 +0300 -- runners/spark/pom.xml | 42 +- 1 file changed, 21 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/22dbb500/runners/spark/pom.xml -- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 0f6b730..ee72dd9 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -103,6 +103,27 @@ 4 + +streaming-tests +test + + test + + + +org.apache.beam.runners.spark.StreamingTest + + + + [ + "--runner=TestSparkRunner", + "--forceStreaming=true", + "--enableSparkMetricSinks=true" + ] + + + + @@ -372,27 +393,6 @@ - - streaming-tests - test - -test - - - - org.apache.beam.runners.spark.StreamingTest - - - -[ -"--runner=TestSparkRunner", -"--forceStreaming=true", -"--enableSparkMetricSinks=true" -] - - - -
[GitHub] beam pull request #3343: [BEAM-2359] Fix watermark broadcasting to executors...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/3343 [BEAM-2359] Fix watermark broadcasting to executors in Spark runner Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- R: @amitsela CC: @staslev @kobisalant You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam BEAM-2359-watermark-bug-spark Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/3343.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3343 commit f1b679e402734f20dcd9645babaec0a3f291e259 Author: Aviem Zur Date: 2017-06-12T14:04:00Z [BEAM-2359] Fix watermark broadcasting to executors in Spark runner
[2/2] beam git commit: This closes #2917
This closes #2917 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/43c44232 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/43c44232 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/43c44232 Branch: refs/heads/master Commit: 43c44232dcb23dc8ff38a05197962dbe1a3a5e64 Parents: 9cdae6c 5e5fbed Author: Aviem Zur Authored: Sat Jun 3 17:42:48 2017 +0300 Committer: Aviem Zur Committed: Sat Jun 3 17:42:48 2017 +0300 -- runners/spark/pom.xml | 2 - .../spark/translation/MultiDoFnFunction.java| 104 +-- .../spark/translation/SparkProcessContext.java | 23 +++- .../spark/translation/TransformTranslator.java | 84 --- .../streaming/StreamingTransformTranslator.java | 3 +- 5 files changed, 189 insertions(+), 27 deletions(-) --
[1/2] beam git commit: [BEAM-2175] [BEAM-1115] Support for new State and Timer API in Spark batch mode
Repository: beam Updated Branches: refs/heads/master 9cdae6caf -> 43c44232d [BEAM-2175] [BEAM-1115] Support for new State and Timer API in Spark batch mode Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e5fbed7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e5fbed7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e5fbed7 Branch: refs/heads/master Commit: 5e5fbed70af5d6ff827266d3db89cd5d8d51f544 Parents: 9cdae6c Author: JingsongLi Authored: Wed May 10 19:49:04 2017 +0800 Committer: Aviem Zur Committed: Sat Jun 3 16:49:59 2017 +0300 -- runners/spark/pom.xml | 2 - .../spark/translation/MultiDoFnFunction.java| 104 +-- .../spark/translation/SparkProcessContext.java | 23 +++- .../spark/translation/TransformTranslator.java | 84 --- .../streaming/StreamingTransformTranslator.java | 3 +- 5 files changed, 189 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/5e5fbed7/runners/spark/pom.xml -- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 697f67a..ddb4aca 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -77,8 +77,6 @@ org.apache.beam.runners.spark.UsesCheckpointRecovery -org.apache.beam.sdk.testing.UsesStatefulParDo, -org.apache.beam.sdk.testing.UsesTimersInParDo, org.apache.beam.sdk.testing.UsesSplittableParDo, org.apache.beam.sdk.testing.UsesCommittedMetrics, org.apache.beam.sdk.testing.UsesTestStream http://git-wip-us.apache.org/repos/asf/beam/blob/5e5fbed7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index 3274912..23d5b32 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -22,16 +22,24 @@ import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.Multimap; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.InMemoryStateInternals; +import org.apache.beam.runners.core.InMemoryTimerInternals; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StepContext; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.runners.spark.util.SparkSideInputReader; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; @@ -60,6 +68,7 @@ public class MultiDoFnFunction private final List> additionalOutputTags; private final Map, KV, SideInputBroadcast>> sideInputs; private final WindowingStrategy windowingStrategy; + private final boolean stateful; /** * @param aggAccum The Spark {@link Accumulator} 
that backs the Beam Aggregators. @@ -70,6 +79,7 @@ public class MultiDoFnFunction * @param additionalOutputTags Additional {@link TupleTag output tags}. * @param sideInputsSide inputs used in this {@link DoFn}. * @param windowingStrategy Input {@link WindowingStrategy}. + * @param stateful Stateful {@link DoFn}. */ public MultiDoFnFunction( Accumulator aggAccum, @@ -80,7 +90,8 @@ public class MultiDoFnFunction TupleTag mainOutputTag, List> additionalOutputTags, Map, KV, SideInputBroadcast>> sideInputs, - WindowingStrategy windowingStrategy) { + WindowingStrategy windowingStrategy, + boolean stateful) { this.aggAccum = aggAccum; this.metricsAccum = metricsAccum; this.stepName = stepName; @@ -90,6 +101,7 @@ public class MultiDoFnFunction this.additionalOutputTags =
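For context, this is the kind of user code the change enables on the Spark runner in batch mode: a ParDo over a keyed PCollection that keeps per-key state through the State API. The sketch below is illustrative only (class name, state id, and types are made up) and is not taken from the commit.

```java
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Counts elements per key using a ValueState cell; stateful ParDos like this
// can now run on the Spark runner in batch mode.
public class CountPerKeyFn extends DoFn<KV<String, String>, KV<String, Integer>> {

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void processElement(
      ProcessContext c, @StateId("count") ValueState<Integer> count) {
    Integer stored = count.read();
    int updated = (stored == null ? 0 : stored) + 1;
    count.write(updated);
    c.output(KV.of(c.element().getKey(), updated));
  }
}
```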
[GitHub] beam pull request #3126: [BEAM-2279] Add HDFS support to Spark runner profil...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/3126 [BEAM-2279] Add HDFS support to Spark runner profiles in archetypes and examples You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam add-hdfs-support-to-spark-profiles Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/3126.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3126 commit 29138f4987c22fd26bd84142bcc98ccd0f52bc63 Author: Aviem Zur Date: 2017-05-12T19:18:02Z [BEAM-2279] Add HDFS support to Spark runner profiles in archetypes and examples
[GitHub] beam pull request #3115: [BEAM-2277] Fix URI_SCHEME_PATTERN in FileSystems
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/3115 [BEAM-2277] Fix URI_SCHEME_PATTERN in FileSystems You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam fix-uri-scheme-pattern-in-filesystems Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/3115.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3115 commit ca05ed560888a2f2a86442b89e23fb7f49e1acfd Author: Aviem Zur Date: 2017-05-12T13:02:54Z [BEAM-2277] Fix URI_SCHEME_PATTERN in FileSystems
[GitHub] beam pull request #2938: Cherry-pick pull request #2649 into release-2.0.0 b...
Github user aviemzur closed the pull request at: https://github.com/apache/beam/pull/2938
[GitHub] beam pull request #2938: Cherry-pick pull request #2649 into release-2.0.0 b...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2938 Cherry-pick pull request #2649 into release-2.0.0 branch You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam cherry-pick-2649 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2938.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2938 commit bd5e347f57648ed28b2ae58b263a8c0825320f04 Author: Aviem Zur Date: 2017-05-05T20:13:24Z Cherry-pick pull request #2649 into release-2.0.0 branch
[3/4] beam git commit: [BEAM-1672] Make MetricsContainers accumulable.
[BEAM-1672] Make MetricsContainers accumulable. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/46c2f935 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/46c2f935 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/46c2f935 Branch: refs/heads/master Commit: 46c2f935a99350e18e5d50f1a996996760ebc2e3 Parents: db0ec99 Author: Aviem Zur Authored: Fri May 5 23:13:24 2017 +0300 Committer: Aviem Zur Committed: Sat May 6 08:27:49 2017 +0300 -- .../apache/beam/runners/core/LateDataUtils.java | 2 +- .../apache/beam/sdk/metrics/CounterCell.java| 27 +- .../org/apache/beam/sdk/metrics/DirtyState.java | 3 +- .../beam/sdk/metrics/DistributionCell.java | 16 +- .../org/apache/beam/sdk/metrics/GaugeCell.java | 20 +- .../org/apache/beam/sdk/metrics/MetricCell.java | 14 +- .../org/apache/beam/sdk/metrics/Metrics.java| 2 +- .../beam/sdk/metrics/MetricsContainer.java | 29 +- .../sdk/metrics/MetricsContainerStepMap.java| 487 +++ .../org/apache/beam/sdk/metrics/MetricsMap.java | 5 +- .../beam/sdk/metrics/CounterCellTest.java | 6 +- .../metrics/MetricsContainerStepMapTest.java| 258 ++ .../beam/sdk/metrics/MetricsContainerTest.java | 14 +- 13 files changed, 846 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java index c45387b..f7c0d31 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java @@ -71,7 +71,7 @@ public class LateDataUtils { .isBefore(timerInternals.currentInputWatermarkTime()); if (expired) { // The element is too late for this window. - droppedDueToLateness.inc(); + droppedDueToLateness.update(1L); WindowTracing.debug( "GroupAlsoByWindow: Dropping element at {} for key: {}; " + "window: {} since it is too far behind inputWatermark: {}", http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java index 7ab5ebc..4b8548f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; * indirection. */ @Experimental(Kind.METRICS) -public class CounterCell implements MetricCell { +public class CounterCell implements MetricCell { private final DirtyState dirty = new DirtyState(); private final AtomicLong value = new AtomicLong(); @@ -41,13 +41,26 @@ public class CounterCell implements MetricCell { */ CounterCell() {} - /** Increment the counter by the given amount. */ - private void add(long n) { + /** + * Increment the counter by the given amount. + * @param n value to increment by. Can be negative to decrement. + */ + public void update(long n) { value.addAndGet(n); dirty.afterModification(); } @Override + public void update(Long n) { +throw new UnsupportedOperationException("CounterCell.update(Long n) should not be used" ++ " as it performs unnecessary boxing/unboxing. 
Use CounterCell.update(long n) instead."); + } + + @Override public void update(MetricCell other) { +update((long) other.getCumulative()); + } + + @Override public DirtyState getDirty() { return dirty; } @@ -56,12 +69,4 @@ public class CounterCell implements MetricCell { public Long getCumulative() { return value.get(); } - - public void inc() { -add(1); - } - - public void inc(long n) { -add(n); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java index 6706be8..4e0c15c 100644 --- a/sdks/java/core/src/main/java/or
[1/4] beam git commit: [BEAM-1672] Use Accumulable MetricsContainers in Flink runner.
Repository: beam Updated Branches: refs/heads/master db0ec9991 -> 019d3002b [BEAM-1672] Use Accumulable MetricsContainers in Flink runner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8c2da9ad Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8c2da9ad Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8c2da9ad Branch: refs/heads/master Commit: 8c2da9ad1b8c195757f97feccdbcabcad735c407 Parents: 009cd6e Author: Aviem Zur Authored: Fri May 5 23:14:01 2017 +0300 Committer: Aviem Zur Committed: Sat May 6 08:27:49 2017 +0300 -- .../beam/runners/flink/FlinkRunnerResult.java | 8 +- .../metrics/DoFnRunnerWithMetricsUpdate.java| 12 +- .../flink/metrics/FlinkMetricContainer.java | 273 ++- .../flink/metrics/FlinkMetricResults.java | 146 -- .../flink/metrics/MetricsAccumulator.java | 60 .../flink/metrics/ReaderInvocationUtil.java | 7 +- .../translation/wrappers/SourceInputFormat.java | 8 +- .../streaming/io/BoundedSourceWrapper.java | 8 +- .../streaming/io/UnboundedSourceWrapper.java| 9 +- 9 files changed, 174 insertions(+), 357 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java -- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index 90dc79b..038895a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -17,12 +17,15 @@ */ package org.apache.beam.runners.flink; +import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; + import java.io.IOException; import java.util.Collections; import java.util.Map; -import org.apache.beam.runners.flink.metrics.FlinkMetricResults; +import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.joda.time.Duration; /** @@ -72,6 +75,7 @@ public class FlinkRunnerResult implements PipelineResult { @Override public MetricResults metrics() { -return new FlinkMetricResults(accumulators); +return asAttemptedOnlyMetricResults( +(MetricsContainerStepMap) accumulators.get(FlinkMetricContainer.ACCUMULATOR_NAME)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java -- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java index dae91fe..40191d2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java @@ -34,6 +34,7 @@ import org.joda.time.Instant; */ public class DoFnRunnerWithMetricsUpdate implements DoFnRunner { + private final String stepName; private final FlinkMetricContainer container; private final DoFnRunner delegate; @@ -41,14 +42,15 @@ public class DoFnRunnerWithMetricsUpdate implements DoFnRunner< String stepName, DoFnRunner delegate, RuntimeContext runtimeContext) { +this.stepName = stepName; this.delegate = 
delegate; -container = new FlinkMetricContainer(stepName, runtimeContext); +container = new FlinkMetricContainer(runtimeContext); } @Override public void startBundle() { try (Closeable ignored = - MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { delegate.startBundle(); } catch (IOException e) { throw new RuntimeException(e); @@ -58,7 +60,7 @@ public class DoFnRunnerWithMetricsUpdate implements DoFnRunner< @Override public void processElement(final WindowedValue elem) { try (Closeable ignored = - MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { delegate.processElement(elem); } catch (IOException e) { throw new
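At the user level, the accumulated containers surface through PipelineResult#metrics(), now built via MetricsContainerStepMap.asAttemptedOnlyMetricResults. A query might look roughly like the sketch below; the namespace and counter name are invented, and only attempted values are populated for attempted-only results.

```java
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

public class MetricsQuerySketch {
  // Prints the attempted value of a user counter after the pipeline has run.
  public static void printCounter(PipelineResult result) {
    MetricQueryResults metrics =
        result.metrics().queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.named("my.namespace", "elementsProcessed"))
                .build());
    for (MetricResult<Long> counter : metrics.counters()) {
      // Committed values are not available from attempted-only results.
      System.out.println("attempted: " + counter.attempted());
    }
  }
}
```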
[4/4] beam git commit: This closes #2649
This closes #2649 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/019d3002 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/019d3002 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/019d3002 Branch: refs/heads/master Commit: 019d3002b0e2a7db9c5c2e84a0a95fad60f16422 Parents: db0ec99 8c2da9a Author: Aviem Zur Authored: Sat May 6 08:45:29 2017 +0300 Committer: Aviem Zur Committed: Sat May 6 08:45:29 2017 +0300 -- .../apache/beam/runners/core/LateDataUtils.java | 2 +- .../beam/runners/flink/FlinkRunnerResult.java | 8 +- .../metrics/DoFnRunnerWithMetricsUpdate.java| 12 +- .../flink/metrics/FlinkMetricContainer.java | 273 +++ .../flink/metrics/FlinkMetricResults.java | 146 -- .../flink/metrics/MetricsAccumulator.java | 60 +++ .../flink/metrics/ReaderInvocationUtil.java | 7 +- .../translation/wrappers/SourceInputFormat.java | 8 +- .../streaming/io/BoundedSourceWrapper.java | 8 +- .../streaming/io/UnboundedSourceWrapper.java| 9 +- .../beam/runners/spark/SparkPipelineResult.java | 8 +- .../apache/beam/runners/spark/io/SourceRDD.java | 4 +- .../runners/spark/io/SparkUnboundedSource.java | 19 +- .../spark/metrics/MetricsAccumulator.java | 20 +- .../spark/metrics/MetricsAccumulatorParam.java | 20 +- .../runners/spark/metrics/SparkBeamMetric.java | 11 +- .../spark/metrics/SparkBeamMetricSource.java| 2 +- .../spark/metrics/SparkMetricResults.java | 172 --- .../spark/metrics/SparkMetricsContainer.java| 174 --- .../SparkGroupAlsoByWindowViaWindowSet.java | 4 +- .../spark/stateful/StateSpecFunctions.java | 8 +- .../translation/DoFnRunnerWithMetrics.java | 6 +- .../spark/translation/MultiDoFnFunction.java| 6 +- .../spark/translation/TransformTranslator.java | 4 +- .../streaming/StreamingTransformTranslator.java | 4 +- .../apache/beam/sdk/metrics/CounterCell.java| 27 +- .../org/apache/beam/sdk/metrics/DirtyState.java | 3 +- .../beam/sdk/metrics/DistributionCell.java | 16 +- .../org/apache/beam/sdk/metrics/GaugeCell.java | 20 +- .../org/apache/beam/sdk/metrics/MetricCell.java | 14 +- .../org/apache/beam/sdk/metrics/Metrics.java| 2 +- .../beam/sdk/metrics/MetricsContainer.java | 29 +- .../sdk/metrics/MetricsContainerStepMap.java| 487 +++ .../org/apache/beam/sdk/metrics/MetricsMap.java | 5 +- .../beam/sdk/metrics/CounterCellTest.java | 6 +- .../metrics/MetricsContainerStepMapTest.java| 258 ++ .../beam/sdk/metrics/MetricsContainerTest.java | 14 +- 37 files changed, 1086 insertions(+), 790 deletions(-) --
[2/4] beam git commit: [BEAM-1672] Use Accumulable MetricsContainers in Spark runner.
[BEAM-1672] Use Accumulable MetricsContainers in Spark runner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/009cd6e2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/009cd6e2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/009cd6e2 Branch: refs/heads/master Commit: 009cd6e289c77fd3bc997e2bcffaf49d72012c8e Parents: 46c2f93 Author: Aviem Zur Authored: Fri May 5 23:13:51 2017 +0300 Committer: Aviem Zur Committed: Sat May 6 08:27:49 2017 +0300 -- .../beam/runners/spark/SparkPipelineResult.java | 8 +- .../apache/beam/runners/spark/io/SourceRDD.java | 4 +- .../runners/spark/io/SparkUnboundedSource.java | 19 +- .../spark/metrics/MetricsAccumulator.java | 20 ++- .../spark/metrics/MetricsAccumulatorParam.java | 20 ++- .../runners/spark/metrics/SparkBeamMetric.java | 11 +- .../spark/metrics/SparkBeamMetricSource.java| 2 +- .../spark/metrics/SparkMetricResults.java | 172 -- .../spark/metrics/SparkMetricsContainer.java| 174 --- .../SparkGroupAlsoByWindowViaWindowSet.java | 4 +- .../spark/stateful/StateSpecFunctions.java | 8 +- .../translation/DoFnRunnerWithMetrics.java | 6 +- .../spark/translation/MultiDoFnFunction.java| 6 +- .../spark/translation/TransformTranslator.java | 4 +- .../streaming/StreamingTransformTranslator.java | 4 +- 15 files changed, 66 insertions(+), 396 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/009cd6e2/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java index 3e94a45..3986e33 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java @@ -18,13 +18,15 @@ package org.apache.beam.runners.spark; +import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; + import java.io.IOException; import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.beam.runners.spark.metrics.SparkMetricResults; +import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -41,7 +43,6 @@ public abstract class SparkPipelineResult implements PipelineResult { protected final Future pipelineExecution; protected JavaSparkContext javaSparkContext; protected PipelineResult.State state; - private final SparkMetricResults metricResults = new SparkMetricResults(); SparkPipelineResult(final Future pipelineExecution, final JavaSparkContext javaSparkContext) { this.pipelineExecution = pipelineExecution; @@ -106,7 +107,8 @@ public abstract class SparkPipelineResult implements PipelineResult { @Override public MetricResults metrics() { -return metricResults; +return asAttemptedOnlyMetricResults( +MetricsAccumulator.getInstance().value()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/009cd6e2/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java index e294359..71a19e7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java @@ -26,12 +26,12 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; -import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.WindowedValue; import org.apache.spark.Accumulator; @@ -65,7 +65,7 @@ public class SourceRDD { private final SparkRuntimeC
[6/6] beam git commit: This closes #2729
This closes #2729 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b73918b5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b73918b5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b73918b5 Branch: refs/heads/master Commit: b73918b55ab06e5a47ef9dc33ae3dbaebaed330a Parents: 48c8ed1 8d91a97 Author: Aviem Zur Authored: Thu May 4 21:10:14 2017 +0300 Committer: Aviem Zur Committed: Thu May 4 21:10:14 2017 +0300 -- .../beam/runners/core/SideInputHandler.java | 10 +- .../apache/beam/runners/flink/FlinkRunner.java | 3 + .../FlinkStreamingTransformTranslators.java | 26 + .../wrappers/streaming/DoFnOperator.java| 27 - .../streaming/state/FlinkStateInternals.java| 2 + runners/spark/pom.xml | 47 +++- .../runners/spark/SparkRunnerRegistrar.java | 4 +- .../beam/runners/spark/TestSparkRunner.java | 47 .../apache/beam/runners/spark/CacheTest.java| 12 +- .../beam/runners/spark/ForceStreamingTest.java | 18 +-- .../apache/beam/runners/spark/PipelineRule.java | 109 --- .../runners/spark/ProvidedSparkContextTest.java | 10 +- .../runners/spark/SparkRunnerDebuggerTest.java | 15 +-- .../runners/spark/SparkRunnerRegistrarTest.java | 2 +- .../beam/runners/spark/StreamingTest.java | 23 .../metrics/sink/SparkMetricsSinkTest.java | 12 +- .../beam/runners/spark/io/AvroPipelineTest.java | 10 +- .../beam/runners/spark/io/NumShardsTest.java| 6 +- .../spark/translation/StorageLevelTest.java | 31 +- .../translation/streaming/CreateStreamTest.java | 53 - .../ResumeFromCheckpointStreamingTest.java | 62 +++ .../streaming/StreamingSourceMetricsTest.java | 14 +-- .../main/java/org/apache/beam/sdk/Pipeline.java | 2 +- .../beam/sdk/metrics/MetricsEnvironment.java| 5 + .../apache/beam/sdk/testing/TestPipeline.java | 61 ++- 25 files changed, 330 insertions(+), 281 deletions(-) --
[2/6] beam git commit: [BEAM-1726] Fix Flatten with input copies in Flink Streaming Runner
[BEAM-1726] Fix Flatten with input copies in Flink Streaming Runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e2bb180 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e2bb180 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e2bb180 Branch: refs/heads/master Commit: 0e2bb1808350cbebf771d0971deb06787732800d Parents: 7c44935 Author: Aljoscha Krettek Authored: Sun Mar 19 07:49:08 2017 +0100 Committer: Aviem Zur Committed: Thu May 4 20:48:56 2017 +0300 -- .../FlinkStreamingTransformTranslators.java | 26 1 file changed, 26 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/0e2bb180/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java -- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index c024493..7339c01 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -966,10 +966,36 @@ class FlinkStreamingTransformTranslators { } else { DataStream result = null; + +// Determine DataStreams that we use as input several times. For those, we need to uniquify +// input streams because Flink seems to swallow watermarks when we have a union of one and +// the same stream. +Map, Integer> duplicates = new HashMap<>(); +for (PValue input : allInputs.values()) { + DataStream current = context.getInputDataStream(input); + Integer oldValue = duplicates.put(current, 1); + if (oldValue != null) { +duplicates.put(current, oldValue + 1); + } +} + for (PValue input : allInputs.values()) { DataStream current = context.getInputDataStream(input); + + final Integer timesRequired = duplicates.get(current); + if (timesRequired > 1) { +current = current.flatMap(new FlatMapFunction() { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(T t, Collector collector) throws Exception { +collector.collect(t); + } +}); + } result = (result == null) ? current : result.union(current); } + context.setOutputDataStream(context.getOutput(transform), result); } }
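The pipeline shape this fix targets is a Flatten whose input list contains the same PCollection more than once, which translates to a Flink self-union. A minimal sketch of that shape (names and data are illustrative):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class SelfUnionSketch {
  // Flattens a PCollection with itself; on the Flink streaming runner this
  // becomes a self-union, the case in which watermarks were being swallowed.
  public static PCollection<String> selfFlatten(Pipeline pipeline) {
    PCollection<String> words = pipeline.apply(Create.of("a", "b", "c"));
    return PCollectionList.of(words).and(words).apply(Flatten.<String>pCollections());
  }
}
```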
[3/6] beam git commit: [BEAM-1763] Verify PAssert execution in runners which support metrics.
[BEAM-1763] Verify PAssert execution in runners which support metrics. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95ade45e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95ade45e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95ade45e Branch: refs/heads/master Commit: 95ade45eced4787eb67a9d4d13dae48ffb176919 Parents: 48c8ed1 Author: Aviem Zur Authored: Tue May 2 19:00:29 2017 +0300 Committer: Aviem Zur Committed: Thu May 4 20:48:56 2017 +0300 -- .../apache/beam/runners/flink/FlinkRunner.java | 3 ++ .../beam/runners/spark/TestSparkRunner.java | 47 .../ResumeFromCheckpointStreamingTest.java | 12 +++-- .../beam/sdk/metrics/MetricsEnvironment.java| 5 +++ .../apache/beam/sdk/testing/TestPipeline.java | 46 --- 5 files changed, 57 insertions(+), 56 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java -- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index 181ffda..a5972ef 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -31,6 +31,7 @@ import java.util.SortedSet; import java.util.TreeSet; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; @@ -103,6 +104,8 @@ public class FlinkRunner extends PipelineRunner { public PipelineResult run(Pipeline pipeline) { logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline); +MetricsEnvironment.setMetricsSupported(true); + LOG.info("Executing pipeline using FlinkRunner."); FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options); http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index 10e98b8..1e67813 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -40,15 +40,11 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource; -import org.apache.beam.sdk.metrics.MetricNameFilter; -import org.apache.beam.sdk.metrics.MetricResult; -import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -116,8 +112,6 @@ public final class TestSparkRunner extends PipelineRunner { } 
SparkPipelineResult result = null; -int expectedNumberOfAssertions = PAssert.countAsserts(pipeline); - // clear state of Aggregators, Metrics and Watermarks if exists. AggregatorsAccumulator.clear(); MetricsAccumulator.clear(); @@ -137,47 +131,6 @@ public final class TestSparkRunner extends PipelineRunner { String.format("Finish state %s is not allowed.", finishState), finishState, isOneOf(PipelineResult.State.STOPPED, PipelineResult.State.DONE)); - -// validate assertion succeeded (at least once). -long successAssertions = 0; -Iterable> counterResults = result.metrics().queryMetrics( -MetricsFilter.builder() -.addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER)) -.build()).counters(); -for (MetricResult counter : counterResults) { - if (counter.attempted().longValue() > 0) { -successAssertions++; - } -} -Integer expectedA
[4/6] beam git commit: [BEAM-1726] Fix empty side inputs in Flink Streaming Runner
[BEAM-1726] Fix empty side inputs in Flink Streaming Runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/040d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/040d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/040d Branch: refs/heads/master Commit: 040d935c67f5cd48f2ffe2721a07fe6e0a50 Parents: 95ade45 Author: Aljoscha Krettek Authored: Sat Mar 18 12:16:06 2017 +0100 Committer: Aviem Zur Committed: Thu May 4 20:48:56 2017 +0300 -- .../beam/runners/core/SideInputHandler.java | 10 .../wrappers/streaming/DoFnOperator.java| 27 +++- 2 files changed, 31 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/040d/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java index 5c67148..b29f9d0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.core; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -161,11 +162,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { @Override public T get(PCollectionView sideInput, BoundedWindow window) { -if (!isReady(sideInput, window)) { - throw new IllegalStateException( - "Side input " + sideInput + " is not ready for window " + window); -} - @SuppressWarnings("unchecked") Coder windowCoder = (Coder) sideInput @@ -181,6 +177,10 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { Iterable> elements = state.read(); +if (elements == null) { + elements = Collections.emptyList(); +} + return sideInput.getViewFn().apply(elements); } http://git-wip-us.apache.org/repos/asf/beam/blob/040d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java -- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index c624036..16bf5d2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -463,7 +463,32 @@ public class DoFnOperator @Override public void processWatermark2(Watermark mark) throws Exception { -// ignore watermarks from the side-input input +if (mark.getTimestamp() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + // this means we will never see any more side input + pushbackDoFnRunner.startBundle(); + + BagState> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + + Iterable> pushedBackContents = pushedBack.read(); + if (pushedBackContents != null) { +for (WindowedValue elem : pushedBackContents) { + + // we need to set the correct key in case the operator is + // a (keyed) window operator + setKeyContextElement1(new StreamRecord<>(elem)); + + doFnRunner.processElement(elem); +} + } + + setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); + + 
pushbackDoFnRunner.finishBundle(); + + // maybe output a new watermark + processWatermark1(new Watermark(currentInputWatermark)); +} } @Override
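Editor's note: the SideInputHandler part of this commit changes the contract from "throw if the window's side-input state is not ready" to "materialize an empty side input when no state has been written". A minimal, hypothetical sketch of that convention (not the actual Beam classes; the DoFnOperator watermark flush is omitted):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for per-window side-input state; only the "missing state
// means an empty, but valid, side input" convention from the commit is shown here.
class WindowedSideInputContents<T> {
  private final Map<String, List<T>> contentsByWindow = new HashMap<>();

  void write(String window, List<T> contents) {
    contentsByWindow.put(window, contents);
  }

  // Before the fix an unready window raised IllegalStateException; after it,
  // a missing entry is materialized as an empty iterable for the ViewFn.
  List<T> read(String window) {
    List<T> contents = contentsByWindow.get(window);
    return (contents == null) ? Collections.<T>emptyList() : contents;
  }
}
```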
[5/6] beam git commit: [BEAM-1726] Fix RuntimeException throwing in FlinkStateInternals
[BEAM-1726] Fix RuntimeException throwing in FlinkStateInternals Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c44935e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c44935e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c44935e Branch: refs/heads/master Commit: 7c44935e1c47cce2ecfe842e37c2cf89f48d8583 Parents: 040 Author: Aviem Zur Authored: Sat Mar 18 15:21:45 2017 +0200 Committer: Aviem Zur Committed: Thu May 4 20:48:56 2017 +0300 -- .../translation/wrappers/streaming/state/FlinkStateInternals.java | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/7c44935e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java -- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java index c033be6..cea6e0f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -592,6 +592,8 @@ public class FlinkStateInternals implements StateInternals { } current = combineFn.addInput(current, value); state.update(current); + } catch (RuntimeException re) { +throw re; } catch (Exception e) { throw new RuntimeException("Error adding to state." , e); }
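Editor's note: the two added lines implement a common rethrow-before-wrap pattern, so unchecked exceptions propagate with their original type instead of being wrapped in a second RuntimeException. A generic illustration, assuming a hypothetical state-update callable rather than the real FlinkStateInternals combine call:

```java
import java.util.concurrent.Callable;

class StateUpdate {
  // 'update' stands in for the combine/add-input call and may throw anything.
  static void apply(Callable<Void> update) {
    try {
      update.call();
    } catch (RuntimeException re) {
      // Already unchecked: rethrow untouched so the original type and message survive.
      throw re;
    } catch (Exception e) {
      // Checked exceptions get wrapped into a single unchecked type for callers.
      throw new RuntimeException("Error adding to state.", e);
    }
  }
}
```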
[1/6] beam git commit: [BEAM-1763] Replace usage of PipelineRule with TestPipeline in Spark runner tests
Repository: beam Updated Branches: refs/heads/master 48c8ed176 -> b73918b55 [BEAM-1763] Replace usage of PipelineRule with TestPipeline in Spark runner tests Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8d91a97b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8d91a97b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8d91a97b Branch: refs/heads/master Commit: 8d91a97b77fbda74c577d2cdbd507395834e147c Parents: 0e2bb18 Author: Aviem Zur Authored: Wed May 3 21:06:00 2017 +0300 Committer: Aviem Zur Committed: Thu May 4 20:48:56 2017 +0300 -- runners/spark/pom.xml | 47 +++- .../runners/spark/SparkRunnerRegistrar.java | 4 +- .../apache/beam/runners/spark/CacheTest.java| 12 +- .../beam/runners/spark/ForceStreamingTest.java | 18 +-- .../apache/beam/runners/spark/PipelineRule.java | 109 --- .../runners/spark/ProvidedSparkContextTest.java | 10 +- .../runners/spark/SparkRunnerDebuggerTest.java | 15 +-- .../runners/spark/SparkRunnerRegistrarTest.java | 2 +- .../beam/runners/spark/StreamingTest.java | 23 .../metrics/sink/SparkMetricsSinkTest.java | 12 +- .../beam/runners/spark/io/AvroPipelineTest.java | 10 +- .../beam/runners/spark/io/NumShardsTest.java| 6 +- .../spark/translation/StorageLevelTest.java | 31 +- .../translation/streaming/CreateStreamTest.java | 53 - .../ResumeFromCheckpointStreamingTest.java | 50 ++--- .../streaming/StreamingSourceMetricsTest.java | 14 +-- .../main/java/org/apache/beam/sdk/Pipeline.java | 2 +- .../apache/beam/sdk/testing/TestPipeline.java | 21 +++- 18 files changed, 217 insertions(+), 222 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/pom.xml -- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 38d250e..f7200d6 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -333,9 +333,6 @@ org.apache.maven.plugins maven-surefire-plugin - - org.apache.beam.runners.spark.UsesCheckpointRecovery - 1 false @@ -344,6 +341,50 @@ false + + + default-test + +test + + + + org.apache.beam.runners.spark.UsesCheckpointRecovery, + org.apache.beam.runners.spark.StreamingTest + + + +[ +"--runner=TestSparkRunner", +"--streaming=false", +"--enableSparkMetricSinks=true" +] + + + + + + streaming-tests + test + +test + + + + org.apache.beam.runners.spark.StreamingTest + + + +[ +"--runner=TestSparkRunner", +"--forceStreaming=true", +"--enableSparkMetricSinks=true" +] + + + + + org.codehaus.mojo http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java index bedfda4..bf926dc 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java @@ -54,7 +54,9 @@ public final class SparkRunnerRegistrar { public static class Options implements PipelineOptionsRegistrar { @Override public Iterable> getPipelineOptions() { - return ImmutableList.>of(SparkPipelineOptions.class); + return ImmutableList.>of( + SparkPipelineOptions.class, + TestSparkPipelineOptions.class); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java -- diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTes
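Editor's note: the pom changes above split Spark runner tests into batch and streaming surefire executions, selecting tests by JUnit category and passing runner configuration through the `beamTestPipelineOptions` system property that `TestPipeline` reads. A hypothetical sketch of a test after the PipelineRule-to-TestPipeline conversion (simplified, not one of the actual tests in the diff):

```java
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

/** Hypothetical Spark runner test using TestPipeline instead of PipelineRule. */
public class ExampleSparkRunnerTest {

  // TestPipeline picks up its runner and options from the beamTestPipelineOptions
  // system property, which the surefire executions above now set per execution.
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  // A streaming-only test would additionally be annotated with
  // @Category(org.apache.beam.runners.spark.StreamingTest.class)
  public void countsElements() {
    PCollection<Long> count =
        pipeline.apply(Create.of("a", "b", "c")).apply(Count.<String>globally());
    PAssert.that(count).containsInAnyOrder(3L);
    pipeline.run().waitUntilFinish();
  }
}
```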
[GitHub] beam pull request #2824: [BEAM-2139] Disable SplittableDoFn ValidatesRunner ...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2824 [BEAM-2139] Disable SplittableDoFn ValidatesRunner tests for Streaming Flink Runner Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam disable-sdf-test-in-flink-streaming-flink-runner Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2824.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2824 commit 44302d2ccfd8bd1ed54f6da7fc43db5e61798380 Author: Aviem Zur Date: 2017-05-02T15:01:54Z [BEAM-2139] Disable SplittableDoFn ValidatesRunner tests for Streaming Flink Runner --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] beam git commit: [BEAM-2129] Fix flaky KafkaIOTest#testUnboundedSourceMetrics
Repository: beam Updated Branches: refs/heads/master 1197bef19 -> b414f8de9 [BEAM-2129] Fix flaky KafkaIOTest#testUnboundedSourceMetrics Gauge results are flaky on Jenkins, instead of asserting on value assert on the gauge's existence instead. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5dac56f7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5dac56f7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5dac56f7 Branch: refs/heads/master Commit: 5dac56f793c4851bca78dc6f4b4a70d34a016448 Parents: 1197bef Author: Aviem Zur Authored: Mon May 1 07:53:39 2017 +0300 Committer: Aviem Zur Committed: Mon May 1 10:50:23 2017 +0300 -- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 41 +--- 1 file changed, 26 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/5dac56f7/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java -- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 591c099..ccbd3d6 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.kafka; -import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; @@ -99,6 +98,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Utils; import org.hamcrest.collection.IsIterableContainingInAnyOrder; +import org.hamcrest.collection.IsIterableWithSize; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; @@ -627,7 +627,6 @@ public class KafkaIOTest { MetricsFilter.builder().build()); Iterable> counters = metrics.counters(); -Iterable> gauges = metrics.gauges(); assertThat(counters, hasItem( MetricMatchers.attemptedMetricsResult( @@ -657,19 +656,31 @@ public class KafkaIOTest { readStep, 12000L))); -assertThat(gauges, hasItem( -attemptedMetricsResult( -backlogElementsOfSplit.namespace(), -backlogElementsOfSplit.name(), -readStep, -GaugeResult.create(0L, Instant.now(); - -assertThat(gauges, hasItem( -attemptedMetricsResult( -backlogBytesOfSplit.namespace(), -backlogBytesOfSplit.name(), -readStep, -GaugeResult.create(0L, Instant.now(); +MetricQueryResults backlogElementsMetrics = +result.metrics().queryMetrics( +MetricsFilter.builder() +.addNameFilter( +MetricNameFilter.named( +backlogElementsOfSplit.namespace(), +backlogElementsOfSplit.name())) +.build()); + +// since gauge values may be inconsistent in some environments assert only on their existence. +assertThat(backlogElementsMetrics.gauges(), +IsIterableWithSize.>iterableWithSize(1)); + +MetricQueryResults backlogBytesMetrics = +result.metrics().queryMetrics( +MetricsFilter.builder() +.addNameFilter( +MetricNameFilter.named( +backlogBytesOfSplit.namespace(), +backlogBytesOfSplit.name())) +.build()); + +// since gauge values may be inconsistent in some environments assert only on their existence. +assertThat(backlogBytesMetrics.gauges(), +IsIterableWithSize.>iterableWithSize(1)); } @Test
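Editor's note: the flakiness comes from how gauges are fed. A gauge records the latest timestamped value reported by the reader, and which value the runner has committed by the time the test queries metrics is not deterministic on a loaded Jenkins executor, so the test now asserts only that the gauge exists. An illustrative sketch of the reporting side, using the per-split gauge names that appear later in this digest:

```java
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.SourceMetrics;

/** Illustrative only: how a per-split backlog gauge is fed from a reader loop. */
class BacklogReporter {
  private final Gauge backlogBytesOfSplit;

  BacklogReporter(String splitId) {
    this.backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
  }

  void report(long backlogBytes) {
    // Each call overwrites the previous reading with a new timestamped value; which
    // reading a test observes depends on when the runner last committed updates,
    // hence asserting on the gauge's existence rather than on an exact GaugeResult.
    backlogBytesOfSplit.set(backlogBytes);
  }
}
```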
[2/2] beam git commit: This closes #2797
This closes #2797 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b414f8de Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b414f8de Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b414f8de Branch: refs/heads/master Commit: b414f8de90cd89ac76cd9cbe43b2d2d0049faa71 Parents: 1197bef 5dac56f Author: Aviem Zur Authored: Mon May 1 11:25:25 2017 +0300 Committer: Aviem Zur Committed: Mon May 1 11:25:25 2017 +0300 -- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 41 +--- 1 file changed, 26 insertions(+), 15 deletions(-) --
[GitHub] beam pull request #2797: [BEAM-2129] Fix flaky KafkaIOTest#testUnboundedSour...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2797 [BEAM-2129] Fix flaky KafkaIOTest#testUnboundedSourceMetrics Gauge results are flaky on Jenkins, instead of asserting on value assert on the gauge's existence instead. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam fix-flaky-kafkaio-test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2797.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2797 commit f922bd46ef6899c775e36eacfdb11068cc011209 Author: Aviem Zur Date: 2017-05-01T04:53:39Z [BEAM-2129] Fix flaky KafkaIOTest#testUnboundedSourceMetrics Gauge results are flaky on Jenkins, instead of asserting on value assert on the gauge's existence instead. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] beam git commit: This closes #2344
This closes #2344 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/47821ad6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/47821ad6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/47821ad6 Branch: refs/heads/master Commit: 47821ad695f67977c775f62b6f8791ca109a7d0b Parents: 81474ae 930c27f Author: Aviem Zur Authored: Sat Apr 29 18:16:17 2017 +0300 Committer: Aviem Zur Committed: Sat Apr 29 18:16:17 2017 +0300 -- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 65 +- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 130 +++ 2 files changed, 194 insertions(+), 1 deletion(-) --
[1/2] beam git commit: [BEAM-1398] KafkaIO metrics.
Repository: beam Updated Branches: refs/heads/master 81474aeaf -> 47821ad69 [BEAM-1398] KafkaIO metrics. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/930c27f5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/930c27f5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/930c27f5 Branch: refs/heads/master Commit: 930c27f55fc980702089fe58fdb0edded96a2ac6 Parents: 81474ae Author: Aviem Zur Authored: Tue Mar 28 07:29:53 2017 +0300 Committer: Aviem Zur Committed: Sat Apr 29 18:08:19 2017 +0300 -- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 65 +- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 130 +++ 2 files changed, 194 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/930c27f5/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java -- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 47d8281..211f1a4 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -69,6 +69,10 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark; import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaDeserializer; import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaSerializer; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Gauge; +import org.apache.beam.sdk.metrics.SinkMetrics; +import org.apache.beam.sdk.metrics.SourceMetrics; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; @@ -950,6 +954,13 @@ public class KafkaIO { private Deserializer keyDeserializerInstance = null; private Deserializer valueDeserializerInstance = null; +private final Counter elementsRead = SourceMetrics.elementsRead(); +private final Counter bytesRead = SourceMetrics.bytesRead(); +private final Counter elementsReadBySplit; +private final Counter bytesReadBySplit; +private final Gauge backlogBytesOfSplit; +private final Gauge backlogElementsOfSplit; + private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000); private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10); @@ -1023,10 +1034,18 @@ public class KafkaIO { synchronized long approxBacklogInBytes() { // Note that is an an estimate of uncompressed backlog. 
+long backlogMessageCount = backlogMessageCount(); +if (backlogMessageCount == UnboundedReader.BACKLOG_UNKNOWN) { + return UnboundedReader.BACKLOG_UNKNOWN; +} +return (long) (backlogMessageCount * avgRecordSize); + } + + synchronized long backlogMessageCount() { if (latestOffset < 0 || nextOffset < 0) { return UnboundedReader.BACKLOG_UNKNOWN; } -return Math.max(0, (long) ((latestOffset - nextOffset) * avgRecordSize)); +return Math.max(0, (latestOffset - nextOffset)); } } @@ -1065,6 +1084,13 @@ public class KafkaIO { partitionStates.get(i).nextOffset = ckptMark.getNextOffset(); } } + + String splitId = String.valueOf(source.id); + + elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId); + bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId); + backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId); + backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId); } private void consumerPollLoop() { @@ -1194,6 +1220,9 @@ public class KafkaIO { if (curBatch.hasNext()) { PartitionState pState = curBatch.next(); + elementsRead.inc(); + elementsReadBySplit.inc(); + if (!pState.recordIter.hasNext()) { // -- (c) pState.recordIter = Collections.emptyIterator(); // drop ref curBatch.remove(); @@ -1241,6 +1270,8 @@ public class KafkaIO { int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) + (rawRecord.value() == null ? 0 : rawRecord.value().length); pState.recordConsumed(offset, recordSize); + bytesRead.inc(recordSize); + bytesReadBySplit.inc(recordSize); return true; } else { // -- (b) @@ -1278,6 +1309,19 @@ public class KafkaIO { LOG.debug("{}: backlog {}", this, getSplitBacklogBytes()); } +private void reportBacklog() { +
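Editor's note: splitting the estimate into a message count and a byte estimate is what lets the reader feed both the element-backlog and byte-backlog gauges. A standalone sketch of the arithmetic under stated assumptions (`avgRecordSize` maintained from observed records; the sentinel value stands in for `UnboundedReader.BACKLOG_UNKNOWN`):

```java
/** Illustrative backlog arithmetic; mirrors the per-partition estimate in the diff above. */
class BacklogEstimate {
  static final long BACKLOG_UNKNOWN = -1L; // stand-in for UnboundedReader.BACKLOG_UNKNOWN

  static long backlogMessageCount(long latestOffset, long nextOffset) {
    if (latestOffset < 0 || nextOffset < 0) {
      return BACKLOG_UNKNOWN; // offsets not known yet
    }
    return Math.max(0, latestOffset - nextOffset);
  }

  static long approxBacklogBytes(long latestOffset, long nextOffset, double avgRecordSize) {
    long messages = backlogMessageCount(latestOffset, nextOffset);
    if (messages == BACKLOG_UNKNOWN) {
      return BACKLOG_UNKNOWN;
    }
    // Uncompressed estimate: remaining messages times the running average record size.
    return (long) (messages * avgRecordSize);
  }

  public static void main(String[] args) {
    // e.g. 12,000 messages behind at ~250 bytes/record -> ~3 MB of estimated backlog.
    System.out.println(approxBacklogBytes(20_000, 8_000, 250.0));
  }
}
```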
[2/2] beam git commit: This closes #2730
This closes #2730 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/81474aea Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/81474aea Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/81474aea Branch: refs/heads/master Commit: 81474aeafeb4dbc16a48b62114dc6f348eb5f426 Parents: bac0633 5911773 Author: Aviem Zur Authored: Sat Apr 29 17:58:10 2017 +0300 Committer: Aviem Zur Committed: Sat Apr 29 17:58:10 2017 +0300 -- .../metrics/sink/SparkMetricsSinkTest.java | 86 1 file changed, 86 insertions(+) --
[1/2] beam git commit: [BEAM-2057] Add a test for metrics reporting in Spark runner.
Repository: beam Updated Branches: refs/heads/master bac06331e -> 81474aeaf [BEAM-2057] Add a test for metrics reporting in Spark runner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/59117737 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/59117737 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/59117737 Branch: refs/heads/master Commit: 59117737619ba90345761ae0aefcf361eabf3772 Parents: bac0633 Author: Holden Karau Authored: Wed Apr 26 22:22:49 2017 -0700 Committer: Aviem Zur Committed: Sat Apr 29 17:57:42 2017 +0300 -- .../metrics/sink/SparkMetricsSinkTest.java | 86 1 file changed, 86 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/59117737/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java -- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java new file mode 100644 index 000..b0ad972 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.aggregators.metrics.sink; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableSet; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import org.apache.beam.runners.spark.PipelineRule; +import org.apache.beam.runners.spark.examples.WordCount; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExternalResource; + + +/** + * A test that verifies Beam metrics are reported to Spark's metrics sink. 
+ */ +public class SparkMetricsSinkTest { + + @Rule + public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule(); + + @Rule + public final PipelineRule pipelineRule = PipelineRule.batch(); + + private Pipeline createSparkPipeline() { +pipelineRule.getOptions().setEnableSparkMetricSinks(true); +return pipelineRule.createPipeline(); + } + + private void runPipeline() { +final List words = +Arrays.asList("hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"); + +final Set expectedCounts = +ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2"); + +final Pipeline pipeline = createSparkPipeline(); + +final PCollection output = +pipeline +.apply(Create.of(words).withCoder(StringUtf8Coder.of())) +.apply(new WordCount.CountWords()) +.apply(MapElements.via(new WordCount.FormatAsTextFn())); + +PAssert.that(output).containsInAnyOrder(expectedCounts); + +pipeline.run(); + } + + @Test + public void testNamedMetric() throws Exception { +assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue())); + +runPipeline(); + +assertThat(InMemoryMetrics.valueOf("emptyLines"), is(1d)); + } +}
[GitHub] beam pull request #2781: Fix broken build
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2781 Fix broken build Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam fix-broken-build Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2781.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2781 commit 2384e46be0f49ffa9e5c94ec929b344a46a72af6 Author: Aviem Zur Date: 2017-04-29T11:08:06Z Fix broken build --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2746: [BEAM-2029] NullPointerException when using multi o...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2746 [BEAM-2029] NullPointerException when using multi output ParDo in Spark runner in streaming mode. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam npe-in-multiple-output-pardo-spark-streaming Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2746.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2746 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2729: [BEAM-1763] Verify PAssert execution in runners whi...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2729 [BEAM-1763] Verify PAssert execution in runners which support metrics. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam testpipeline-passert-verification Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2729.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2729 commit 6ca854c9cbe3e8239945dcb1b277d89fadf5dfdd Author: Aviem Zur Date: 2017-04-27T05:19:06Z [BEAM-1763] Verify PAssert execution in runners which support metrics. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] beam git commit: [BEAM-1958] Standard IO Metrics in Java SDK
Repository: beam Updated Branches: refs/heads/master f3cff3695 -> 30dbaf891 [BEAM-1958] Standard IO Metrics in Java SDK Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/41d52be0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/41d52be0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/41d52be0 Branch: refs/heads/master Commit: 41d52be0ec64c83a79d97bfd3c27eb104b546991 Parents: f3cff36 Author: Aviem Zur Authored: Thu Apr 13 20:27:33 2017 +0300 Committer: Aviem Zur Committed: Wed Apr 26 19:53:29 2017 +0300 -- .../streaming/StreamingSourceMetricsTest.java | 12 +- .../org/apache/beam/sdk/io/CountingSource.java | 6 +- .../apache/beam/sdk/metrics/CounterCell.java| 19 +-- .../beam/sdk/metrics/DistributionCell.java | 10 +- .../org/apache/beam/sdk/metrics/GaugeCell.java | 8 +- .../org/apache/beam/sdk/metrics/Metric.java | 8 +- .../org/apache/beam/sdk/metrics/MetricCell.java | 8 +- .../org/apache/beam/sdk/metrics/Metrics.java| 12 ++ .../beam/sdk/metrics/MetricsContainer.java | 8 +- .../apache/beam/sdk/metrics/SinkMetrics.java| 49 .../apache/beam/sdk/metrics/SourceMetrics.java | 116 +++ .../apache/beam/sdk/metrics/MetricsTest.java| 19 ++- .../sdk/io/gcp/bigquery/StreamingWriteFn.java | 4 +- .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 5 +- .../io/gcp/pubsub/PubsubUnboundedSource.java| 4 +- 15 files changed, 228 insertions(+), 60 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java -- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java index 80f7f53..5a4b1b5 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java @@ -29,9 +29,11 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.metrics.SourceMetrics; import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; @@ -41,6 +43,7 @@ import org.junit.Test; * Verify metrics support for {@link Source Sources} in streaming pipelines. */ public class StreamingSourceMetricsTest implements Serializable { + private static final MetricName ELEMENTS_READ = SourceMetrics.elementsRead().getName(); // Force streaming pipeline using pipeline rule. 
@Rule @@ -65,10 +68,15 @@ public class StreamingSourceMetricsTest implements Serializable { .metrics() .queryMetrics( MetricsFilter.builder() -.addNameFilter(MetricNameFilter.named("io", "elementsRead")) +.addNameFilter( +MetricNameFilter.named(ELEMENTS_READ.namespace(), ELEMENTS_READ.name())) .build()); assertThat(metrics.counters(), hasItem( -attemptedMetricsResult("io", "elementsRead", "Read(UnboundedCountingSource)", 1000L))); +attemptedMetricsResult( +ELEMENTS_READ.namespace(), +ELEMENTS_READ.name(), +"Read(UnboundedCountingSource)", +1000L))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java index b66a8b2..81082e5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.SourceMetrics; import org.apache.beam.sdk.options.PipelineOptions;
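Editor's note: with BEAM-1958 the per-source counters live behind `SourceMetrics` instead of ad-hoc `Metrics.counter("io", ...)` names, which is what the StreamingSourceMetricsTest change above switches to. A hedged sketch of a reader advance loop reporting the standard counters (illustrative; not the actual CountingSource code):

```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.SourceMetrics;

/** Illustrative only: reporting the standard source counters from a reader. */
class CountingReader {
  private final Counter elementsRead = SourceMetrics.elementsRead();
  private final Counter bytesRead = SourceMetrics.bytesRead();
  private long next = 0;

  boolean advance() {
    next++;
    elementsRead.inc();          // one more element emitted by this source
    bytesRead.inc(Long.BYTES);   // 8 bytes for a long payload in this toy example
    return true;
  }
}
```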
[2/2] beam git commit: This closes #2538
This closes #2538 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30dbaf89 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30dbaf89 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30dbaf89 Branch: refs/heads/master Commit: 30dbaf8913eda2b91c8fc43823f7e5feee17528a Parents: f3cff36 41d52be Author: Aviem Zur Authored: Wed Apr 26 20:26:24 2017 +0300 Committer: Aviem Zur Committed: Wed Apr 26 20:26:24 2017 +0300 -- .../streaming/StreamingSourceMetricsTest.java | 12 +- .../org/apache/beam/sdk/io/CountingSource.java | 6 +- .../apache/beam/sdk/metrics/CounterCell.java| 19 +-- .../beam/sdk/metrics/DistributionCell.java | 10 +- .../org/apache/beam/sdk/metrics/GaugeCell.java | 8 +- .../org/apache/beam/sdk/metrics/Metric.java | 8 +- .../org/apache/beam/sdk/metrics/MetricCell.java | 8 +- .../org/apache/beam/sdk/metrics/Metrics.java| 12 ++ .../beam/sdk/metrics/MetricsContainer.java | 8 +- .../apache/beam/sdk/metrics/SinkMetrics.java| 49 .../apache/beam/sdk/metrics/SourceMetrics.java | 116 +++ .../apache/beam/sdk/metrics/MetricsTest.java| 19 ++- .../sdk/io/gcp/bigquery/StreamingWriteFn.java | 4 +- .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 5 +- .../io/gcp/pubsub/PubsubUnboundedSource.java| 4 +- 15 files changed, 228 insertions(+), 60 deletions(-) --
[GitHub] beam pull request #2547: [BEAM-1758] Option to disable metrics reporting.
Github user aviemzur closed the pull request at: https://github.com/apache/beam/pull/2547 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2522: [BEAM-1672] Extract interface MetricData
Github user aviemzur closed the pull request at: https://github.com/apache/beam/pull/2522 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2649: [BEAM-1672] Accumulable MetricsContainers.
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2649 [BEAM-1672] Accumulable MetricsContainers. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam accumulable-metricscontainer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2649.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2649 commit 9f81af4b1a71af97672956e75298cff115c63fad Author: Aviem Zur Date: 2017-04-22T14:45:35Z [BEAM-1672] Make MetricsContainers accumulable. commit 1a208e2dd7128a25cf62ee75fae4078b2af665f9 Author: Aviem Zur Date: 2017-04-22T16:00:11Z [BEAM-1672] AccumulatedMetricsResults commit f3f9b588eb473705506dc4bbc2020e3628e2e843 Author: Aviem Zur Date: 2017-04-22T16:00:51Z [BEAM-1672] Use Accumulable MetricsContainers in Spark runner. commit f41466b23ba475e141fde6d3db8c5c2fc14470c9 Author: Aviem Zur Date: 2017-04-22T16:01:38Z [BEAM-1672] Use Accumulable MetricsContainers in Flink runner. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2547: [BEAM-1758] Option to disable metrics reporting.
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2547 [BEAM-1758] Option to disable metrics reporting. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam metrics-disable-option Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2547.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2547 commit 626c4733a7046e9aaa2e8e4985edf8087993b71a Author: Aviem Zur Date: 2017-04-15T06:59:20Z [BEAM-1758] Option to disable metrics reporting. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2538: [BEAM-1958] Standard IO Metrics in Java SDK
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2538 [BEAM-1958] Standard IO Metrics in Java SDK Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam standard-io-metrics-java Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2538.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2538 commit e1b4db663931c363551b262f21c3f20ddb9a211a Author: Aviem Zur Date: 2017-04-13T17:27:33Z [BEAM-1958] Standard IO Metrics in Java SDK --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] beam git commit: This closes #2514
This closes #2514 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21a2b96a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21a2b96a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21a2b96a Branch: refs/heads/master Commit: 21a2b96a1f617870aa5926d69878a72533ba259e Parents: dc672f4 fdabd41 Author: Aviem Zur Authored: Thu Apr 13 07:36:34 2017 +0300 Committer: Aviem Zur Committed: Thu Apr 13 07:36:34 2017 +0300 -- .../java/org/apache/beam/runners/spark/io/MicrobatchSource.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] beam git commit: [BEAM-1950] Add missing 'static' keyword to MicrobatchSource#initReaderCache
Repository: beam Updated Branches: refs/heads/master dc672f420 -> 21a2b96a1 [BEAM-1950] Add missing 'static' keyword to MicrobatchSource#initReaderCache Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdabd41a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdabd41a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdabd41a Branch: refs/heads/master Commit: fdabd41ae7bf0faaae979a12bb50720ae2ed24ce Parents: dc672f4 Author: Aviem Zur Authored: Wed Apr 12 20:41:11 2017 +0300 Committer: Aviem Zur Committed: Thu Apr 13 07:36:20 2017 +0300 -- .../java/org/apache/beam/runners/spark/io/MicrobatchSource.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/fdabd41a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java index 002eb34..847de19 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java @@ -141,7 +141,7 @@ public class MicrobatchSource
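Editor's note: the one-word fix matters because the reader cache is a static field. An instance-level `synchronized` method locks on `this`, so two MicrobatchSource instances could both see `null` and build separate caches; `static synchronized` locks on the class object, which is the only lock that can guard a static field across instances. A generic illustration with a hypothetical cache holder:

```java
class LazyCacheHolder {
  private static volatile Object cache;

  // BROKEN variant: locks on the instance, so two instances can race on the static field.
  // synchronized void initCacheBroken() { if (cache == null) { cache = new Object(); } }

  // FIXED variant: 'static synchronized' locks on LazyCacheHolder.class,
  // making the null-check-then-assign on the static field atomic across all instances.
  static synchronized void initCache() {
    if (cache == null) {
      cache = new Object();
    }
  }
}
```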
[GitHub] beam pull request #2522: [BEAM-1672] Extract interface MetricData
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2522 [BEAM-1672] Extract interface MetricData Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam metricdata-interface Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2522.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2522 commit aa41c2cbe28797686a34eb47c26502953c86b462 Author: Aviem Zur Date: 2017-04-09T19:41:12Z [BEAM-1672] Extract interface MetricData --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2514: [BEAM-1950] Add missing 'static' keyword to Microba...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2514 [BEAM-1950] Add missing 'static' keyword to MicrobatchSource#initReaderCache Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam fix-microbatchsource-readercache-lock Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2514.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2514 commit 5751c0df58a747acc9c6138dfeef45fcb5dc9ec2 Author: Aviem Zur Date: 2017-04-12T17:41:11Z [BEAM-1950] Add missing 'static' keyword to MicrobatchSource#initReaderCache --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2263: [BEAM-1726] TestFlinkRunner should assert PAssert s...
Github user aviemzur closed the pull request at: https://github.com/apache/beam/pull/2263 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam-site pull request #208: Add JSON as a planned IO to built-in IOs page.
GitHub user aviemzur opened a pull request: https://github.com/apache/beam-site/pull/208 Add JSON as a planned IO to built-in IOs page. R: @iemejia You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam-site add-json-to-io-page Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam-site/pull/208.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #208 commit d4e83a258f5d1ae85e2b1f8c6c98c1ddca70 Author: Zur, Aviem Date: 2017-04-12T05:22:09Z Add JSON as a planned IO to built-in IOs page. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] beam git commit: This closes #2033
This closes #2033 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/efd785f8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/efd785f8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/efd785f8 Branch: refs/heads/master Commit: efd785f881d2c231f202c5031d6aeeb042177850 Parents: a0cfccd d958796 Author: Aviem Zur Authored: Sun Apr 9 22:47:03 2017 +0300 Committer: Aviem Zur Committed: Sun Apr 9 22:47:03 2017 +0300 -- .../beam/runners/spark/io/MicrobatchSource.java | 113 --- .../beam/runners/spark/io/SourceDStream.java| 11 +- .../spark/stateful/StateSpecFunctions.java | 6 +- .../ResumeFromCheckpointStreamingTest.java | 14 ++- 4 files changed, 118 insertions(+), 26 deletions(-) --
[1/2] beam git commit: [BEAM-1294] Long running UnboundedSource Readers
Repository: beam Updated Branches: refs/heads/master a0cfccda4 -> efd785f88 [BEAM-1294] Long running UnboundedSource Readers Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d958796b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d958796b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d958796b Branch: refs/heads/master Commit: d958796b525861764318f0c022e4987aa64ac300 Parents: a0cfccd Author: Aviem Zur Authored: Fri Feb 17 12:35:49 2017 +0200 Committer: Aviem Zur Committed: Sun Apr 9 22:42:57 2017 +0300 -- .../beam/runners/spark/io/MicrobatchSource.java | 113 --- .../beam/runners/spark/io/SourceDStream.java| 11 +- .../spark/stateful/StateSpecFunctions.java | 6 +- .../ResumeFromCheckpointStreamingTest.java | 14 ++- 4 files changed, 118 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/d958796b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java index ff818a1..002eb34 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java @@ -19,11 +19,18 @@ package org.apache.beam.runners.spark.io; import com.google.api.client.util.BackOff; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; @@ -49,29 +56,34 @@ import org.slf4j.LoggerFactory; public class MicrobatchSource extends BoundedSource { private static final Logger LOG = LoggerFactory.getLogger(MicrobatchSource.class); + private static volatile Cache, BoundedReader> readerCache; private final UnboundedSource source; private final Duration maxReadTime; private final int numInitialSplits; private final long maxNumRecords; private final int sourceId; + private final double readerCacheInterval; // each split of the underlying UnboundedSource is associated with a (consistent) id // to match it's corresponding CheckpointMark state. 
private final int splitId; - MicrobatchSource(UnboundedSource source, - Duration maxReadTime, - int numInitialSplits, - long maxNumRecords, - int splitId, - int sourceId) { + MicrobatchSource( + UnboundedSource source, + Duration maxReadTime, + int numInitialSplits, + long maxNumRecords, + int splitId, + int sourceId, + double readerCacheInterval) { this.source = source; this.maxReadTime = maxReadTime; this.numInitialSplits = numInitialSplits; this.maxNumRecords = maxNumRecords; this.splitId = splitId; this.sourceId = sourceId; +this.readerCacheInterval = readerCacheInterval; } /** @@ -101,7 +113,8 @@ public class MicrobatchSource(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId)); + result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId, + readerCacheInterval)); } return result; } @@ -113,12 +126,30 @@ public class MicrobatchSource createReader(PipelineOptions options) throws IOException { -return createReader(options, null); +return getOrCreateReader(options, null); } - public BoundedReader createReader(PipelineOptions options, CheckpointMarkT checkpointMark) - throws IOException { -return new Reader(source.createReader(options, checkpointMark)); + @SuppressWarnings("unchecked") + public BoundedReader getOrCreateReader( + PipelineOptions options, + CheckpointMarkT checkpointMark) throws IOException { +try { + initReaderCache((long) readerCacheInterval); + return (BoundedReader) readerCache.get(this, new ReaderLoader(options, checkpointMark)); +} catch (ExecutionException e) { + throw new RuntimeException("Failed to get or create reader", e); +} + } + + private synchronized void initReaderCache(long reade
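Editor's note: the point of the change is to keep an UnboundedSource reader alive across micro-batches and close it only when its split goes idle, using a Guava cache keyed by the MicrobatchSource and loaded via a Callable. A reduced sketch of that pattern with hypothetical key and reader types (assumptions: any Closeable reader, idle expiry in milliseconds):

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/** Illustrative reader cache: a reader is closed once its split has gone idle. */
class IdleExpiringReaderCache<K, R extends Closeable> {
  private final Cache<K, R> cache;

  IdleExpiringReaderCache(long expiryMillis) {
    RemovalListener<K, R> closeOnRemoval =
        notification -> {
          try {
            if (notification.getValue() != null) {
              notification.getValue().close(); // release the reader on eviction
            }
          } catch (IOException e) {
            throw new RuntimeException("Failed to close reader", e);
          }
        };
    this.cache =
        CacheBuilder.newBuilder()
            .expireAfterAccess(expiryMillis, TimeUnit.MILLISECONDS)
            .removalListener(closeOnRemoval)
            .build();
  }

  R getOrCreate(K key, Callable<R> loader) {
    try {
      // Returns the cached reader for this split, or runs 'loader' to create one.
      return cache.get(key, loader);
    } catch (ExecutionException e) {
      throw new RuntimeException("Failed to get or create reader", e);
    }
  }
}
```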
[1/2] beam git commit: [BEAM-1337] Infer state coders
Repository: beam Updated Branches: refs/heads/master 03dce6dcc -> e31ca8b0d [BEAM-1337] Infer state coders Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/42e690e8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/42e690e8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/42e690e8 Branch: refs/heads/master Commit: 42e690e84a9f05d508f2528b1444b26ce031e080 Parents: 03dce6d Author: Aviem Zur Authored: Wed Mar 1 07:27:57 2017 +0200 Committer: Aviem Zur Committed: Sat Apr 1 10:27:14 2017 +0300 -- .../org/apache/beam/sdk/transforms/ParDo.java | 62 ++ .../apache/beam/sdk/util/state/StateSpec.java | 15 + .../apache/beam/sdk/util/state/StateSpecs.java | 264 - .../apache/beam/sdk/transforms/ParDoTest.java | 578 +++ 4 files changed, 902 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/42e690e8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 664fbc3..3de845b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.Serializable; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -29,6 +31,7 @@ import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; @@ -41,6 +44,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.NameUtils; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; @@ -434,6 +438,59 @@ public class ParDo { return DisplayData.item("fn", fn.getClass()).withLabel("Transform Function"); } + private static void finishSpecifyingStateSpecs( + DoFn fn, + CoderRegistry coderRegistry, + Coder inputCoder) { +DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); +Map stateDeclarations = signature.stateDeclarations(); +for (DoFnSignature.StateDeclaration stateDeclaration : stateDeclarations.values()) { + try { +StateSpec stateSpec = (StateSpec) stateDeclaration.field().get(fn); +stateSpec.offerCoders(codersForStateSpecTypes(stateDeclaration, coderRegistry, inputCoder)); +stateSpec.finishSpecifying(); + } catch (IllegalAccessException e) { +throw new RuntimeException(e); + } +} + } + + /** + * Try to provide coders for as many of the type arguments of given + * {@link DoFnSignature.StateDeclaration} as possible. 
+ */ + private static Coder[] codersForStateSpecTypes( + DoFnSignature.StateDeclaration stateDeclaration, + CoderRegistry coderRegistry, + Coder inputCoder) { +Type stateType = stateDeclaration.stateType().getType(); + +if (!(stateType instanceof ParameterizedType)) { + // No type arguments means no coders to infer. + return new Coder[0]; +} + +Type[] typeArguments = ((ParameterizedType) stateType).getActualTypeArguments(); +Coder[] coders = new Coder[typeArguments.length]; + +for (int i = 0; i < typeArguments.length; i++) { + Type typeArgument = typeArguments[i]; + TypeDescriptor typeDescriptor = TypeDescriptor.of(typeArgument); + try { +coders[i] = coderRegistry.getDefaultCoder(typeDescriptor); + } catch (CannotProvideCoderException e) { +try { + coders[i] = coderRegistry.getDefaultCoder( + typeDescriptor, inputCoder.getEncodedTypeDescriptor(), inputCoder); +} catch (CannotProvideCoderException ignored) { + // Since not all type arguments will have a registered coder we ignore this exception. +} + } +} + +return coders; + } + /** * Perform common validat
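Editor's note: the core of the coder inference is a reflection step: the declared StateSpec field's generic type is inspected, and a coder lookup is attempted for each type argument, silently skipping any that cannot be resolved. A stripped-down sketch of just that reflection step, with a hypothetical example field standing in for a StateSpec declaration and the CoderRegistry lookup left as a comment:

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

/** Illustrative only: pull the type arguments out of a generic field declaration. */
class TypeArgumentInspector {

  // Example target: a field declared as List<String> yields one type argument, String.
  @SuppressWarnings("unused")
  private List<String> exampleStateField;

  static Type[] typeArgumentsOf(Field field) {
    Type type = field.getGenericType();
    if (!(type instanceof ParameterizedType)) {
      // A raw or non-generic declaration carries no type arguments to infer coders for.
      return new Type[0];
    }
    return ((ParameterizedType) type).getActualTypeArguments();
  }

  public static void main(String[] args) throws Exception {
    Field f = TypeArgumentInspector.class.getDeclaredField("exampleStateField");
    for (Type argument : typeArgumentsOf(f)) {
      // In ParDo this is where the CoderRegistry lookup runs, ignoring
      // CannotProvideCoderException the same way the commit above does.
      System.out.println("would look up a coder for: " + argument);
    }
  }
}
```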
[2/2] beam git commit: This closes #2133
This closes #2133 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e31ca8b0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e31ca8b0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e31ca8b0 Branch: refs/heads/master Commit: e31ca8b0d05e47c2588d5db29c92bac49aa410da Parents: 03dce6d 42e690e Author: Aviem Zur Authored: Sat Apr 1 10:28:25 2017 +0300 Committer: Aviem Zur Committed: Sat Apr 1 10:28:25 2017 +0300 -- .../org/apache/beam/sdk/transforms/ParDo.java | 62 ++ .../apache/beam/sdk/util/state/StateSpec.java | 15 + .../apache/beam/sdk/util/state/StateSpecs.java | 264 - .../apache/beam/sdk/transforms/ParDoTest.java | 578 +++ 4 files changed, 902 insertions(+), 17 deletions(-) --
[GitHub] beam pull request #2108: [BEAM-463] [BEAM-464] [BEAM-466] BoundedHeapCoder, ...
Github user aviemzur closed the pull request at: https://github.com/apache/beam/pull/2108 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] beam git commit: This closes #2345
This closes #2345 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d35e1b0d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d35e1b0d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d35e1b0d Branch: refs/heads/master Commit: d35e1b0d9e47e4eec5c48b5e87ccc9f55955cdf4 Parents: 48fee91 cb05a20 Author: Aviem Zur Authored: Tue Mar 28 13:05:22 2017 +0300 Committer: Aviem Zur Committed: Tue Mar 28 13:05:22 2017 +0300 -- .../src/main/java/org/apache/beam/sdk/testing/TestPipeline.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] beam git commit: Fix typo in test pipeline enforcement message
Repository: beam Updated Branches: refs/heads/master 48fee91f7 -> d35e1b0d9 Fix typo in test pipeline enforcement message Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cb05a20e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cb05a20e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cb05a20e Branch: refs/heads/master Commit: cb05a20e55677b01d1abe0dfda696d1a4f4d1a0c Parents: 48fee91 Author: Kobi Salant Authored: Tue Mar 28 10:25:18 2017 +0300 Committer: ksalant Committed: Tue Mar 28 11:40:03 2017 +0300 -- .../src/main/java/org/apache/beam/sdk/testing/TestPipeline.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/cb05a20e/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index 485dd39..6a8335e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -299,7 +299,7 @@ public class TestPipeline extends Pipeline implements TestRule { checkState( enforcement.isPresent(), "Is your TestPipeline declaration missing a @Rule annotation? Usage: " -+ "@Rule public final transient TestPipeline pipeline = TestPipeline.Create();"); ++ "@Rule public final transient TestPipeline pipeline = TestPipeline.create();"); try { return super.run();
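For reference, the usage the corrected message points at is the standard JUnit rule pattern; a minimal sketch (test and transform names are placeholders):

```java
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.junit.Rule;
import org.junit.Test;

public class MyTransformTest {

  // Must be public, final and transient, and built with the lowercase
  // factory method create() -- the spelling the message now shows.
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void runsThePipeline() {
    pipeline.apply(Create.of(1, 2, 3));
    pipeline.run();
  }
}
```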
[GitHub] beam pull request #2344: [BEAM-1398] KafkaIO metrics.
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2344 [BEAM-1398] KafkaIO metrics. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam kafka-io-metrics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2344.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2344 commit 200ef65c5751327aa058421d035f5f68e3db0ec1 Author: Aviem Zur Date: 2017-03-28T04:29:53Z [BEAM-1398] KafkaIO metrics. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] beam git commit: This closes #2162
This closes #2162 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/48fee91f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/48fee91f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/48fee91f Branch: refs/heads/master Commit: 48fee91f7d720d03da53476e9a237eabcbfc0460 Parents: 85b820c 65b5f00 Author: Aviem Zur Authored: Tue Mar 28 06:51:39 2017 +0300 Committer: Aviem Zur Committed: Tue Mar 28 06:51:39 2017 +0300 -- .../beam/runners/spark/TestSparkRunner.java | 14 +++- .../apache/beam/runners/spark/io/SourceRDD.java | 51 +- .../runners/spark/io/SparkUnboundedSource.java | 48 + .../spark/metrics/SparkMetricsContainer.java| 11 ++- .../spark/stateful/StateSpecFunctions.java | 35 +++--- .../spark/translation/TransformTranslator.java | 3 +- .../streaming/StreamingTransformTranslator.java | 4 +- .../streaming/StreamingSourceMetricsTest.java | 71 .../org/apache/beam/sdk/io/CountingSource.java | 8 +++ .../apache/beam/sdk/metrics/MetricsTest.java| 45 + 10 files changed, 244 insertions(+), 46 deletions(-) --
[1/2] beam git commit: [BEAM-1397] Introduce IO metrics
Repository: beam Updated Branches: refs/heads/master 85b820c37 -> 48fee91f7 [BEAM-1397] Introduce IO metrics Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/65b5f001 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/65b5f001 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/65b5f001 Branch: refs/heads/master Commit: 65b5f001a4e1790206efe3ff2d418018680ea621 Parents: 85b820c Author: Aviem Zur Authored: Tue Mar 28 06:49:59 2017 +0300 Committer: Aviem Zur Committed: Tue Mar 28 06:49:59 2017 +0300 -- .../beam/runners/spark/TestSparkRunner.java | 14 +++- .../apache/beam/runners/spark/io/SourceRDD.java | 51 +- .../runners/spark/io/SparkUnboundedSource.java | 48 + .../spark/metrics/SparkMetricsContainer.java| 11 ++- .../spark/stateful/StateSpecFunctions.java | 35 +++--- .../spark/translation/TransformTranslator.java | 3 +- .../streaming/StreamingTransformTranslator.java | 4 +- .../streaming/StreamingSourceMetricsTest.java | 71 .../org/apache/beam/sdk/io/CountingSource.java | 8 +++ .../apache/beam/sdk/metrics/MetricsTest.java| 45 + 10 files changed, 244 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/65b5f001/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index e40534f..be9ff2e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -135,7 +135,12 @@ public final class TestSparkRunner extends PipelineRunner { isOneOf(PipelineResult.State.STOPPED, PipelineResult.State.DONE)); // validate assertion succeeded (at least once). -int successAssertions = result.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class); +int successAssertions = 0; +try { + successAssertions = result.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class); +} catch (NullPointerException e) { + // No assertions registered will cause an NPE here. +} Integer expectedAssertions = testSparkPipelineOptions.getExpectedAssertions() != null ? testSparkPipelineOptions.getExpectedAssertions() : expectedNumberOfAssertions; assertThat( @@ -145,7 +150,12 @@ public final class TestSparkRunner extends PipelineRunner { successAssertions, is(expectedAssertions)); // validate assertion didn't fail. -int failedAssertions = result.getAggregatorValue(PAssert.FAILURE_COUNTER, Integer.class); +int failedAssertions = 0; +try { + failedAssertions = result.getAggregatorValue(PAssert.FAILURE_COUNTER, Integer.class); +} catch (NullPointerException e) { + // No assertions registered will cause an NPE here. 
+} assertThat( String.format("Found %d failed assertions.", failedAssertions), failedAssertions, http://git-wip-us.apache.org/repos/asf/beam/blob/65b5f001/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java index 1a3537f..2f9a827 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java @@ -20,15 +20,21 @@ package org.apache.beam.runners.spark.io; import static com.google.common.base.Preconditions.checkArgument; +import java.io.Closeable; import java.io.IOException; import java.util.Collections; import java.util.Iterator; import java.util.List; +import org.apache.beam.runners.spark.metrics.MetricsAccumulator; +import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.spark.Accumulator; import org.apache.spark.Dependency; import org.apache.spark.HashPartitioner; import org.apache.spark.InterruptibleItera
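Metrics recorded by sources this way surface through the regular metrics API on the pipeline result. A hedged sketch of such a query; the namespace and counter name used here are placeholders, not names defined by this change:

```java
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

public class SourceMetricsReport {

  // Print the attempted value of a hypothetical "elementsRead" counter
  // published under a hypothetical "my.namespace" namespace.
  public static void printReadCounters(PipelineResult result) {
    MetricQueryResults query =
        result.metrics().queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.named("my.namespace", "elementsRead"))
                .build());
    for (MetricResult<Long> counter : query.counters()) {
      System.out.println(counter.name() + " = " + counter.attempted());
    }
  }
}
```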
[1/2] beam git commit: [BEAM-1792] Use MetricFiltering in Spark runner.
Repository: beam Updated Branches: refs/heads/master fe441e34b -> 85b820c37 [BEAM-1792] Use MetricFiltering in Spark runner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/241ded90 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/241ded90 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/241ded90 Branch: refs/heads/master Commit: 241ded9022a9214c1d0768b1cb3c7a740a409873 Parents: fe441e3 Author: Pablo Authored: Fri Mar 24 10:48:43 2017 -0700 Committer: Aviem Zur Committed: Tue Mar 28 05:51:14 2017 +0300 -- .../spark/metrics/SparkMetricResults.java | 40 +--- 1 file changed, 2 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/241ded90/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java index c02027a..faf4c52 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java @@ -19,17 +19,15 @@ package org.apache.beam.runners.spark.metrics; import com.google.common.base.Function; -import com.google.common.base.Objects; import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; -import java.util.Set; import org.apache.beam.sdk.metrics.DistributionData; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeData; import org.apache.beam.sdk.metrics.GaugeResult; +import org.apache.beam.sdk.metrics.MetricFiltering; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; @@ -88,44 +86,10 @@ public class SparkMetricResults extends MetricResults { return new Predicate>() { @Override public boolean apply(MetricUpdate metricResult) { - return matches(filter, metricResult.getKey()); + return MetricFiltering.matches(filter, metricResult.getKey()); } }; } - -private boolean matches(MetricsFilter filter, MetricKey key) { - return matchesName(key.metricName(), filter.names()) - && matchesScope(key.stepName(), filter.steps()); -} - -private boolean matchesName(MetricName metricName, Set nameFilters) { - if (nameFilters.isEmpty()) { -return true; - } - - for (MetricNameFilter nameFilter : nameFilters) { -if ((nameFilter.getName() == null || nameFilter.getName().equals(metricName.name())) -&& Objects.equal(metricName.namespace(), nameFilter.getNamespace())) { - return true; -} - } - - return false; -} - -private boolean matchesScope(String actualScope, Set scopes) { - if (scopes.isEmpty() || scopes.contains(actualScope)) { -return true; - } - - for (String scope : scopes) { -if (actualScope.startsWith(scope)) { - return true; -} - } - - return false; -} } private static final Function, MetricResult>
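With the duplicated logic gone, runners call the SDK helper directly; a short sketch of the shared entry point (the step name is a placeholder), whose signature also appears in the `MetricFilteringTest` changes further down this log:

```java
import org.apache.beam.sdk.metrics.MetricFiltering;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricsFilter;

public class FilterCheck {

  // True when the key's step and metric name satisfy the filter; runners can
  // delegate here instead of re-implementing name/step matching.
  public static boolean matchesMyStep(MetricKey key) {
    return MetricFiltering.matches(
        MetricsFilter.builder().addStep("myStep").build(), key);
  }
}
```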
[2/2] beam git commit: This closes #2304
This closes #2304 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/85b820c3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/85b820c3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/85b820c3 Branch: refs/heads/master Commit: 85b820c3799ff292873acfd5d456c1f3c4321ae9 Parents: fe441e3 241ded9 Author: Aviem Zur Authored: Tue Mar 28 05:55:49 2017 +0300 Committer: Aviem Zur Committed: Tue Mar 28 05:55:49 2017 +0300 -- .../spark/metrics/SparkMetricResults.java | 40 +--- 1 file changed, 2 insertions(+), 38 deletions(-) --
[2/2] beam git commit: This closes #2151
This closes #2151 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b26e10b4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b26e10b4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b26e10b4 Branch: refs/heads/master Commit: b26e10b44a0b82359dea7b96e0d49dd595fae785 Parents: 026aec8 63e953c Author: Aviem Zur Authored: Mon Mar 27 19:37:52 2017 +0300 Committer: Aviem Zur Committed: Mon Mar 27 19:37:52 2017 +0300 -- .../beam/runners/direct/DirectMetrics.java | 59 +- .../beam/runners/direct/DirectMetricsTest.java | 42 -- .../beam/runners/dataflow/DataflowMetrics.java | 16 +++- .../runners/spark/metrics/SparkBeamMetric.java | 4 + .../spark/metrics/SparkMetricResults.java | 27 +++ .../spark/metrics/SparkMetricsContainer.java| 20 + .../java/org/apache/beam/sdk/metrics/Gauge.java | 32 .../org/apache/beam/sdk/metrics/GaugeCell.java | 60 +++ .../org/apache/beam/sdk/metrics/GaugeData.java | 81 .../apache/beam/sdk/metrics/GaugeResult.java| 61 +++ .../beam/sdk/metrics/MetricQueryResults.java| 3 + .../apache/beam/sdk/metrics/MetricUpdates.java | 11 ++- .../org/apache/beam/sdk/metrics/Metrics.java| 35 + .../beam/sdk/metrics/MetricsContainer.java | 26 ++- .../apache/beam/sdk/metrics/GaugeCellTest.java | 48 .../apache/beam/sdk/metrics/MetricMatchers.java | 12 ++- .../apache/beam/sdk/metrics/MetricsTest.java| 37 + 17 files changed, 539 insertions(+), 35 deletions(-) --
[1/2] beam git commit: [BEAM-1617] Add Gauge metric type to Java SDK
Repository: beam Updated Branches: refs/heads/master 026aec856 -> b26e10b44 [BEAM-1617] Add Gauge metric type to Java SDK Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63e953c6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63e953c6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63e953c6 Branch: refs/heads/master Commit: 63e953c6026192e5e027f0bac183b86992480127 Parents: 026aec8 Author: Aviem Zur Authored: Fri Mar 3 14:42:23 2017 +0200 Committer: Aviem Zur Committed: Mon Mar 27 19:01:58 2017 +0300 -- .../beam/runners/direct/DirectMetrics.java | 59 +- .../beam/runners/direct/DirectMetricsTest.java | 42 -- .../beam/runners/dataflow/DataflowMetrics.java | 16 +++- .../runners/spark/metrics/SparkBeamMetric.java | 4 + .../spark/metrics/SparkMetricResults.java | 27 +++ .../spark/metrics/SparkMetricsContainer.java| 20 + .../java/org/apache/beam/sdk/metrics/Gauge.java | 32 .../org/apache/beam/sdk/metrics/GaugeCell.java | 60 +++ .../org/apache/beam/sdk/metrics/GaugeData.java | 81 .../apache/beam/sdk/metrics/GaugeResult.java| 61 +++ .../beam/sdk/metrics/MetricQueryResults.java| 3 + .../apache/beam/sdk/metrics/MetricUpdates.java | 11 ++- .../org/apache/beam/sdk/metrics/Metrics.java| 35 + .../beam/sdk/metrics/MetricsContainer.java | 26 ++- .../apache/beam/sdk/metrics/GaugeCellTest.java | 48 .../apache/beam/sdk/metrics/MetricMatchers.java | 12 ++- .../apache/beam/sdk/metrics/MetricsTest.java| 37 + 17 files changed, 539 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java index f04dc21..fb126fb 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java @@ -33,6 +33,8 @@ import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.metrics.DistributionData; import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.GaugeData; +import org.apache.beam.sdk.metrics.GaugeResult; import org.apache.beam.sdk.metrics.MetricFiltering; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; @@ -193,6 +195,28 @@ class DirectMetrics extends MetricResults { } }; + private static final MetricAggregation GAUGE = + new MetricAggregation() { +@Override +public GaugeData zero() { + return GaugeData.empty(); +} + +@Override +public GaugeData combine(Iterable updates) { + GaugeData result = GaugeData.empty(); + for (GaugeData update : updates) { +result = result.combine(update); + } + return result; +} + +@Override +public GaugeResult extract(GaugeData data) { + return data.extractResult(); +} + }; + /** The current values of counters in memory. 
*/ private MetricsMap> counters = new MetricsMap<>(new MetricsMap.Factory>() { @@ -210,13 +234,23 @@ class DirectMetrics extends MetricResults { return new DirectMetric<>(DISTRIBUTION); } }); + private MetricsMap> gauges = + new MetricsMap<>( + new MetricsMap.Factory>() { +@Override +public DirectMetric createInstance( +MetricKey unusedKey) { + return new DirectMetric<>(GAUGE); +} + }); @AutoValue abstract static class DirectMetricQueryResults implements MetricQueryResults { public static MetricQueryResults create( Iterable> counters, -Iterable> distributions) { - return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, distributions); +Iterable> distributions, +Iterable> gauges) { + return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, distributions, gauges); } } @@ -248,8 +282,15 @@ class DirectMetrics extends MetricResults { : distributions.entries()) { maybeExtractResult(filter, distributionResults, distribution); } +ImmutableList.Builder> gaugeResults = +ImmutableList.builder(); +for (Entry> gauge +: gauges.ent
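From user code, the new metric type is reported much like counters and distributions; a minimal sketch, assuming a `Metrics.gauge(namespace, name)` factory accompanies the new `Gauge` type:

```java
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

class InstrumentedFn extends DoFn<Long, Long> {

  // Unlike a counter, a gauge keeps only the latest reported value.
  private final Gauge lastElement = Metrics.gauge(InstrumentedFn.class, "lastElement");

  @ProcessElement
  public void processElement(ProcessContext c) {
    lastElement.set(c.element());
    c.output(c.element());
  }
}
```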
[2/2] beam git commit: This closes #2328
This closes #2328 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9e55a43 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9e55a43 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9e55a43 Branch: refs/heads/master Commit: c9e55a4360a9fe06d6ed943a222bce524a6b10af Parents: 348d335 b32f048 Author: Aviem Zur Authored: Sun Mar 26 11:23:45 2017 +0300 Committer: Aviem Zur Committed: Sun Mar 26 11:23:45 2017 +0300 -- .../translation/GroupCombineFunctions.java | 15 ++- .../spark/translation/TransformTranslator.java | 26 2 files changed, 25 insertions(+), 16 deletions(-) --
[1/2] beam git commit: [BEAM-1810] Replace usage of RDD#isEmpty on non-serialized RDDs
Repository: beam Updated Branches: refs/heads/master 348d33588 -> c9e55a436 [BEAM-1810] Replace usage of RDD#isEmpty on non-serialized RDDs Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b32f0482 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b32f0482 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b32f0482 Branch: refs/heads/master Commit: b32f0482784b9df7ce67226b32febe6e664a45b6 Parents: 348d335 Author: Aviem Zur Authored: Sat Mar 25 21:49:06 2017 +0300 Committer: Aviem Zur Committed: Sun Mar 26 10:31:40 2017 +0300 -- .../translation/GroupCombineFunctions.java | 15 ++- .../spark/translation/TransformTranslator.java | 26 2 files changed, 25 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/b32f0482/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java index b2a589d..917a9ee 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java @@ -18,8 +18,7 @@ package org.apache.beam.runners.spark.translation; -import static com.google.common.base.Preconditions.checkArgument; - +import com.google.common.base.Optional; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.util.ByteArray; import org.apache.beam.sdk.coders.Coder; @@ -67,14 +66,12 @@ public class GroupCombineFunctions { /** * Apply a composite {@link org.apache.beam.sdk.transforms.Combine.Globally} transformation. */ - public static Iterable> combineGlobally( + public static Optional>> combineGlobally( JavaRDD> rdd, final SparkGlobalCombineFn sparkCombineFn, final Coder iCoder, final Coder aCoder, final WindowingStrategy windowingStrategy) { -checkArgument(!rdd.isEmpty(), "CombineGlobally computation should be skipped for empty RDDs."); - // coders. 
final WindowedValue.FullWindowedValueCoder wviCoder = WindowedValue.FullWindowedValueCoder.of(iCoder, @@ -93,6 +90,11 @@ public class GroupCombineFunctions { // AccumT: A // InputT: I JavaRDD inputRDDBytes = rdd.map(CoderHelpers.toByteFunction(wviCoder)); + +if (inputRDDBytes.isEmpty()) { + return Optional.absent(); +} + /*Itr>*/ byte[] accumulatedBytes = inputRDDBytes.aggregate( CoderHelpers.toByteArray(sparkCombineFn.zeroValue(), iterAccumCoder), new Function2() { @@ -115,7 +117,8 @@ public class GroupCombineFunctions { } } ); -return CoderHelpers.fromByteArray(accumulatedBytes, iterAccumCoder); + +return Optional.of(CoderHelpers.fromByteArray(accumulatedBytes, iterAccumCoder)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/b32f0482/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index b4362b0..ffb207a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -27,6 +27,7 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceSh import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectSplittable; import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers; +import com.google.common.base.Optional; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -259,9 +260,20 @@ public final class TransformTranslator { ((BoundedDataset) context.borrowDataset(transform)).getRDD(); JavaRDD> outRdd; -// handle empty input RDD, which will naturally skip the entire execution -// as Spark will not run on empty RDDs. -if (inRdd.isEmpty()) { + +Optional>> maybeAccumulated = +GroupCombineFunctions.combineGlobally(inRdd, sparkCombineFn, iCoder, aCoder, +windowingStrategy); +
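The shape of the fix: check emptiness only on an RDD that has already been serialized to bytes, and report the empty case through Guava's `Optional` so the caller decides what to do. A standalone illustration of that shape (not the runner code; the names and the trivial aggregation are invented for the sketch):

```java
import com.google.common.base.Optional;
import org.apache.spark.api.java.JavaRDD;

public class EmptyAwareCombine {

  // Combine a possibly-empty RDD of serialized accumulators. Calling isEmpty()
  // on the byte[] RDD avoids touching elements that may not be serializable by
  // the configured serializer, and Optional.absent() lets the caller fall back
  // to an empty/default accumulator instead of hitting a failed precondition.
  public static Optional<byte[]> combine(JavaRDD<byte[]> serialized) {
    if (serialized.isEmpty()) {
      return Optional.absent();
    }
    // Stand-in for the real aggregate(...) over serialized accumulators.
    return Optional.of(serialized.first());
  }
}
```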
[GitHub] beam pull request #2328: [BEAM-1810] Avoid RDD#isEmpty in Spark runner Trans...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2328 [BEAM-1810] Avoid RDD#isEmpty in Spark runner TransformTranslator#combineGlobally Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam combine-globally-kryo-serialization-exception Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2328.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2328 commit 7bc632fbcab1d5f53f5ce52e739769ead8cc0d19 Author: Aviem Zur Date: 2017-03-25T18:49:06Z [BEAM-1810] Avoid RDD#isEmpty in Spark runner TransformTranslator#combineGlobally --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] beam git commit: This closes #2317
This closes #2317 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fe236993 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fe236993 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fe236993 Branch: refs/heads/master Commit: fe2369933a9f1116f22916b34fa13425efdc1a52 Parents: e5f1a64 b436263 Author: Aviem Zur Authored: Sat Mar 25 12:17:18 2017 +0300 Committer: Aviem Zur Committed: Sat Mar 25 12:17:18 2017 +0300 -- .../beam/sdk/metrics/MetricNameFilter.java | 3 +- .../beam/sdk/metrics/MetricFilteringTest.java | 73 2 files changed, 74 insertions(+), 2 deletions(-) --
[1/2] beam git commit: [BEAM-1803] Fixed bug in metrics filtering.
Repository: beam Updated Branches: refs/heads/master e5f1a6479 -> fe2369933 [BEAM-1803] Fixed bug in metrics filtering. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b436263e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b436263e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b436263e Branch: refs/heads/master Commit: b436263e01c32f9e8b648b8422c094ad7c56d2a6 Parents: e5f1a64 Author: Pablo Authored: Fri Mar 24 12:53:10 2017 -0700 Committer: Aviem Zur Committed: Sat Mar 25 12:15:55 2017 +0300 -- .../beam/sdk/metrics/MetricNameFilter.java | 3 +- .../beam/sdk/metrics/MetricFilteringTest.java | 73 2 files changed, 74 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/b436263e/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java index a2c3798..489b703 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java @@ -54,7 +54,6 @@ public abstract class MetricNameFilter { public static MetricNameFilter named(Class namespace, String name) { checkNotNull(namespace, "Must specify a inNamespace"); checkNotNull(name, "Must specify a name"); -return new AutoValue_MetricNameFilter(namespace.getSimpleName(), name); +return new AutoValue_MetricNameFilter(namespace.getName(), name); } } - http://git-wip-us.apache.org/repos/asf/beam/blob/b436263e/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java index 3e6a499..dc2fa0a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java @@ -40,6 +40,79 @@ public class MetricFilteringTest { } @Test + public void testMatchCompositeStepNameFilters() { +// MetricsFilter with a Class-namespace + name filter + step filter. +// Successful match. +assertTrue(MetricFiltering.matches( +MetricsFilter.builder().addNameFilter( +MetricNameFilter.named(MetricFilteringTest.class, "myMetricName")) +.addStep("myStep").build(), +MetricKey.create( +"myBigStep/myStep", MetricName.named(MetricFilteringTest.class, "myMetricName"; + +// Unsuccessful match. +assertFalse(MetricFiltering.matches( +MetricsFilter.builder().addNameFilter( +MetricNameFilter.named(MetricFilteringTest.class, "myMetricName")) +.addStep("myOtherStep").build(), +MetricKey.create( +"myOtherStepNoMatch/myStep", +MetricName.named(MetricFilteringTest.class, "myMetricName"; + } + + @Test + public void testMatchStepNameFilters() { +// MetricsFilter with a Class-namespace + name filter + step filter. +// Successful match. +assertTrue(MetricFiltering.matches( +MetricsFilter.builder().addNameFilter( +MetricNameFilter.named(MetricFilteringTest.class, "myMetricName")) +.addStep("myStep").build(), +MetricKey.create("myStep", MetricName.named(MetricFilteringTest.class, "myMetricName"; + +// Unsuccessful match. 
+assertFalse(MetricFiltering.matches( +MetricsFilter.builder().addNameFilter( +MetricNameFilter.named(MetricFilteringTest.class, "myMetricName")) +.addStep("myOtherStep").build(), +MetricKey.create("myStep", MetricName.named(MetricFilteringTest.class, "myMetricName"; + } + + @Test + public void testMatchClassNamespaceFilters() { +// MetricsFilter with a Class-namespace + name filter. Without step filter. +// Successful match. +assertTrue(MetricFiltering.matches( +MetricsFilter.builder().addNameFilter( +MetricNameFilter.named(MetricFilteringTest.class, "myMetricName")).build(), +MetricKey.create("anyStep", MetricName.named(MetricFilteringTest.class, "myMetricName"; + +// Unsuccessful match. +assertFalse(MetricFiltering.matches( +MetricsFilter.builder().addNameFilter( +MetricNameFilter.named(MetricFilteringTest.class, "myMetricName")).build(), +MetricKey.create("anyStep", MetricName.named(MetricFiltering
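The fix above is small but consequential: assuming `MetricName.named(Class, String)` records the fully-qualified class name (which the accompanying tests imply), the filter must record the same form to ever match. A small hedged illustration; the package and class are invented for the example:

```java
package com.example.metrics;

import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricNameFilter;

public class NamespaceCheck {
  public static void main(String[] args) {
    MetricName name = MetricName.named(NamespaceCheck.class, "elements");
    MetricNameFilter filter = MetricNameFilter.named(NamespaceCheck.class, "elements");
    // After the fix both namespaces are "com.example.metrics.NamespaceCheck";
    // before it, the filter held only "NamespaceCheck" and never matched.
    System.out.println(name.namespace());
    System.out.println(filter.getNamespace());
  }
}
```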
[GitHub] beam pull request #2327: [BEAM-1802] Stop Spark context on terminal pipeline...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2327 [BEAM-1802] Stop Spark context on terminal pipeline state Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam stop-terminal-spark-pipelines Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2327.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2327 commit 70ba7a5a81046100d3b49a8971febcd089cc3573 Author: Aviem Zur Date: 2017-03-25T08:10:35Z [BEAM-1802] Stop Spark context on terminal pipeline state --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2313: [BEAM-1802] Call stop in SparkPipelineResult#waitUn...
Github user aviemzur closed the pull request at: https://github.com/apache/beam/pull/2313 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2313: [BEAM-1802] Call stop in SparkPipelineResult#waitUn...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2313 [BEAM-1802] Call stop in SparkPipelineResult#waitUntilFinish Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam finally-stop-spark-pipelines Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2313.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2313 commit a3384206a04f327f8ca2098ac0ab078486633096 Author: Aviem Zur Date: 2017-03-24T12:12:52Z [BEAM-1802] Call stop in SparkPipelineResult#waitUntilFinish --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] beam git commit: [BEAM-1074] Set default-partitioner in SourceRDD.Unbounded
Repository: beam Updated Branches: refs/heads/master 5e1be9fa7 -> 9ac1ffcea [BEAM-1074] Set default-partitioner in SourceRDD.Unbounded Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/623a5696 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/623a5696 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/623a5696 Branch: refs/heads/master Commit: 623a5696bc328a9a55bf5de67ad0070a985c96ee Parents: 5e1be9f Author: Aviem Zur Authored: Wed Mar 22 15:20:51 2017 +0200 Committer: Aviem Zur Committed: Thu Mar 23 16:18:16 2017 +0200 -- .../spark/SparkNativePipelineVisitor.java | 1 - .../beam/runners/spark/io/SourceDStream.java| 52 +++- .../apache/beam/runners/spark/io/SourceRDD.java | 19 +-- .../runners/spark/io/SparkUnboundedSource.java | 15 +++--- 4 files changed, 63 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/623a5696/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java index c2784a2..c2d38d7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java @@ -92,7 +92,6 @@ public class SparkNativePipelineVisitor extends SparkRunner.Evaluator { @Override > void doVisitTransform(TransformHierarchy.Node node) { -super.doVisitTransform(node); @SuppressWarnings("unchecked") TransformT transform = (TransformT) node.getTransform(); @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/beam/blob/623a5696/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java index 8a0763b..3f2c10a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java @@ -28,6 +28,7 @@ import org.apache.spark.api.java.JavaSparkContext$; import org.apache.spark.rdd.RDD; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.Time; +import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.dstream.InputDStream; import org.apache.spark.streaming.scheduler.RateController; import org.apache.spark.streaming.scheduler.RateController$; @@ -36,7 +37,6 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator$; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.Tuple2; @@ -60,6 +60,9 @@ class SourceDStream private final UnboundedSource unboundedSource; private final SparkRuntimeContext runtimeContext; private final Duration boundReadDuration; + // Number of partitions for the DStream is final and remains the same throughout the entire + // lifetime of the pipeline, including when resuming from checkpoint. + private final int numPartitions; // the initial parallelism, set by Spark's backend, will be determined once when the job starts. // in case of resuming/recovering from checkpoint, the DStream will be reconstructed and this // property should not be reset. 
@@ -67,40 +70,55 @@ class SourceDStream // the bound on max records is optional. // in case it is set explicitly via PipelineOptions, it takes precedence // otherwise it could be activated via RateController. - private Long boundMaxRecords = null; + private final long boundMaxRecords; SourceDStream( StreamingContext ssc, UnboundedSource unboundedSource, - SparkRuntimeContext runtimeContext) { - + SparkRuntimeContext runtimeContext, + Long boundMaxRecords) { super(ssc, JavaSparkContext$.MODULE$., CheckpointMarkT>>fakeClassTag()); this.unboundedSource = unboundedSource; this.runtimeContext = runtimeContext; + SparkPipelineOptions options = runtimeContext.getPipelineOptions().as( SparkPipelineOptions.class); + this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(), options.getMinReadTimeMillis()); // set initial parallelism once. this.initialParallelism = ssc().sc().defaultParallelism(); checkArgument(this.initialParallelism > 0, "Number of partitions must be grea
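The comment added in the diff captures the design choice: the partition count is resolved once from Spark's default parallelism and then held constant for the lifetime of the stream, including across checkpoint recovery, so stateful reads keep a stable partitioning. A generic Spark-side sketch of that idea, separate from the runner code (identifiers are illustrative):

```java
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaSparkContext;

public class StablePartitioning {

  // Resolve the partition count once at job start and reuse the same
  // partitioner everywhere; recomputing it per micro-batch (or after a
  // checkpoint restore) could silently redistribute keys.
  public static HashPartitioner stablePartitioner(JavaSparkContext jsc) {
    int numPartitions = Math.max(jsc.defaultParallelism(), 1);
    return new HashPartitioner(numPartitions);
  }
}
```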
[2/2] beam git commit: This closes #2288
This closes #2288 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ac1ffce Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ac1ffce Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ac1ffce Branch: refs/heads/master Commit: 9ac1ffceadb30956655e27d253fd84fe355d5f54 Parents: 5e1be9f 623a569 Author: Aviem Zur Authored: Thu Mar 23 16:48:04 2017 +0200 Committer: Aviem Zur Committed: Thu Mar 23 16:48:04 2017 +0200 -- .../spark/SparkNativePipelineVisitor.java | 1 - .../beam/runners/spark/io/SourceDStream.java| 52 +++- .../apache/beam/runners/spark/io/SourceRDD.java | 19 +-- .../runners/spark/io/SparkUnboundedSource.java | 15 +++--- 4 files changed, 63 insertions(+), 24 deletions(-) --
[GitHub] beam pull request #2288: [BEAM-848] A better shuffle after reading from with...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2288 [BEAM-848] A better shuffle after reading from within mapWithState. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam sourcerdd-unbounded-default-partitioner Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2288.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2288 commit 0fa4d29784ef5940c2467529e5ae7fdb78c7b98b Author: Aviem Zur Date: 2017-03-22T13:20:51Z [BEAM-1074] Set default-partitioner in SourceRDD.Unbounded commit 581233f5022cabb9ef497611b643dd5413d52060 Author: Aviem Zur Date: 2017-03-22T19:11:47Z [BEAM-1075] Shuffle the input read-values to get maximum parallelism --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam-site pull request #190: Simplify merge process section
GitHub user aviemzur opened a pull request: https://github.com/apache/beam-site/pull/190 Simplify merge process section R: @davorbonaci You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam-site simplify-merge-process Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam-site/pull/190.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #190 commit cd5a9b4c832cb79c2536f2cd538b5b12f81cc1b6 Author: Zur, Aviem Date: 2017-03-19T04:16:49Z Simplify merge process section --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[3/4] beam-site git commit: Regenerate website
Regenerate website Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/c6d92268 Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/c6d92268 Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/c6d92268 Branch: refs/heads/asf-site Commit: c6d922685e79463fa7f41a8f9dcd86aee59923c0 Parents: b470eb5 Author: Zur, Aviem Authored: Sun Mar 19 05:55:02 2017 +0200 Committer: Zur, Aviem Committed: Sun Mar 19 05:55:02 2017 +0200 -- content/contribute/contribution-guide/index.html | 17 + 1 file changed, 13 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam-site/blob/c6d92268/content/contribute/contribution-guide/index.html -- diff --git a/content/contribute/contribution-guide/index.html b/content/contribute/contribution-guide/index.html index 4d4a782..af492a9 100644 --- a/content/contribute/contribution-guide/index.html +++ b/content/contribute/contribution-guide/index.html @@ -214,7 +214,7 @@ Website One-time Setup Working on your change - Committing website changes + Committing website changes (committers only) @@ -736,7 +736,7 @@ $ git checkout -borigin/asf-site While you are working on your pull request, you can test and develop live by running the following command in the root folder of the website: -$ bundle exec jekyll serve +$ bundle exec jekyll serve --incremental @@ -758,11 +758,20 @@ $ git checkout -b origin/asf-site During review, committers will patch in your PR, generate the static content/, and review the changes. -Committing website changes +Committing website changes (committers only) Follow the same committer process as above, but using repository apache/beam-site and branch asf-site. -In addition, the committer is responsible for doing the final jekyll build to generate the static content, so follow the instructions above to install jekyll. +In addition, the committer is responsible for doing the final bundle exec jekyll build to generate the static content, so follow the instructions above to install jekyll. + +This command generates the content/ directory. The committer should add and commit the content related to the PR. + +$ git add content/ +$ git commit -m "Regenerate website" + + + +Finally you should merge the changes into the asf-site branch and push them into the apache repository.
[1/4] beam-site git commit: Improve committing website changes section in the contribution guide
Repository: beam-site Updated Branches: refs/heads/asf-site f9b4c6e7a -> 41018ed60 Improve committing website changes section in the contribution guide Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/653218f8 Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/653218f8 Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/653218f8 Branch: refs/heads/asf-site Commit: 653218f89c0a6b50283bf502538f8b131aa54ea1 Parents: f9b4c6e Author: Ismaël Mejía Authored: Sun Mar 19 01:06:26 2017 +0100 Committer: Ismaël Mejía Committed: Sun Mar 19 01:08:50 2017 +0100 -- src/contribute/contribution-guide.md | 13 ++--- 1 file changed, 10 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam-site/blob/653218f8/src/contribute/contribution-guide.md -- diff --git a/src/contribute/contribution-guide.md b/src/contribute/contribution-guide.md index b2af352..c1617af 100644 --- a/src/contribute/contribution-guide.md +++ b/src/contribute/contribution-guide.md @@ -442,7 +442,7 @@ The general guidelines for cloning a repository can be adjusted to use the `asf- While you are working on your pull request, you can test and develop live by running the following command in the root folder of the website: - $ bundle exec jekyll serve + $ bundle exec jekyll serve --incremental Jekyll will start a webserver on port 4000. As you make changes to the content, Jekyll will rebuild it automatically. @@ -458,8 +458,15 @@ When you are ready, submit a pull request using the [Beam Site GitHub mirror](ht During review, committers will patch in your PR, generate the static `content/`, and review the changes. - Committing website changes + Committing website changes (committers only) Follow the same committer process as above, but using repository `apache/beam-site` and branch `asf-site`. -In addition, the committer is responsible for doing the final `jekyll build` to generate the static content, so follow the instructions above to install `jekyll`. +In addition, the committer is responsible for doing the final `bundle exec jekyll build` to generate the static content, so follow the instructions above to install `jekyll`. + +This command generates the `content/` directory. The committer should add and commit the content related to the PR. + + $ git add content/ + $ git commit -m "Regenerate website" + +Finally you should merge the changes into the `asf-site` branch and push them into the `apache` repository.
[2/4] beam-site git commit: Fix timezone to America/Los_Angeles
Fix timezone to America/Los_Angeles Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/b470eb5b Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/b470eb5b Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/b470eb5b Branch: refs/heads/asf-site Commit: b470eb5b418949f19e862a5faeac06e0a4dc8bfb Parents: 653218f Author: Ismaël Mejía Authored: Sun Mar 19 01:09:00 2017 +0100 Committer: Ismaël Mejía Committed: Sun Mar 19 01:12:03 2017 +0100 -- _config.yml | 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam-site/blob/b470eb5b/_config.yml -- diff --git a/_config.yml b/_config.yml index 7d36162..2e9084c 100644 --- a/_config.yml +++ b/_config.yml @@ -52,3 +52,6 @@ kramdown: release_latest: 0.6.0 # Plugins are configured in the Gemfile. + +# Set the time zone for site generation, fixed to US Pacific Time +timezone: America/Los_Angeles
[4/4] beam-site git commit: This closes #189
This closes #189 Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/41018ed6 Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/41018ed6 Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/41018ed6 Branch: refs/heads/asf-site Commit: 41018ed6016c18fc0dac1a2740e510a9a2460bca Parents: f9b4c6e c6d9226 Author: Zur, Aviem Authored: Sun Mar 19 05:58:40 2017 +0200 Committer: Zur, Aviem Committed: Sun Mar 19 05:58:40 2017 +0200 -- _config.yml | 3 +++ content/contribute/contribution-guide/index.html | 17 + src/contribute/contribution-guide.md | 13 ++--- 3 files changed, 26 insertions(+), 7 deletions(-) --
[GitHub] beam-site pull request #187: Add aviemzur as committer
GitHub user aviemzur opened a pull request: https://github.com/apache/beam-site/pull/187 Add aviemzur as committer You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam-site aviemzur-committer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam-site/pull/187.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #187 commit 41b8245b94d5b02512c94b53a49cb7910edfbc1f Author: Zur, Aviem Date: 2017-03-18T04:09:19Z Add aviemzur as committer --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2263: [BEAM-1726] TestFlinkRunner should assert PAssert s...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2263 [BEAM-1726] TestFlinkRunner should assert PAssert success/failure counters Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam flink-test-runner-aggregators Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2263.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2263 commit 354bbcc2d87164cd5bb94b7c71d50df1d4c17c93 Author: Aviem Zur Date: 2017-03-17T11:36:55Z [BEAM-1726] TestFlinkRunner should assert PAssert success/failure counters --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2261: Remove duplicate build-helper-maven-plugin
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2261 Remove duplicate build-helper-maven-plugin Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam duplicate-helper-plugin-definition Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2261.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2261 commit eb964e6fe973a2b2dc627f33598e0f9867c889e6 Author: Aviem Zur Date: 2017-03-16T21:46:23Z Remove duplicate build-helper-maven-plugin --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2259: Fix shading opt out in io/google-cloud-platform
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2259 Fix shading opt out in io/google-cloud-platform Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam fix-google-io-shading-opt-out Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2259.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2259 commit dd1b001728375aeecfc577de7ea13761aa37322c Author: Aviem Zur Date: 2017-03-16T08:14:36Z Fix shading opt out in io/google-cloud-platform --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2249: Increment shade-plugin version back to 3.0.0
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2249 Increment shade-plugin version back to 3.0.0 Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam increment-shade-plugin-version Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2249.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2249 commit d5e44e3517d302408fb064303f09d9668252afaa Author: Aviem Zur Date: 2017-03-15T04:29:09Z Increment shade-plugin version back to 3.0.0 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam-site pull request #178: Add IntelliJ files to .gitignore
GitHub user aviemzur opened a pull request: https://github.com/apache/beam-site/pull/178 Add IntelliJ files to .gitignore You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam-site add-intellij-files-to-gitignore Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam-site/pull/178.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #178 commit 1d2e719ee777dce2c88eaf1893222bf935722662 Author: Zur, Aviem Date: 2017-03-14T08:07:25Z Add IntelliJ files to .gitignore --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam-site pull request #177: [BEAM-1652] Code style instructions in contribu...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam-site/pull/177 [BEAM-1652] Code style instructions in contribution guide You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam-site code-style-insturctions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam-site/pull/177.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #177 commit 2b4f3b239adc2cb272ce2ee7fae1b103b7ecfaaf Author: Zur, Aviem Date: 2017-03-14T05:43:29Z [BEAM-1652] Code style instructions for IntelliJ in contribution guide. commit 82628b4d77b87295ed4c72444211e77573608046 Author: Zur, Aviem Date: 2017-03-14T06:25:34Z [BEAM-1652] Code style instructions for Eclipse in contribution guide. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2234: [BEAM-1704] Added Create.TimestampedValues.withType
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2234 [BEAM-1704] Added Create.TimestampedValues.withType Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam timestampedvalues-with-type Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2234.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2234 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
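For readers unfamiliar with the method being added: `Create.Values` already offers `withType(TypeDescriptor)` for supplying an element type that cannot be inferred, and this PR adds the same hook to `Create.TimestampedValues`. The fragment below is a minimal, hypothetical sketch of how it might be used for generic `KV` elements, assuming the new method mirrors the existing one; the values, timestamps, and class name are made up.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Instant;

public class TimestampedCreateExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // withType supplies the element type when it cannot be inferred from the
    // elements themselves (e.g. generic KV values), so a coder can be chosen.
    PCollection<KV<String, Long>> events = p.apply(
        Create.timestamped(
                TimestampedValue.of(KV.of("user-a", 1L), new Instant(0L)),
                TimestampedValue.of(KV.of("user-b", 2L), new Instant(1000L)))
            .withType(new TypeDescriptor<KV<String, Long>>() {}));

    p.run().waitUntilFinish();
  }
}
```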
[GitHub] beam pull request #2195: [BEAM-1651] Add IntelliJ code style xml to the proj...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2195 [BEAM-1651] Add IntelliJ code style xml to the project repository Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam intellij_codestyle Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2195.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2195 commit 18ee561dde7b2f77095592a07c1ebcc6c4fd2d56 Author: Aviem Zur Date: 2017-03-08T11:59:03Z [BEAM-1651] Add IntelliJ code style xml to the project repository --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2180: [BEAM-1636] UnboundedDataset action() does not mate...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2180 [BEAM-1636] UnboundedDataset action() does not materialize RDD Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam unbounded-dataset-materialize-fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2180.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2180 commit a889597e748eb752141af8dc568c56449c4eba5c Author: Aviem Zur Date: 2017-03-07T13:07:03Z [BEAM-1636] UnboundedDataset action() does not materialize RDD --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
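The title refers to the Spark runner's internal `UnboundedDataset#action()`, which is supposed to force the underlying RDD to compute. As background only (this is plain Spark, not the runner's code), the sketch below illustrates why an explicit action is needed: transformations are lazy, so nothing runs until an action such as `foreachPartition` is invoked. Class and variable names are illustrative.

```java
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class MaterializeExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setMaster("local[2]").setAppName("materialize-example"));

    // Transformations are lazy: declaring the map does not run anything yet.
    JavaRDD<Integer> doubled = sc.parallelize(Arrays.asList(1, 2, 3)).map(x -> x * 2);

    doubled.cache(); // caching alone does not compute the RDD either

    // An action forces evaluation; an empty foreachPartition is a cheap way
    // to materialize the (now cached) RDD without collecting it to the driver.
    doubled.foreachPartition(it -> { });

    sc.stop();
  }
}
```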
[GitHub] beam pull request #2171: [BEAM-1629] Make metrics/aggregators accumulators a...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2171 [BEAM-1629] Make metrics/aggregators accumulators available on driver Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam fix-accumulator-singleton-instantiation-race Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2171.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2171 commit 5c4cb5ee153fd7ca069d90d3ac558bceaf82678c Author: Aviem Zur Date: 2017-03-06T18:48:48Z [BEAM-1629] Make metrics/aggregators accumulators available on driver --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
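In practical terms, making the accumulators available on the driver is what lets a user query metric values from the `PipelineResult` after (or during) a run. The helper below is a hedged sketch using the standard Beam metrics query API; the method and parameter names are illustrative and not part of this PR.

```java
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

public class DriverSideMetrics {
  /** Prints the attempted value of a named counter, queried on the driver. */
  static void printCounter(PipelineResult result, String namespace, String name) {
    MetricQueryResults metrics = result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named(namespace, name))
            .build());
    for (MetricResult<Long> counter : metrics.counters()) {
      System.out.println(counter.name() + " = " + counter.attempted());
    }
  }
}
```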
[GitHub] beam pull request #2162: [BEAM-1397] Introduce IO metrics
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2162 [BEAM-1397] Introduce IO metrics Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam introduce-io-metrics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2162.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2162 commit 62c699d84d37ddbfbd40fe4e970890b8cdb9f0a9 Author: Aviem Zur Date: 2017-03-05T19:37:05Z [BEAM-1397] Introduce IO metrics --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
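The metrics themselves are declared with the standard `Metrics` API inside a `DoFn`, source, or sink. The sketch below is not code from this PR; it is a generic, assumed example of the pattern an IO could use to count records it has read.

```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

/** Counts elements passing through, the way an IO might count records read. */
public class CountingFn<T> extends DoFn<T, T> {
  private final Counter elementsRead = Metrics.counter(CountingFn.class, "elementsRead");

  @ProcessElement
  public void processElement(ProcessContext c) {
    elementsRead.inc();
    c.output(c.element());
  }
}
```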
[GitHub] beam pull request #2082: [BEAM-1397] [BEAM-1398] Introduce IO metrics. Add K...
Github user aviemzur closed the pull request at: https://github.com/apache/beam/pull/2082 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2082: [BEAM-1397] [BEAM-1398] Introduce IO metrics. Add K...
GitHub user aviemzur reopened a pull request: https://github.com/apache/beam/pull/2082 [BEAM-1397] [BEAM-1398] Introduce IO metrics. Add KafkaIO metrics. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [x] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [x] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam io-metrics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2082.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2082 commit 31d94d4eecf492357c41424f65e1037fc3976d09 Author: Aviem Zur Date: 2017-02-22T14:18:13Z [BEAM-1397] Introduce IO metrics commit 8192724e65e65092117fee0a78408b476adf0245 Author: Aviem Zur Date: 2017-02-22T21:26:45Z [BEAM-1398] KafkaIO metrics commit 62d0ac450ff4631ddfd057a5caa785dae305065b Author: Aviem Zur Date: 2017-02-23T04:56:43Z Test Spark runner streaming IO metrics --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
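For readers who have not used KafkaIO: the consumer-side metrics added here are reported while a pipeline reads from Kafka. The sketch below shows a minimal read, assuming the builder methods of the current `KafkaIO.read()` API (`withBootstrapServers`, `withTopic`, `withKeyDeserializer`, `withValueDeserializer`, `withoutMetadata`); the broker address and topic name are placeholders.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Reading from Kafka; consumer-side metrics would be reported for this source.
    PCollection<KV<String, Long>> records = p.apply(
        KafkaIO.<String, Long>read()
            .withBootstrapServers("broker-1:9092")   // placeholder
            .withTopic("my_topic")                   // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(LongDeserializer.class)
            .withoutMetadata());

    p.run(); // a real streaming job keeps running until cancelled
  }
}
```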
[GitHub] beam pull request #2161: [BEAM-1625] BoundedDataset action() does not materi...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2161 [BEAM-1625] BoundedDataset action() does not materialize RDD Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam spark-materialize-bug Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2161.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2161 commit f0cb2b77545680b44a6e424cea7f9738dafddbb2 Author: Aviem Zur Date: 2017-03-05T14:01:44Z [BEAM-1625] BoundedDataset action() does not materialize RDD --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #2160: [BEAM-1623] Transform Reshuffle directly in Spark r...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam/pull/2160 [BEAM-1623] Transform Reshuffle directly in Spark runner Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam translate-reshuffle Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/2160.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2160 commit 6bc0241bb8d87f25f40ebef2682aab31b04a9fe9 Author: Aviem Zur Date: 2017-03-05T05:15:32Z [BEAM-1623] Transform Reshuffle directly in Spark runner --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
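Reshuffle redistributes a keyed PCollection across workers and acts as a stability barrier; this PR has the Spark runner translate the transform directly instead of relying on its composite expansion. The fragment below only shows the user-facing application of the transform; note that `Reshuffle`'s package has moved over Beam's history, so the import location is an assumption.

```java
import org.apache.beam.sdk.transforms.Reshuffle; // older releases: org.apache.beam.sdk.util.Reshuffle
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class ReshuffleExample {
  /** Redistributes a keyed collection; a runner may translate this natively. */
  static PCollection<KV<String, Integer>> redistribute(PCollection<KV<String, Integer>> input) {
    return input.apply(Reshuffle.<String, Integer>of());
  }
}
```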
[GitHub] beam-site pull request #170: Fix possible erroneous naming conventions refere...
GitHub user aviemzur opened a pull request: https://github.com/apache/beam-site/pull/170 Fix possible erroneous naming conventions reference You can merge this pull request into a Git repository by running: $ git pull https://github.com/aviemzur/beam-site naming-conventions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam-site/pull/170.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #170 commit 110d4508c34654353936c05dc46c612c68428c3d Author: Zur, Aviem Date: 2017-03-04T12:01:45Z Fix possible erroneous naming conventions reference --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---