[2/2] beam git commit: This closes #3840
This closes #3840 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50532f0a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50532f0a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50532f0a Branch: refs/heads/master Commit: 50532f0a92d7ce8dbbdc6c3179ab7b9efde6a746 Parents: 8d71ebf c3d4c5d Author: Stas Levin Authored: Wed Sep 13 11:04:20 2017 +0300 Committer: Stas Levin Committed: Wed Sep 13 11:04:20 2017 +0300 -- .../SparkGroupAlsoByWindowViaWindowSet.java | 82 +--- .../spark/stateful/SparkTimerInternals.java | 15 2 files changed, 56 insertions(+), 41 deletions(-) --
[1/2] beam git commit: [BEAM-2859] Fixed processing timers not being properly fired when watermark stays put by tweaking the way spark-runner was delivering timers to reduceFnRunner in SparkGroupAlsoByWindowViaWindowSet
Repository: beam Updated Branches: refs/heads/master 8d71ebf82 -> 50532f0a9 [BEAM-2859] Fixed processing timers not being properly fired when watermark stays put by tweaking the way spark-runner was delivering timers to reduceFnRunner in SparkGroupAlsoByWindowViaWindowSet Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c3d4c5d9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c3d4c5d9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c3d4c5d9 Branch: refs/heads/master Commit: c3d4c5d98cc115dce7e03e64cd29713562ff62b3 Parents: 8d71ebf Author: Stas LevinAuthored: Tue Sep 12 10:34:45 2017 +0300 Committer: Stas Levin Committed: Wed Sep 13 11:04:08 2017 +0300 -- .../SparkGroupAlsoByWindowViaWindowSet.java | 82 +--- .../spark/stateful/SparkTimerInternals.java | 15 2 files changed, 56 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/c3d4c5d9/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 2258f05..1fb8700 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -18,7 +18,9 @@ package org.apache.beam.runners.spark.stateful; import com.google.common.base.Joiner; +import com.google.common.base.Predicate; import com.google.common.collect.AbstractIterator; +import com.google.common.collect.FluentIterable; import com.google.common.collect.Lists; import com.google.common.collect.Table; import java.io.Serializable; @@ -51,6 +53,7 @@ import org.apache.beam.sdk.coders.IterableCoder; import 
org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -204,6 +207,32 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable { this.droppedDueToLateness = droppedDueToLateness; } + /** + * Retrieves the timers that are eligible for processing by {@link + * org.apache.beam.runners.core.ReduceFnRunner}. + * + * @return A collection of timers that are eligible for processing. For a {@link + * TimeDomain#EVENT_TIME} timer, this implies that the watermark has passed the timer's + * timestamp. For other TimeDomains (e.g., {@link + * TimeDomain#PROCESSING_TIME}), a timer is always considered eligible for processing (no + * restrictions). + */ + private Collection filterTimersEligibleForProcessing( + final Collection timers, final Instant inputWatermark) { +final Predicate eligibleForProcessing = +new Predicate() { + + @Override + public boolean apply(final TimerInternals.TimerData timer) { +return !timer.getDomain().equals(TimeDomain.EVENT_TIME) +|| inputWatermark.isAfter(timer.getTimestamp()); + } +}; + +return FluentIterable.from(timers).filter(eligibleForProcessing).toSet(); + } + + @Override protected Tuple2>>*/ List >> computeNext() { @@ -268,16 +297,14 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable { LOG.trace(logPrefix + ": input elements: {}", elements); - /* - Incoming expired windows are filtered based on - timerInternals.currentInputWatermarkTime() and the configured allowed - lateness. Note that this is done prior to calling - timerInternals.advanceWatermark so essentially the inputWatermark is - the highWatermark of the previous batch and the lowWatermark of the - current batch. 
- The highWatermark of the current batch will only affect filtering - as of the next batch. - */ + // Incoming expired windows are filtered based on + // timerInternals.currentInputWatermarkTime() and the configured allowed + // lateness. Note that this is done prior to calling +
[1/2] beam git commit: [BEAM-2825] Refactored SparkGroupAlsoByWindowViaWindowSet to improve readability.
Repository: beam Updated Branches: refs/heads/master a481d5611 -> f7d4583bd [BEAM-2825] Refactored SparkGroupAlsoByWindowViaWindowSet to improve readability. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c8b99ba3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c8b99ba3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c8b99ba3 Branch: refs/heads/master Commit: c8b99ba393c54da1a3ffbc61c2e5f2ae92b0b2bb Parents: a481d56 Author: Stas LevinAuthored: Wed Aug 30 12:01:32 2017 +0300 Committer: Stas Levin Committed: Sun Sep 3 15:40:25 2017 +0300 -- .../SparkGroupAlsoByWindowViaWindowSet.java | 878 +++ 1 file changed, 498 insertions(+), 380 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/c8b99ba3/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index e6a55a6..2258f05 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -58,12 +58,12 @@ import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; -import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext$; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; import 
org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; @@ -73,435 +73,553 @@ import org.apache.spark.streaming.dstream.PairDStreamFunctions; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Function1; import scala.Option; import scala.Tuple2; import scala.Tuple3; +import scala.collection.Iterator; import scala.collection.Seq; -import scala.reflect.ClassTag; import scala.runtime.AbstractFunction1; /** - * An implementation of {@link GroupAlsoByWindow} - * logic for grouping by windows and controlling trigger firings and pane accumulation. + * An implementation of {@link GroupAlsoByWindow} logic for grouping by windows and controlling + * trigger firings and pane accumulation. * * This implementation is a composite of Spark transformations revolving around state management - * using Spark's - * {@link PairDStreamFunctions#updateStateByKey(Function1, Partitioner, boolean, ClassTag)} - * to update state with new data and timers. + * using Spark's {@link PairDStreamFunctions#updateStateByKey(scala.Function1, + * org.apache.spark.Partitioner, boolean, scala.reflect.ClassTag)} to update state with new data and + * timers. * - * Using updateStateByKey allows to scan through the entire state visiting not just the - * updated state (new values for key) but also check if timers are ready to fire. - * Since updateStateByKey bounds the types of state and output to be the same, - * a (state, output) tuple is used, filtering the state (and output if no firing) - * in the following steps. + * Using updateStateByKey allows to scan through the entire state visiting not just the updated + * state (new values for key) but also check if timers are ready to fire. Since updateStateByKey + * bounds the types of state and output to be the same, a (state, output) tuple is used, filtering + * the state (and output if no firing) in the following steps. 
*/ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger( - SparkGroupAlsoByWindowViaWindowSet.class); - - /** - * A helper class that is essentially a {@link Serializable} {@link AbstractFunction1}. - */ - private abstract static class SerializableFunction1 - extends AbstractFunction1 implements Serializable { - } + private static final Logger LOG = + LoggerFactory.getLogger(SparkGroupAlsoByWindowViaWindowSet.class); - public static - JavaDStream >> groupAlsoByWindow( - final
[2/2] beam git commit: This closes #3793
This closes #3793 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f7d4583b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f7d4583b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f7d4583b Branch: refs/heads/master Commit: f7d4583bdb3ef36734125d8aa7670ad924864f4c Parents: a481d56 c8b99ba Author: Stas Levin Authored: Sun Sep 3 15:40:35 2017 +0300 Committer: Stas Levin Committed: Sun Sep 3 15:40:35 2017 +0300 -- .../SparkGroupAlsoByWindowViaWindowSet.java | 878 +++ 1 file changed, 498 insertions(+), 380 deletions(-) --
[2/2] beam git commit: This closes #3749
This closes #3749 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a481d561 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a481d561 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a481d561 Branch: refs/heads/master Commit: a481d5611925f29224724bf0e79365db8f905853 Parents: 80aebd9 ffd08da Author: Stas Levin Authored: Sun Sep 3 09:04:29 2017 +0300 Committer: Stas Levin Committed: Sun Sep 3 09:04:29 2017 +0300 -- .../SparkGroupAlsoByWindowViaWindowSet.java | 15 ++-- .../spark/translation/BoundedDataset.java | 17 - .../beam/runners/spark/translation/Dataset.java | 3 +- .../spark/translation/EvaluationContext.java| 23 -- .../spark/translation/SparkContextFactory.java | 2 - .../translation/StorageLevelPTransform.java | 37 -- .../spark/translation/TransformTranslator.java | 53 + .../spark/translation/TranslationUtils.java | 78 .../streaming/StreamingTransformTranslator.java | 15 +++- .../translation/streaming/UnboundedDataset.java | 27 +-- .../spark/translation/StorageLevelTest.java | 75 --- 11 files changed, 166 insertions(+), 179 deletions(-) --
[1/2] beam git commit: [BEAM-2669] Fixed Kryo serialization exception when dstream is cached (by using coders and moving to bytes before attempting to serialise an RDD as part of caching it).
Repository: beam Updated Branches: refs/heads/master 80aebd902 -> a481d5611 [BEAM-2669] Fixed Kryo serialization exception when dstream is cached (by using coders and moving to bytes before attempting to serialise an RDD as part of caching it). Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ffd08dae Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ffd08dae Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ffd08dae Branch: refs/heads/master Commit: ffd08dae0d1a6fcde438ae4e9c2a348eb2a5d493 Parents: 80aebd9 Author: ksalantAuthored: Wed Aug 23 14:54:46 2017 +0300 Committer: Stas Levin Committed: Sun Sep 3 09:03:28 2017 +0300 -- .../SparkGroupAlsoByWindowViaWindowSet.java | 15 ++-- .../spark/translation/BoundedDataset.java | 17 - .../beam/runners/spark/translation/Dataset.java | 3 +- .../spark/translation/EvaluationContext.java| 23 -- .../spark/translation/SparkContextFactory.java | 2 - .../translation/StorageLevelPTransform.java | 37 -- .../spark/translation/TransformTranslator.java | 53 + .../spark/translation/TranslationUtils.java | 78 .../streaming/StreamingTransformTranslator.java | 15 +++- .../translation/streaming/UnboundedDataset.java | 27 +-- .../spark/translation/StorageLevelTest.java | 75 --- 11 files changed, 166 insertions(+), 179 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/ffd08dae/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 52f7376..e6a55a6 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -96,7 +96,7 @@ import scala.runtime.AbstractFunction1; * a (state, output) tuple is used, filtering the state (and output if no firing) * in the following steps. */ -public class SparkGroupAlsoByWindowViaWindowSet { +public class SparkGroupAlsoByWindowViaWindowSet implements Serializable { private static final Logger LOG = LoggerFactory.getLogger( SparkGroupAlsoByWindowViaWindowSet.class); @@ -226,8 +226,6 @@ public class SparkGroupAlsoByWindowViaWindowSet { final SystemReduceFn reduceFn = SystemReduceFn.buffering( ((FullWindowedValueCoder) wvCoder).getValueCoder()); -final OutputWindowedValueHolder outputHolder = -new OutputWindowedValueHolder<>(); // use in memory Aggregators since Spark Accumulators are not resilient // in stateful operators, once done with this partition. final MetricsContainerImpl cellProvider = new MetricsContainerImpl("cellProvider"); @@ -280,6 +278,9 @@ public class SparkGroupAlsoByWindowViaWindowSet { SparkTimerInternals.deserializeTimers(serTimers, timerDataCoder)); } + final OutputWindowedValueHolder outputHolder = + new OutputWindowedValueHolder<>(); + ReduceFnRunner reduceFnRunner = new ReduceFnRunner<>( key, @@ -294,8 +295,6 @@ public class SparkGroupAlsoByWindowViaWindowSet { reduceFn, options.get()); - outputHolder.clear(); // clear before potential use. - if (!seq.isEmpty()) { // new input for key. try { @@ -457,7 +456,7 @@ public class SparkGroupAlsoByWindowViaWindowSet { }); } - private static class StateAndTimers { + private static class StateAndTimers implements Serializable { //Serializable state for internals (namespace to state tag to coded value). private final Table state; private final Collection serTimers; @@ -494,10 +493,6 @@ public class SparkGroupAlsoByWindowViaWindowSet { return windowedValues; } -private void clear() { - windowedValues.clear(); -} - @Override public void outputWindowedValue( TupleTag tag,
[2/2] beam git commit: This closes #3738
This closes #3738 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5181e619 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5181e619 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5181e619 Branch: refs/heads/master Commit: 5181e619f17e1f69fabe8d5bdfc7a3a6a2142cde Parents: c4517d0 15472b2 Author: Stas Levin Authored: Thu Aug 24 09:43:16 2017 +0300 Committer: Stas Levin Committed: Thu Aug 24 09:43:16 2017 +0300 -- .../apache/beam/runners/spark/SparkRunner.java | 5 +- .../beam/runners/spark/io/CreateStream.java | 104 --- .../SparkGroupAlsoByWindowViaWindowSet.java | 158 +++--- .../spark/stateful/SparkTimerInternals.java | 6 + .../streaming/StreamingTransformTranslator.java | 71 +++-- .../streaming/WatermarkSyncedDStream.java | 149 + .../spark/util/GlobalWatermarkHolder.java | 302 +-- .../runners/spark/SparkPipelineStateTest.java | 4 +- .../translation/streaming/CreateStreamTest.java | 33 +- .../spark/src/test/resources/log4j.properties | 11 +- 10 files changed, 633 insertions(+), 210 deletions(-) --
[1/2] beam git commit: [BEAM-2671] Implemented an InputDStream that syncs up with the watermark values, this should help with streaming tests in spark-runner.
Repository: beam Updated Branches: refs/heads/master c4517d04c -> 5181e619f [BEAM-2671] Implemented an InputDStream that syncs up with the watermark values, this should help with streaming tests in spark-runner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/15472b28 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/15472b28 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/15472b28 Branch: refs/heads/master Commit: 15472b28c649381b90a0405d80012aa8523d13c5 Parents: c4517d0 Author: Stas LevinAuthored: Sun Aug 20 16:48:57 2017 +0300 Committer: Stas Levin Committed: Thu Aug 24 09:42:12 2017 +0300 -- .../apache/beam/runners/spark/SparkRunner.java | 5 +- .../beam/runners/spark/io/CreateStream.java | 104 --- .../SparkGroupAlsoByWindowViaWindowSet.java | 158 +++--- .../spark/stateful/SparkTimerInternals.java | 6 + .../streaming/StreamingTransformTranslator.java | 71 +++-- .../streaming/WatermarkSyncedDStream.java | 149 + .../spark/util/GlobalWatermarkHolder.java | 302 +-- .../runners/spark/SparkPipelineStateTest.java | 4 +- .../translation/streaming/CreateStreamTest.java | 33 +- .../spark/src/test/resources/log4j.properties | 11 +- 10 files changed, 633 insertions(+), 210 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 595521f..98ca1be 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -40,7 +40,7 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator; import org.apache.beam.runners.spark.translation.TransformTranslator; import 
org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir; import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory; -import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarksListener; +import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarkAdvancingStreamingListener; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.io.Read; @@ -171,7 +171,8 @@ public final class SparkRunner extends PipelineRunner { } // register Watermarks listener to broadcast the advanced WMs. - jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener())); + jssc.addStreamingListener( + new JavaStreamingListenerWrapper(new WatermarkAdvancingStreamingListener())); // The reason we call initAccumulators here even though it is called in // SparkRunnerStreamingContextFactory is because the factory is not called when resuming http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java index d485d25..4c73d95 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java @@ -41,34 +41,34 @@ import org.joda.time.Instant; /** * Create an input stream from Queue. For SparkRunner tests only. * - * To properly compose a stream of micro-batches with their Watermarks, please keep in mind - * that eventually there a two queues here - one for batches and another for Watermarks. + * To properly compose a stream of micro-batches with their Watermarks, please keep in mind that + * eventually there a two queues here - one for batches and another for Watermarks. 
* - * While both queues advance according to Spark's batch-interval, there is a slight difference - * in how data is pushed into the stream compared to the advancement of Watermarks since Watermarks + * While both queues advance according to Spark's batch-interval, there is a slight difference in + * how data is pushed into the stream compared to the advancement of Watermarks since Watermarks * advance onBatchCompleted hook call so if you'd want to set the watermark advance for a specific - * batch it should be called before that batch. - * Also keep in mind that being a
[2/2] beam git commit: This closes #2854
This closes #2854 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e10fbdaa Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e10fbdaa Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e10fbdaa Branch: refs/heads/master Commit: e10fbdaa2db480efb45074d59341ab5256a42d80 Parents: d96fd17 81b89ac Author: Stas Levin Authored: Tue May 9 10:09:42 2017 +0300 Committer: Stas Levin Committed: Tue May 9 10:09:42 2017 +0300 -- .../apache/beam/runners/spark/io/SourceRDD.java | 173 --- .../spark/io/ReaderToIteratorAdapterTest.java | 145 2 files changed, 260 insertions(+), 58 deletions(-) --
[2/2] beam git commit: This closes #2733
This closes #2733 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/535761a7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/535761a7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/535761a7 Branch: refs/heads/master Commit: 535761a7451c3e1dc557a58c35f2190e05c68aac Parents: 254470e fcb61ae Author: Stas Levin Authored: Mon May 1 15:32:19 2017 +0300 Committer: Stas Levin Committed: Mon May 1 15:32:19 2017 +0300 -- .../beam/runners/spark/io/SourceDStream.java| 61 +--- 1 file changed, 40 insertions(+), 21 deletions(-) --
[1/2] beam git commit: [BEAM-2074, BEAM-2073] Fixed SourceDStream's rate control usage.
Repository: beam Updated Branches: refs/heads/master 254470e62 -> 535761a74 [BEAM-2074,BEAM-2073] Fixed SourceDStream's rate control usage. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fcb61ae6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fcb61ae6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fcb61ae6 Branch: refs/heads/master Commit: fcb61ae603ba61ed94bbbe75f4d5c8257eaa1c32 Parents: 254470e Author: Stas LevinAuthored: Wed Apr 26 11:46:31 2017 +0300 Committer: Stas Levin Committed: Mon May 1 14:13:43 2017 +0300 -- .../beam/runners/spark/io/SourceDStream.java| 61 +--- 1 file changed, 40 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/fcb61ae6/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java index d8f414a..20aca5f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java @@ -99,7 +99,7 @@ class SourceDStream this.initialParallelism = ssc().sparkContext().defaultParallelism(); checkArgument(this.initialParallelism > 0, "Number of partitions must be greater than zero."); -this.boundMaxRecords = boundMaxRecords > 0 ? 
boundMaxRecords : rateControlledMaxRecords(); +this.boundMaxRecords = boundMaxRecords; try { this.numPartitions = @@ -124,14 +124,34 @@ class SourceDStream private MicrobatchSource createMicrobatchSource() { -return new MicrobatchSource<>( -unboundedSource, -boundReadDuration, -initialParallelism, -boundMaxRecords, --1, -id(), -readerCacheInterval); +return new MicrobatchSource<>(unboundedSource, + boundReadDuration, + initialParallelism, + computeReadMaxRecords(), + -1, + id(), + readerCacheInterval); + } + + private long computeReadMaxRecords() { +if (boundMaxRecords > 0) { + LOG.info("Max records per batch has been set to {}, as configured in the PipelineOptions.", + boundMaxRecords); + return boundMaxRecords; +} else { + final scala.Option rateControlledMax = rateControlledMaxRecords(); + if (rateControlledMax.isDefined()) { +LOG.info("Max records per batch has been set to {}, as advised by the rate controller.", + rateControlledMax.get()); +return rateControlledMax.get(); + } else { +LOG.info("Max records per batch has not been limited by neither configuration " + + "nor the rate controller, and will remain unlimited for the current batch " + + "({}).", + Long.MAX_VALUE); +return Long.MAX_VALUE; + } +} } @Override @@ -170,19 +190,18 @@ class SourceDStream // Bound by records. 
- private long rateControlledMaxRecords() { -scala.Option rateControllerOption = rateController(); -if (rateControllerOption.isDefined()) { - long rateLimitPerSecond = rateControllerOption.get().getLatestRate(); - if (rateLimitPerSecond > 0) { -long totalRateLimit = -rateLimitPerSecond * (ssc().graph().batchDuration().milliseconds() / 1000); -LOG.info("RateController set limit to {}", totalRateLimit); -return totalRateLimit; - } + private scala.Option rateControlledMaxRecords() { +final scala.Option rateControllerOption = rateController(); +final scala.Option rateLimitPerBatch; +final long rateLimitPerSec; +if (rateControllerOption.isDefined() +&& ((rateLimitPerSec = rateControllerOption.get().getLatestRate()) > 0)) { + final long batchDurationSec = ssc().graph().batchDuration().milliseconds() / 1000; + rateLimitPerBatch = scala.Option.apply(rateLimitPerSec * batchDurationSec); +} else { + rateLimitPerBatch = scala.Option.empty(); } -LOG.info("RateController had nothing to report, default is Long.MAX_VALUE"); -return Long.MAX_VALUE; +return rateLimitPerBatch; } private final
[2/2] beam git commit: This closes #2698
This closes #2698 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/254470e6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/254470e6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/254470e6 Branch: refs/heads/master Commit: 254470e626edb6a013ba5cb2d3312dc3bfbdcb51 Parents: b414f8d 3b6f4f6 Author: Stas Levin Authored: Mon May 1 14:07:20 2017 +0300 Committer: Stas Levin Committed: Mon May 1 14:07:20 2017 +0300 -- .../beam/runners/spark/io/MicrobatchSource.java | 188 +-- .../beam/runners/spark/io/SourceDStream.java| 12 +- .../apache/beam/runners/spark/io/SourceRDD.java | 11 +- .../spark/stateful/StateSpecFunctions.java | 30 +-- 4 files changed, 125 insertions(+), 116 deletions(-) --
[2/2] beam git commit: This closes #2403
This closes #2403 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/646cbdb7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/646cbdb7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/646cbdb7 Branch: refs/heads/master Commit: 646cbdb7087fc68b7c2ff175ffbebac5f58688fd Parents: 6df661b d96a95c Author: Stas Levin Authored: Wed Apr 5 10:26:16 2017 +0300 Committer: Stas Levin Committed: Wed Apr 5 10:26:16 2017 +0300 -- .../apache/beam/sdk/testing/TestPipeline.java | 76 +--- .../org/apache/beam/sdk/io/TFRecordIOTest.java | 7 +- 2 files changed, 57 insertions(+), 26 deletions(-) --
[1/2] beam git commit: [BEAM-1777] In certain circumstances PipelineEnforcement shades pipeline's exceptions.
Repository: beam Updated Branches: refs/heads/master 6df661b0e -> 646cbdb70 [BEAM-1777] In certain circumstances PipelineEnforcement shades pipeline's exceptions. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d96a95c5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d96a95c5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d96a95c5 Branch: refs/heads/master Commit: d96a95c5ed561721dcc4cde16bbd3a3308e6f18e Parents: 6df661b Author: Stas LevinAuthored: Sun Apr 2 14:09:43 2017 +0300 Committer: Stas Levin Committed: Wed Apr 5 10:26:08 2017 +0300 -- .../apache/beam/sdk/testing/TestPipeline.java | 76 +--- .../org/apache/beam/sdk/io/TFRecordIOTest.java | 7 +- 2 files changed, 57 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/d96a95c5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index 6a8335e..a4ab196 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -106,7 +106,7 @@ public class TestPipeline extends Pipeline implements TestRule { protected final Pipeline pipeline; -private boolean runInvoked; +protected boolean runAttempted; private PipelineRunEnforcement(final Pipeline pipeline) { this.pipeline = pipeline; @@ -116,12 +116,14 @@ public class TestPipeline extends Pipeline implements TestRule { enableAutoRunIfMissing = enable; } -protected void afterPipelineExecution() { - runInvoked = true; +protected void beforePipelineExecution() { + runAttempted = true; } -protected void afterTestCompletion() { - if (!runInvoked && enableAutoRunIfMissing) { +protected void afterPipelineExecution() {} + +protected void afterUserCodeFinished() { + if 
(!runAttempted && enableAutoRunIfMissing) { pipeline.run().waitUntilFinish(); } } @@ -174,27 +176,38 @@ public class TestPipeline extends Pipeline implements TestRule { } private void verifyPipelineExecution() { - final List pipelineNodes = recordPipelineNodes(pipeline); - if (runVisitedNodes != null && !runVisitedNodes.equals(pipelineNodes)) { -final boolean hasDanglingPAssert = -FluentIterable.from(pipelineNodes) -.filter(Predicates.not(Predicates.in(runVisitedNodes))) -.anyMatch(isPAssertNode); -if (hasDanglingPAssert) { - throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s)."); -} else { - throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s)."); -} - } else if (runVisitedNodes == null && !enableAutoRunIfMissing) { -if (!isEmptyPipeline(pipeline)) { + if (!isEmptyPipeline(pipeline)) { +if (!runAttempted && !enableAutoRunIfMissing) { throw new PipelineRunMissingException( "The pipeline has not been run (runner: " + pipeline.getOptions().getRunner().getSimpleName() + ")"); + +} else { + final List pipelineNodes = recordPipelineNodes(pipeline); + if (pipelineRunSucceeded() && !visitedAll(pipelineNodes)) { +final boolean hasDanglingPAssert = +FluentIterable.from(pipelineNodes) + .filter(Predicates.not(Predicates.in(runVisitedNodes))) + .anyMatch(isPAssertNode); +if (hasDanglingPAssert) { + throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s)."); +} else { + throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s)."); +} + } } } } +private boolean visitedAll(final List pipelineNodes) { + return runVisitedNodes.equals(pipelineNodes); +} + +private boolean pipelineRunSucceeded() { + return runVisitedNodes != null; +} + @Override protected void afterPipelineExecution() { runVisitedNodes = recordPipelineNodes(pipeline); @@ -202,8 +215,8 @@ public class TestPipeline extends Pipeline implements TestRule { } @Override -protected void afterTestCompletion() { - 
super.afterTestCompletion(); +protected void afterUserCodeFinished() { + super.afterUserCodeFinished();
[2/2] beam git commit: This closes #2073
This closes #2073 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fe1d4124 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fe1d4124 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fe1d4124 Branch: refs/heads/master Commit: fe1d4124e7d51ec45869fb9e95cc8243f5891553 Parents: ea33e33 38e00f3 Author: Stas Levin Authored: Sun Apr 2 13:07:28 2017 +0300 Committer: Stas Levin Committed: Sun Apr 2 13:07:28 2017 +0300 -- .../runners/spark/io/SparkUnboundedSource.java | 36 ++-- .../spark/stateful/StateSpecFunctions.java | 21 +--- 2 files changed, 50 insertions(+), 7 deletions(-) --
[1/2] beam git commit: [BEAM-1048] Added a read duration metric to SparkUnboundedSource.
Repository: beam Updated Branches: refs/heads/master ea33e3373 -> fe1d4124e [BEAM-1048] Added a read duration metric to SparkUnboundedSource. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38e00f3f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38e00f3f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38e00f3f Branch: refs/heads/master Commit: 38e00f3f01e0125f6ac929da0a9d1c93fcc6787d Parents: ea33e33 Author: Stas LevinAuthored: Sun Apr 2 08:46:14 2017 +0300 Committer: Stas Levin Committed: Sun Apr 2 13:07:02 2017 +0300 -- .../runners/spark/io/SparkUnboundedSource.java | 36 ++-- .../spark/stateful/StateSpecFunctions.java | 21 +--- 2 files changed, 50 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/38e00f3f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java index 162bca4..6b34590 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java @@ -18,6 +18,8 @@ package org.apache.beam.runners.spark.io; +import java.io.Closeable; +import java.io.IOException; import java.io.Serializable; import java.util.Collections; import org.apache.beam.runners.spark.SparkPipelineOptions; @@ -32,6 +34,10 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; +import org.apache.beam.sdk.metrics.Gauge; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.MetricsContainer; +import 
org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -105,7 +111,8 @@ public class SparkUnboundedSource { JavaDStream metadataDStream = mapWithStateDStream.map(new Tuple2MetadataFunction()); // register ReadReportDStream to report information related to this read. -new ReadReportDStream(metadataDStream.dstream(), id, getSourceName(source, id)).register(); +new ReadReportDStream(metadataDStream.dstream(), id, getSourceName(source, id), stepName) +.register(); // output the actual (deserialized) stream. WindowedValue.FullWindowedValueCoder coder = @@ -148,18 +155,25 @@ public class SparkUnboundedSource { * Updates {@link MetricsAccumulator} with metrics reported in the read. */ private static class ReadReportDStream extends DStream { + +private static final String READ_DURATION_MILLIS = "readDurationMillis"; +private static final String NAMESPACE = "spark-runner.io"; + private final DStream parent; private final int inputDStreamId; private final String sourceName; +private final String stepName; ReadReportDStream( DStream parent, int inputDStreamId, -String sourceName) { +String sourceName, +String stepName) { super(parent.ssc(), JavaSparkContext$.MODULE$.fakeClassTag()); this.parent = parent; this.inputDStreamId = inputDStreamId; this.sourceName = sourceName; + this.stepName = stepName; } @Override @@ -182,6 +196,7 @@ public class SparkUnboundedSource { SparkWatermarks sparkWatermark = null; Instant globalLowWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE; Instant globalHighWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE; + long maxReadDuration = 0; if (parentRDDOpt.isDefined()) { JavaRDD parentRDD = parentRDDOpt.get().toJavaRDD(); for (Metadata metadata: parentRDD.collect()) { @@ -196,6 +211,16 @@ public class SparkUnboundedSource { 
globalHighWatermarkForBatch.isBefore(partitionHighWatermark) ? partitionHighWatermark : globalHighWatermarkForBatch; // Update metrics reported in the read + final Gauge gauge = Metrics.gauge(NAMESPACE, READ_DURATION_MILLIS); + final MetricsContainer container = metadata.getMetricsContainer().getContainer(stepName); + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(container)) { +final long readDurationMillis = metadata.getReadDurationMillis();
[1/2] beam git commit: Added a specific signature so that Kryo doesn't have to look for it using reflective exploration.
Repository: beam Updated Branches: refs/heads/master 2a40534e8 -> 22d368b40 Added a specific signature so that Kryo doesn't have to look for it using reflective exploration. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a02c600 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a02c600 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a02c600 Branch: refs/heads/master Commit: 1a02c60032f5733a6b2bae8a5853864fea86c035 Parents: 2a40534 Author: Stas LevinAuthored: Wed Mar 29 15:33:17 2017 +0300 Committer: Stas Levin Committed: Thu Mar 30 15:00:43 2017 +0300 -- .../runners/spark/coders/StatelessJavaSerializer.java| 11 +++ 1 file changed, 11 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/1a02c600/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java index b29cf0c..01b3b93 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java @@ -54,6 +54,17 @@ import java.io.ObjectStreamClass; * */ class StatelessJavaSerializer extends Serializer { + + // Since Kryo uses reflection to sequentially look for constructor signatures, starting + // with this particular signature spares exploring further ones, which involves + // NoSuchMethodException(s) being thrown as part of the exploration process and may slow + // things down, see Kryo#newSerializer(), see https://goo.gl/Jn425G + public StatelessJavaSerializer(final Kryo ignore1, final Class ignore2) {} + + public StatelessJavaSerializer() { +this(null, null); + } + @SuppressWarnings("unchecked") public void write(Kryo kryo, Output output, Object 
object) { try {
[2/2] beam git commit: This closes #2356
This closes #2356 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2a40534e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2a40534e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2a40534e Branch: refs/heads/master Commit: 2a40534e80fb84d969ac16bd0d62618109ee04b4 Parents: 769398e 3876f83 Author: Stas Levin Authored: Thu Mar 30 14:53:57 2017 +0300 Committer: Stas Levin Committed: Thu Mar 30 14:53:57 2017 +0300 -- .../runners/spark/io/SparkUnboundedSource.java | 37 .../spark/stateful/StateSpecFunctions.java | 17 + 2 files changed, 31 insertions(+), 23 deletions(-) --
[1/2] beam git commit: Extracted captures to static classes to prevent them from capturing the scope.
Repository: beam Updated Branches: refs/heads/master 769398e40 -> 2a40534e8 Extracted captures to static classes to prevent them from capturing the scope. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3876f83a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3876f83a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3876f83a Branch: refs/heads/master Commit: 3876f83a82845e3c0a41152cf7a7c58378d994e7 Parents: 769398e Author: Stas LevinAuthored: Wed Mar 29 15:29:20 2017 +0300 Committer: Stas Levin Committed: Thu Mar 30 14:53:38 2017 +0300 -- .../runners/spark/io/SparkUnboundedSource.java | 37 .../spark/stateful/StateSpecFunctions.java | 17 + 2 files changed, 31 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/3876f83a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java index a538907..162bca4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java @@ -102,13 +102,7 @@ public class SparkUnboundedSource { // report the number of input elements for this InputDStream to the InputInfoTracker. int id = inputDStream.inputDStream().id(); -JavaDStream metadataDStream = mapWithStateDStream.map( -new Function , Metadata>, Metadata>() { - @Override - public Metadata call(Tuple2 , Metadata> t2) throws Exception { -return t2._2(); - } -}); +JavaDStream metadataDStream = mapWithStateDStream.map(new Tuple2MetadataFunction()); // register ReadReportDStream to report information related to this read. 
new ReadReportDStream(metadataDStream.dstream(), id, getSourceName(source, id)).register(); @@ -118,13 +112,10 @@ public class SparkUnboundedSource { WindowedValue.FullWindowedValueCoder.of( source.getDefaultOutputCoder(), GlobalWindow.Coder.INSTANCE); -JavaDStream readUnboundedStream = mapWithStateDStream.flatMap( -new FlatMapFunction , Metadata>, byte[]>() { - @Override - public Iterable call(Tuple2 , Metadata> t2) throws Exception { -return t2._1(); - } -}).map(CoderHelpers.fromByteFunction(coder)); +JavaDStream readUnboundedStream = +mapWithStateDStream +.flatMap(new Tuple2byteFlatMapFunction()) +.map(CoderHelpers.fromByteFunction(coder)); return new UnboundedDataset<>(readUnboundedStream, Collections.singletonList(id)); } @@ -274,4 +265,22 @@ public class SparkUnboundedSource { return metricsContainer; } } + + private static class Tuple2MetadataFunction + implements Function , Metadata>, Metadata> { + +@Override +public Metadata call(Tuple2 , Metadata> t2) throws Exception { + return t2._2(); +} + } + + private static class Tuple2byteFlatMapFunction + implements FlatMapFunction , Metadata>, byte[]> { + +@Override +public Iterable call(Tuple2 , Metadata> t2) throws Exception { + return t2._1(); +} + } } http://git-wip-us.apache.org/repos/asf/beam/blob/3876f83a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java index ec4fce3..803fe45 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java @@ -20,11 +20,11 @@ package org.apache.beam.runners.spark.stateful; import com.google.common.base.Stopwatch; import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; 
import java.io.Closeable; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; import
[1/2] beam git commit: [BEAM-1629] Init metrics/aggregators accumulators before traversing pipeline
Repository: beam Updated Branches: refs/heads/master d16715309 -> b6ca062fc [BEAM-1629] Init metrics/aggregators accumulators before traversing pipeline Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/874c8d0d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/874c8d0d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/874c8d0d Branch: refs/heads/master Commit: 874c8d0da65568b01cd5f184e303d39c7810a8bf Parents: d167153 Author: Aviem ZurAuthored: Mon Mar 6 20:48:48 2017 +0200 Committer: Stas Levin Committed: Sun Mar 12 10:02:23 2017 +0200 -- .../spark/SparkNativePipelineVisitor.java | 4 -- .../beam/runners/spark/SparkPipelineResult.java | 8 +-- .../apache/beam/runners/spark/SparkRunner.java | 65 ++-- .../beam/runners/spark/SparkRunnerDebugger.java | 30 ++--- .../beam/runners/spark/TestSparkRunner.java | 4 +- .../aggregators/AggregatorsAccumulator.java | 44 + .../spark/aggregators/SparkAggregators.java | 40 ++-- .../spark/metrics/AggregatorMetricSource.java | 11 ++-- .../spark/metrics/MetricsAccumulator.java | 38 .../spark/metrics/SparkBeamMetricSource.java| 11 ++-- .../spark/metrics/SparkMetricsContainer.java| 17 ++--- .../spark/translation/TransformTranslator.java | 13 ++-- .../SparkRunnerStreamingContextFactory.java | 3 + .../streaming/StreamingTransformTranslator.java | 10 +-- .../metrics/sink/NamedAggregatorsTest.java | 15 + .../ResumeFromCheckpointStreamingTest.java | 4 +- 16 files changed, 156 insertions(+), 161 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java index 056da97..c2784a2 100644 --- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.spark; import com.google.common.base.Joiner; -import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -27,11 +26,9 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; -import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformEvaluator; -import org.apache.beam.runners.spark.translation.streaming.Checkpoint; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.MapElements; @@ -55,7 +52,6 @@ public class SparkNativePipelineVisitor extends SparkRunner.Evaluator { SparkNativePipelineVisitor(SparkPipelineTranslator translator, EvaluationContext ctxt) { super(translator, ctxt); this.transforms = new ArrayList<>(); -MetricsAccumulator.init(ctxt.getSparkContext(), Optional.absent()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java index ddc1964..ed1e0c8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java @@ -27,7 +27,6 @@ import 
java.util.concurrent.TimeoutException; import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.metrics.SparkMetricResults; import org.apache.beam.runners.spark.translation.SparkContextFactory; -import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -84,13 +83,12 @@ public abstract class SparkPipelineResult implements PipelineResult { throws TimeoutException, ExecutionException, InterruptedException;
[2/2] beam git commit: This closes #2171
This closes #2171 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b6ca062f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b6ca062f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b6ca062f Branch: refs/heads/master Commit: b6ca062fcfa31884baf08b804d04c12dee10b62e Parents: d167153 874c8d0 Author: Stas LevinAuthored: Sun Mar 12 10:02:30 2017 +0200 Committer: Stas Levin Committed: Sun Mar 12 10:02:30 2017 +0200 -- .../spark/SparkNativePipelineVisitor.java | 4 -- .../beam/runners/spark/SparkPipelineResult.java | 8 +-- .../apache/beam/runners/spark/SparkRunner.java | 65 ++-- .../beam/runners/spark/SparkRunnerDebugger.java | 30 ++--- .../beam/runners/spark/TestSparkRunner.java | 4 +- .../aggregators/AggregatorsAccumulator.java | 44 + .../spark/aggregators/SparkAggregators.java | 40 ++-- .../spark/metrics/AggregatorMetricSource.java | 11 ++-- .../spark/metrics/MetricsAccumulator.java | 38 .../spark/metrics/SparkBeamMetricSource.java| 11 ++-- .../spark/metrics/SparkMetricsContainer.java| 17 ++--- .../spark/translation/TransformTranslator.java | 13 ++-- .../SparkRunnerStreamingContextFactory.java | 3 + .../streaming/StreamingTransformTranslator.java | 10 +-- .../metrics/sink/NamedAggregatorsTest.java | 15 + .../ResumeFromCheckpointStreamingTest.java | 4 +- 16 files changed, 156 insertions(+), 161 deletions(-) --
[2/2] beam git commit: This closes #2161
This closes #2161 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/59451bca Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/59451bca Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/59451bca Branch: refs/heads/master Commit: 59451bca60ef45b1daeb14b8194bac0f8bdcc98b Parents: 11a381b 4febd95 Author: Stas Levin Authored: Sun Mar 5 17:51:59 2017 +0200 Committer: Stas Levin Committed: Sun Mar 5 17:51:59 2017 +0200 -- .../apache/beam/runners/spark/translation/BoundedDataset.java | 6 ++ 1 file changed, 2 insertions(+), 4 deletions(-) --
[1/2] beam git commit: [BEAM-1625] BoundedDataset action() does not materialize RDD
Repository: beam Updated Branches: refs/heads/master 11a381b23 -> 59451bca6 [BEAM-1625] BoundedDataset action() does not materialize RDD Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4febd954 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4febd954 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4febd954 Branch: refs/heads/master Commit: 4febd954af00458032efbee45b7f9724fe0ea9ed Parents: 11a381b Author: Aviem ZurAuthored: Sun Mar 5 16:17:35 2017 +0200 Committer: Aviem Zur Committed: Sun Mar 5 16:17:35 2017 +0200 -- .../apache/beam/runners/spark/translation/BoundedDataset.java | 6 ++ 1 file changed, 2 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/4febd954/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java index 5e19846..7db04a8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.spark.translation; import com.google.common.base.Function; import com.google.common.collect.Iterables; -import java.util.Iterator; import java.util.List; import javax.annotation.Nullable; import org.apache.beam.runners.spark.coders.CoderHelpers; @@ -107,9 +106,8 @@ public class BoundedDataset implements Dataset { @Override public void action() { // Empty function to force computation of RDD. 
-rdd.foreachPartition(new VoidFunction >() { - @Override - public void call(Iterator windowedValueIterator) throws Exception { +rdd.foreach(new VoidFunction () { + @Override public void call(WindowedValue tWindowedValue) throws Exception { // Empty implementation. } });
[2/2] beam git commit: This closes #2083
This closes #2083 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3082178b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3082178b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3082178b Branch: refs/heads/master Commit: 3082178b3a35143573caa2b3d9afbf2babb5b2b5 Parents: 4bba380 b74f8fa Author: Stas Levin Authored: Mon Feb 27 14:16:55 2017 +0200 Committer: Stas Levin Committed: Mon Feb 27 14:16:55 2017 +0200 -- .../runners/spark/metrics/SparkBeamMetric.java | 13 - .../spark/metrics/SparkBeamMetricTest.java | 60 2 files changed, 70 insertions(+), 3 deletions(-) --
[1/2] beam git commit: Remove periods from step name in SparkBeamMetric to avoid inconsistent hierarchies in Graphite metric sink
Repository: beam Updated Branches: refs/heads/master 4bba380cb -> 3082178b3 Remove periods from step name in SparkBeamMetric to avoid inconsistent hierarchies in Graphite metric sink Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b74f8fa3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b74f8fa3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b74f8fa3 Branch: refs/heads/master Commit: b74f8fa30aea130f990c152a1c91a509c3b6d6c9 Parents: 4bba380 Author: Aviem ZurAuthored: Thu Feb 23 07:09:55 2017 +0200 Committer: Stas Levin Committed: Mon Feb 27 14:15:43 2017 +0200 -- .../runners/spark/metrics/SparkBeamMetric.java | 13 - .../spark/metrics/SparkBeamMetricTest.java | 60 2 files changed, 70 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/b74f8fa3/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java index 8e31b22..8328a1a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.spark.metrics; import com.codahale.metrics.Metric; +import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.Map; import org.apache.beam.sdk.metrics.DistributionResult; @@ -33,6 +34,7 @@ import org.apache.beam.sdk.metrics.MetricsFilter; */ class SparkBeamMetric implements Metric { private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9\\._-]"; + private static final String ILLEGAL_CHARACTERS_AND_PERIOD = "[^A-Za-z0-9_-]"; private final SparkMetricResults metricResults = new SparkMetricResults(); @@ -54,9 +56,14 @@ class 
SparkBeamMetric implements Metric { return metrics; } - private String renderName(MetricResult metricResult) { + @VisibleForTesting + String renderName(MetricResult metricResult) { +String renderedStepName = metricResult.step().replaceAll(ILLEGAL_CHARACTERS_AND_PERIOD, "_"); +if (renderedStepName.endsWith("_")) { + renderedStepName = renderedStepName.substring(0, renderedStepName.length() - 1); +} MetricName metricName = metricResult.name(); -String rendered = metricResult.step() + "." + metricName.namespace() + "." + metricName.name(); -return rendered.replaceAll(ILLEGAL_CHARACTERS, "_"); +return (renderedStepName + "." + metricName.namespace() + "." + metricName.name()) +.replaceAll(ILLEGAL_CHARACTERS, "_"); } } http://git-wip-us.apache.org/repos/asf/beam/blob/b74f8fa3/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricTest.java -- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricTest.java new file mode 100644 index 000..9426b2c --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.metrics; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricResult; +import org.junit.Test; + + +/** + * Test SparkBeamMetric. + */ +public class SparkBeamMetricTest { + @Test + public void testRenderName() throws Exception { +MetricResult metricResult = new MetricResult() { + @Override + public
[2/2] beam git commit: This closes #2089
This closes #2089 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4bba380c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4bba380c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4bba380c Branch: refs/heads/master Commit: 4bba380cb011e38bd78c1ec899a6672a6b02fd05 Parents: 38a01e9 acc1013 Author: Stas Levin Authored: Sun Feb 26 17:28:26 2017 +0200 Committer: Stas Levin Committed: Sun Feb 26 17:28:26 2017 +0200 -- .../apache/beam/sdk/testing/TestPipeline.java | 42 +++- 1 file changed, 32 insertions(+), 10 deletions(-) --
[2/3] beam-site git commit: Regenerate website
Regenerate website Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/1a607ad8 Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/1a607ad8 Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/1a607ad8 Branch: refs/heads/asf-site Commit: 1a607ad8ea7667addbfe27e097a1d4ca912bf15a Parents: 3c0c532 Author: Stas LevinAuthored: Fri Feb 24 17:36:38 2017 +0200 Committer: Stas Levin Committed: Fri Feb 24 17:36:38 2017 +0200 -- content/contribute/testing/index.html | 157 + 1 file changed, 157 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam-site/blob/1a607ad8/content/contribute/testing/index.html -- diff --git a/content/contribute/testing/index.html b/content/contribute/testing/index.html index 92a0e1f..9b41686 100644 --- a/content/contribute/testing/index.html +++ b/content/contribute/testing/index.html @@ -173,6 +173,8 @@ Testing Systems E2E Testing Framework RunnableOnService Tests + Effective use of the TestPipeline JUnit rule + API Surface testing @@ -542,6 +544,161 @@ which enables test authors to write simple functionality verification. They are meant to use some of the built-in utilities of the SDK, namely PAssert, to verify that the simple pipelines they run end in the correct state. +Effective use of the TestPipeline JUnit rule + +TestPipeline is JUnit rule designed to facilitate testing pipelines. +In combination with PAssert, the two can be used for testing and +writing assertions over pipelines. However, in order for these assertions +to be effective, the constructed pipeline must be run by a pipeline +runner. If the pipeline is not run (i.e., executed) then the +constructed PAssert statements will not be triggered, and will thus +be ineffective. + +To prevent such cases, TestPipeline has some protection mechanisms in place. 
+ +Abandoned node detection (performed automatically) + +Abandoned nodes are PTransforms, PAsserts included, that were not +executed by the pipeline runner. Abandoned nodes are most likely to occur +due to the one of the following scenarios: + + Lack of a pipeline.run() statement at the end of a test. + Addition of PTransforms after the pipeline has already run. + + +Abandoned node detection is automatically enabled when a real pipeline +runner (i.e. not a CrashingRunner) and/or a +@NeedsRunner / @RunnableOnService annotation are detected. + +Consider the following test: + +// Note the @Rule annotation here +@Rule +public final transient TestPipeline pipeline = TestPipeline.create(); + +@Test +@Category(NeedsRunner.class) +public void myPipelineTest() throws Exception { + +final PCollectionString pCollection = + pipeline +.apply("Create", Create.of(WORDS).withCoder(StringUtf8Coder.of())) +.apply( +"Map1", +MapElements.via( +new SimpleFunctionString, String() { + + @Override + public String apply(final String input) { +return WHATEVER; + } +})); + +PAssert.that(pCollection).containsInAnyOrder(WHATEVER); + +/* ERROR: pipeline.run() is missing, PAsserts are ineffective */ +} + + + +# Unsupported in Beam's Python SDK. + + + +The PAssert at the end of this test method will not be executed, since +pipeline is never run, making this test ineffective. If this test method +is run using an actual pipeline runner, an exception will be thrown +indicating that there was no run() invocation in the test. + +Exceptions that are thrown prior to executing a pipeline, will fail +the test unless handled by an ExpectedException rule. 
+ +Consider the following test: + +// Note the @Rule annotation here +@Rule +public final transient TestPipeline pipeline = TestPipeline.create(); + +@Test +public void testReadingFailsTableDoesNotExist() throws Exception { + final String table = "TEST-TABLE"; + + BigtableIO.Read read = + BigtableIO.read() + .withBigtableOptions(BIGTABLE_OPTIONS) + .withTableId(table) + .withBigtableService(service); + + // Exception will be thrown by read.validate() when read is applied. + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(String.format("Table %s does not exist", table)); + + p.apply(read); +} + + + +# Unsupported in Beam's Python SDK. + + + +The application of the read transform throws an exception, which is then +handled by the thrown ExpectedException rule. +In light of this exception, the fact this test has abandoned nodes +(the read transform) does not play a role since the test fails before +the pipeline would
[3/3] beam-site git commit: This closes #162
This closes #162 Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/408f290e Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/408f290e Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/408f290e Branch: refs/heads/asf-site Commit: 408f290ebe6270e8f80ef4e73bdb47931a4f7a74 Parents: 3fd0ed6 1a607ad Author: Stas Levin Authored: Fri Feb 24 17:38:28 2017 +0200 Committer: Stas Levin Committed: Fri Feb 24 17:38:28 2017 +0200 -- content/contribute/testing/index.html | 157 + src/contribute/testing.md | 155 2 files changed, 312 insertions(+) --
[2/2] beam git commit: This closes #1706
This closes #1706 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f03f6ac1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f03f6ac1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f03f6ac1 Branch: refs/heads/master Commit: f03f6ac192dcc5f1ba92d9639b153a3afba3340d Parents: 5fe7844 50daea2 Author: Stas Levin Authored: Thu Feb 16 11:18:24 2017 +0200 Committer: Stas Levin Committed: Thu Feb 16 11:18:24 2017 +0200 -- .../apache/beam/sdk/testing/Annotations.java| 72 +++ .../apache/beam/sdk/testing/TestPipeline.java | 66 ++- .../apache/beam/sdk/metrics/MetricsTest.java| 6 +- .../beam/sdk/testing/TestPipelineTest.java | 504 +++ .../org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 9 +- 5 files changed, 417 insertions(+), 240 deletions(-) --
[1/2] beam git commit: [BEAM-1205] Auto set "enableAbandonedNodeEnforcement" in TestPipeline
Repository: beam Updated Branches: refs/heads/master 5fe78440b -> f03f6ac19 [BEAM-1205] Auto set "enableAbandonedNodeEnforcement" in TestPipeline Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50daea28 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50daea28 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50daea28 Branch: refs/heads/master Commit: 50daea288b9c5df2b481e5e6bea153796c03830a Parents: 5fe7844 Author: Stas LevinAuthored: Thu Dec 22 19:13:01 2016 +0200 Committer: Stas Levin Committed: Thu Feb 16 11:18:09 2017 +0200 -- .../apache/beam/sdk/testing/Annotations.java| 72 +++ .../apache/beam/sdk/testing/TestPipeline.java | 66 ++- .../apache/beam/sdk/metrics/MetricsTest.java| 6 +- .../beam/sdk/testing/TestPipelineTest.java | 504 +++ .../org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 9 +- 5 files changed, 417 insertions(+), 240 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/50daea28/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java new file mode 100644 index 000..e560226 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.beam.sdk.testing; + +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import java.lang.annotation.Annotation; +import java.util.Arrays; +import javax.annotation.Nonnull; +import org.junit.experimental.categories.Category; + +/** + * A utility class for querying annotations. + */ +class Annotations { + + /** + * Annotation predicates. + */ + static class Predicates { + +static Predicate isAnnotationOfType(final Class clazz) { + return new Predicate() { + +@Override +public boolean apply(@Nonnull final Annotation annotation) { + return annotation.annotationType() != null + && annotation.annotationType().equals(clazz); +} + }; +} + +static Predicate isCategoryOf(final Class value, final boolean allowDerived) { + return new Predicate() { + +@Override +public boolean apply(@Nonnull final Annotation category) { + return + FluentIterable + .from(Arrays.asList(((Category) category).value())) + .anyMatch(new Predicate () { + +@Override +public boolean apply(final Class aClass) { + return + allowDerived + ? 
value.isAssignableFrom(aClass) + : value.equals(aClass); +} + }); +} + }; +} + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/50daea28/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index 37c809a..02eefa9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.testing; +import static com.google.common.base.Preconditions.checkState; + import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.TreeNode; import com.fasterxml.jackson.databind.JsonNode; @@ -93,15 +95,18 @@ public class TestPipeline extends Pipeline implements TestRule { private static class
[2/2] beam git commit: This closes #1991
This closes #1991 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4db4fb2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4db4fb2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4db4fb2 Branch: refs/heads/master Commit: d4db4fb2c20e46beac1a1fc0ef47ec30d9a2bd34 Parents: 4beed50 8014a6e Author: Stas Levin Authored: Mon Feb 13 15:18:07 2017 +0200 Committer: Stas Levin Committed: Mon Feb 13 15:18:07 2017 +0200 -- .../spark/src/test/resources/log4j.properties | 30 1 file changed, 30 insertions(+) --
[2/2] beam git commit: This closes #1986
This closes #1986 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4beed50f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4beed50f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4beed50f Branch: refs/heads/master Commit: 4beed50f27a745ec38c14ea4a4ed88c0a346f485 Parents: 93ae666 2f63b1f Author: Stas Levin Authored: Mon Feb 13 12:45:54 2017 +0200 Committer: Stas Levin Committed: Mon Feb 13 12:45:54 2017 +0200 -- .../streaming/ResumeFromCheckpointStreamingTest.java | 11 +-- 1 file changed, 5 insertions(+), 6 deletions(-) --
[1/2] beam git commit: Fixed javadoc comment in ResumeFromCheckpointStreamingTest.
Repository: beam Updated Branches: refs/heads/master 93ae666be -> 4beed50f2 Fixed javadoc comment in ResumeFromCheckpointStreamingTest. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f63b1f6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f63b1f6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f63b1f6 Branch: refs/heads/master Commit: 2f63b1f63fefaff996c4ae267af8a01a5718e0c0 Parents: 93ae666 Author: Aviem ZurAuthored: Sun Feb 12 19:56:48 2017 +0200 Committer: Stas Levin Committed: Mon Feb 13 12:40:23 2017 +0200 -- .../streaming/ResumeFromCheckpointStreamingTest.java | 11 +-- 1 file changed, 5 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/2f63b1f6/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java -- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index 7094c86..5a27b29 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -64,12 +64,7 @@ import org.junit.rules.TemporaryFolder; /** - * Tests DStream recovery from checkpoint - recreate the job and continue (from checkpoint). - * - * Tests Aggregators, which rely on Accumulators - Aggregators should be available, though - * state is not preserved (Spark issue), so they start from initial value. - * //TODO: after the runner supports recovering the state of Aggregators, update this test's - * expected values for the recovered (second) run. + * Test pipelines which are resumed from checkpoint. 
*/ public class ResumeFromCheckpointStreamingTest { private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER = @@ -118,6 +113,10 @@ public class ResumeFromCheckpointStreamingTest { } } + /** + * Tests DStream recovery from checkpoint - recreate the job and continue (from checkpoint). + * Also tests Aggregator values, which should be restored upon recovery from checkpoint. + */ @Test public void testRun() throws Exception { Duration batchIntervalDuration = Duration.standardSeconds(5);
[1/2] beam git commit: [BEAM-882, BEAM-883, BEAM-878] Simplified API surface verifications.
Repository: beam Updated Branches: refs/heads/master e21f9ae86 -> 6b31c14fa [BEAM-882,BEAM-883,BEAM-878] Simplified API surface verifications. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cde550fe Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cde550fe Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cde550fe Branch: refs/heads/master Commit: cde550fe9b90be5fc9548735d5854359fca6a9cf Parents: e21f9ae Author: Stas LevinAuthored: Mon Jan 16 16:20:25 2017 +0200 Committer: Stas Levin Committed: Thu Feb 9 17:40:36 2017 +0200 -- .../org/apache/beam/sdk/util/ApiSurface.java| 446 +-- .../org/apache/beam/SdkCoreApiSurfaceTest.java | 62 +++ .../apache/beam/sdk/util/ApiSurfaceTest.java| 152 ++- .../apache/beam/sdk/io/gcp/ApiSurfaceTest.java | 134 -- .../beam/sdk/io/gcp/GcpApiSurfaceTest.java | 79 5 files changed, 495 insertions(+), 378 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/cde550fe/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java index 2040161..9530e88 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java @@ -17,12 +17,21 @@ */ package org.apache.beam.sdk.util; +import static org.hamcrest.Matchers.anyOf; + +import com.google.common.base.Function; import com.google.common.base.Joiner; +import com.google.common.base.Predicate; import com.google.common.base.Supplier; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import 
com.google.common.collect.Multimaps; +import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.reflect.ClassPath; import com.google.common.reflect.ClassPath.ClassInfo; @@ -45,15 +54,20 @@ import java.util.Collections; import java.util.List; import java.util.Set; import java.util.regex.Pattern; +import javax.annotation.Nonnull; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.StringDescription; +import org.hamcrest.TypeSafeDiagnosingMatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Represents the API surface of a package prefix. Used for accessing public classes, - * methods, and the types they reference, to control what dependencies are re-exported. + * Represents the API surface of a package prefix. Used for accessing public classes, methods, and + * the types they reference, to control what dependencies are re-exported. * - * For the purposes of calculating the public API surface, exposure includes any public - * or protected occurrence of: + * For the purposes of calculating the public API surface, exposure includes any public or + * protected occurrence of: * * * superclasses @@ -66,45 +80,277 @@ import org.slf4j.LoggerFactory; * wildcard bounds * * - * Exposure is a transitive property. The resulting map excludes primitives - * and array classes themselves. + * Exposure is a transitive property. The resulting map excludes primitives and array classes + * themselves. * - * It is prudent (though not required) to prune prefixes like "java" via the builder - * method {@link #pruningPrefix} to halt the traversal so it does not uselessly catalog references - * that are not interesting. + * It is prudent (though not required) to prune prefixes like "java" via the builder method + * {@link #pruningPrefix} to halt the traversal so it does not uselessly catalog references that are + * not interesting. 
*/ @SuppressWarnings("rawtypes") public class ApiSurface { private static final Logger LOG = LoggerFactory.getLogger(ApiSurface.class); + /** A factory method to create a {@link Class} matcher for classes residing in a given package. */ + public static Matcher classesInPackage(final String packageName) { +return new Matchers.ClassInPackage(packageName); + } + /** - * Returns an empty {@link ApiSurface}. + * A factory method to create an {@link ApiSurface} matcher, producing a positive match if the + * queried api surface contains ONLY classes described by the provided matchers. */ - public static ApiSurface empty() { -LOG.debug("Returning an empty
[2/2] beam git commit: This closes #1921
This closes #1921 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c442ef81 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c442ef81 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c442ef81 Branch: refs/heads/master Commit: c442ef81aa5bfbe84c0e3344ed8dc1d15d6e9a36 Parents: e5afbb2 882c654 Author: Stas Levin Authored: Sun Feb 5 15:51:27 2017 +0200 Committer: Stas Levin Committed: Sun Feb 5 15:51:27 2017 +0200 -- .../runners/spark/aggregators/NamedAggregators.java| 6 -- .../aggregators/metrics/sink/NamedAggregatorsTest.java | 13 + 2 files changed, 17 insertions(+), 2 deletions(-) --
[1/2] beam git commit: [BEAM-1304] Checking for nullity before trying to obtain an aggregator's value.
Repository: beam Updated Branches: refs/heads/master e5afbb27f -> c442ef81a [BEAM-1304] Checking for nullity before trying to obtain an aggregator's value. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/882c654b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/882c654b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/882c654b Branch: refs/heads/master Commit: 882c654b1a8aefd2e4281d786448734731db7816 Parents: e5afbb2 Author: Stas Levin Authored: Sun Feb 5 12:17:35 2017 +0200 Committer: Stas Levin Committed: Sun Feb 5 15:51:18 2017 +0200 -- .../runners/spark/aggregators/NamedAggregators.java| 6 -- .../aggregators/metrics/sink/NamedAggregatorsTest.java | 13 + 2 files changed, 17 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/882c654b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java index b5aec32..c876c07 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java @@ -68,10 +68,12 @@ public class NamedAggregators implements Serializable { * @param name Name of aggregator to retrieve. * @param typeClass Type class to cast the value to. * @param <T> Type to be returned. - * @return the value of the aggregator associated with the specified name + * @return the value of the aggregator associated with the specified name, + * or null if the specified aggregator could not be found. */ public <T> T getValue(String name, Class<T> typeClass) { -return typeClass.cast(mNamedAggregators.get(name).render()); +final State state = mNamedAggregators.get(name); +return state != null ? 
typeClass.cast(state.render()) : null; } /** http://git-wip-us.apache.org/repos/asf/beam/blob/882c654b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java -- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java index 3b5dd21..8646510 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java @@ -28,10 +28,13 @@ import java.util.List; import java.util.Set; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.aggregators.ClearAggregatorsRule; +import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.examples.WordCount; +import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; @@ -95,4 +98,14 @@ public class NamedAggregatorsTest { assertThat(InMemoryMetrics.valueOf("emptyLines"), is(1d)); } + + @Test + public void testNonExistingAggregatorName() throws Exception { +final SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); +final Long valueOf = +SparkAggregators.valueOf( +"myMissingAggregator", Long.class, SparkContextFactory.getSparkContext(options)); + +assertThat(valueOf, is(nullValue())); + } }