[1/3] incubator-beam git commit: Add Parameters to finishSpecifying
Repository: incubator-beam Updated Branches: refs/heads/master 57d9bbd79 -> 7984fe3fc Add Parameters to finishSpecifying Remove the need to use getProducingTransformInternal in TypedPValue. Ensure that all nodes are finished specifying before a call to TransformHierarchy#visit. This ensures that all nodes are fully specified without requiring the Pipeline or Runner to do so explicitly. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/038950df Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/038950df Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/038950df Branch: refs/heads/master Commit: 038950df02fa553cbb91f57978efe125a9ebc80f Parents: b053be4 Author: Thomas Groh Authored: Thu Dec 8 14:33:36 2016 -0800 Committer: Thomas Groh Committed: Wed Dec 21 15:26:17 2016 -0800 -- .../translation/ParDoBoundTranslatorTest.java | 32 .../beam/runners/direct/DirectGraphVisitor.java | 21 -- .../beam/runners/direct/DirectRunner.java | 1 - .../runners/direct/DirectGraphVisitorTest.java | 32 +--- .../direct/FlattenEvaluatorFactoryTest.java | 2 + .../direct/KeyedPValueTrackingVisitorTest.java | 17 - .../beam/runners/spark/ForceStreamingTest.java | 2 - .../main/java/org/apache/beam/sdk/Pipeline.java | 3 + .../beam/sdk/runners/TransformHierarchy.java| 45 ++- .../transforms/join/KeyedPCollectionTuple.java | 32 .../java/org/apache/beam/sdk/values/PBegin.java | 5 -- .../apache/beam/sdk/values/PCollectionList.java | 13 +--- .../beam/sdk/values/PCollectionTuple.java | 13 +--- .../java/org/apache/beam/sdk/values/PInput.java | 9 --- .../org/apache/beam/sdk/values/POutput.java | 20 ++--- .../beam/sdk/values/POutputValueBase.java | 4 +- .../java/org/apache/beam/sdk/values/PValue.java | 15 .../org/apache/beam/sdk/values/PValueBase.java | 3 +- .../org/apache/beam/sdk/values/TypedPValue.java | 78 +++- .../sdk/runners/TransformHierarchyTest.java | 34 + 
.../apache/beam/sdk/transforms/ParDoTest.java | 7 +- .../apache/beam/sdk/values/TypedPValueTest.java | 7 +- 22 files changed, 185 insertions(+), 210 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java -- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java index fa94b2a..f88a94d 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; @@ -286,21 +287,22 @@ public class ParDoBoundTranslatorTest { Arrays.asList(sideInput1, sideInput2), Arrays.>asList(; - outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector())); - ApexRunnerResult result = (ApexRunnerResult) pipeline.run(); - - HashSet expected = Sets.newHashSet("processing: 3: [11, 222]", - "processing: -42: [11, 222]", "processing: 666: [11, 222]"); - long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS; - while (System.currentTimeMillis() < timeout) { - if (EmbeddedCollector.RESULTS.containsAll(expected)) { - break; - } - LOG.info("Waiting for expected results."); - Thread.sleep(SLEEP_MILLIS); - } - result.cancel(); - Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); +outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector())); 
+outputs.get(sideOutputTag).setCoder(VoidCoder.of()); +ApexRunnerResult result = (ApexRunnerResult) pipeline.run(); + +HashSet expected = Sets.newHashSet("processing: 3: [11, 222]", +"processing: -42: [11, 222]", "processing: 666: [11, 222]"); +long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS; +while (System.currentTimeMillis() < timeout) { + if (EmbeddedCollector.RESULTS.containsAll(expected)) { +break; + } + LOG.info("Waiting for expected results."); + Thread.sleep(SLEEP_MILLIS); +
[2/3] incubator-beam git commit: Use CountingSource in ForceStreamingTest
Use CountingSource in ForceStreamingTest Removes the requirement to have a FakeUnboundedSource, plus the read is fully specified. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b053be46 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b053be46 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b053be46 Branch: refs/heads/master Commit: b053be460c2e6ff486faed1b1a0996af63f93db2 Parents: 57d9bbd Author: Thomas Groh Authored: Tue Dec 20 14:23:21 2016 -0800 Committer: Thomas Groh Committed: Wed Dec 21 15:26:17 2016 -0800 -- .../beam/runners/spark/ForceStreamingTest.java | 39 +--- 1 file changed, 2 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b053be46/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java -- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java index eb17eea..1b2ff08 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java @@ -23,10 +23,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import java.io.IOException; import java.util.List; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource; +import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.TransformHierarchy; @@ -59,7 +58,7 @@ public class ForceStreamingTest { // apply the BoundedReadFromUnboundedSource. 
@SuppressWarnings("unchecked") BoundedReadFromUnboundedSource boundedRead = -Read.from(new FakeUnboundedSource()).withMaxNumRecords(-1); +Read.from(CountingSource.unbounded()).withMaxNumRecords(-1); //noinspection unchecked pipeline.apply(boundedRead); @@ -86,38 +85,4 @@ public class ForceStreamingTest { } } - - /** - * A fake {@link UnboundedSource} to satisfy the compiler. - */ - private static class FakeUnboundedSource extends UnboundedSource { - -@Override -public List generateInitialSplits( -int desiredNumSplits, -PipelineOptions options) throws Exception { - return null; -} - -@Override -public UnboundedReader createReader( -PipelineOptions options, -CheckpointMark checkpointMark) throws IOException { - return null; -} - -@Override -public Coder getCheckpointMarkCoder() { - return null; -} - -@Override -public void validate() { } - -@Override -public Coder getDefaultOutputCoder() { - return null; -} - } - }
[3/3] incubator-beam git commit: This closes #1582
This closes #1582 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7984fe3f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7984fe3f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7984fe3f Branch: refs/heads/master Commit: 7984fe3fc20160d2286433434190f35658aef158 Parents: 57d9bbd 038950d Author: Thomas Groh Authored: Wed Dec 21 15:26:18 2016 -0800 Committer: Thomas Groh Committed: Wed Dec 21 15:26:18 2016 -0800 -- .../translation/ParDoBoundTranslatorTest.java | 32 .../beam/runners/direct/DirectGraphVisitor.java | 21 -- .../beam/runners/direct/DirectRunner.java | 1 - .../runners/direct/DirectGraphVisitorTest.java | 32 +--- .../direct/FlattenEvaluatorFactoryTest.java | 2 + .../direct/KeyedPValueTrackingVisitorTest.java | 17 - .../beam/runners/spark/ForceStreamingTest.java | 41 +- .../main/java/org/apache/beam/sdk/Pipeline.java | 3 + .../beam/sdk/runners/TransformHierarchy.java| 45 ++- .../transforms/join/KeyedPCollectionTuple.java | 32 .../java/org/apache/beam/sdk/values/PBegin.java | 5 -- .../apache/beam/sdk/values/PCollectionList.java | 13 +--- .../beam/sdk/values/PCollectionTuple.java | 13 +--- .../java/org/apache/beam/sdk/values/PInput.java | 9 --- .../org/apache/beam/sdk/values/POutput.java | 20 ++--- .../beam/sdk/values/POutputValueBase.java | 4 +- .../java/org/apache/beam/sdk/values/PValue.java | 15 .../org/apache/beam/sdk/values/PValueBase.java | 3 +- .../org/apache/beam/sdk/values/TypedPValue.java | 78 +++- .../sdk/runners/TransformHierarchyTest.java | 34 + .../apache/beam/sdk/transforms/ParDoTest.java | 7 +- .../apache/beam/sdk/values/TypedPValueTest.java | 7 +- 22 files changed, 187 insertions(+), 247 deletions(-) --
[GitHub] incubator-beam pull request #1677: Add a Test for Flatten with Heterogeneous...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1677 Add a Test for Flatten with Heterogeneous Coders Add a category, and suppress in the Flink and Apex runners Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam flatten_multiple_coders Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1677.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1677 commit 13a8da998c9c03a8ccf914e1dcc16c2aed038930 Author: Thomas Groh Date: 2016-12-21T21:53:48Z Add a Test for Flatten with Heterogeneous Coders Add a category, and suppress in the Flink and Apex runners --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] incubator-beam git commit: This closes #1569
Repository: incubator-beam Updated Branches: refs/heads/master 6a05d7f17 -> aadcf3a12 This closes #1569 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aadcf3a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aadcf3a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aadcf3a1 Branch: refs/heads/master Commit: aadcf3a1203b257961a1a474acf74e6bbca1e2ad Parents: 6a05d7f 34373c2 Author: Thomas Groh Authored: Tue Dec 20 15:18:55 2016 -0800 Committer: Thomas Groh Committed: Tue Dec 20 15:18:55 2016 -0800 -- .../beam/runners/direct/DirectGraphVisitor.java | 18 +-- .../beam/runners/direct/EvaluationContext.java | 7 +- .../direct/KeyedPValueTrackingVisitor.java | 16 ++- .../beam/runners/direct/WatermarkManager.java | 19 +-- .../apache/beam/runners/spark/SparkRunner.java | 13 ++- .../beam/sdk/runners/TransformHierarchy.java| 49 .../transforms/join/KeyedPCollectionTuple.java | 9 +- .../java/org/apache/beam/sdk/values/PBegin.java | 4 +- .../apache/beam/sdk/values/PCollectionList.java | 65 +++ .../beam/sdk/values/PCollectionTuple.java | 28 - .../java/org/apache/beam/sdk/values/PDone.java | 4 +- .../java/org/apache/beam/sdk/values/PInput.java | 4 +- .../org/apache/beam/sdk/values/POutput.java | 4 +- .../java/org/apache/beam/sdk/values/PValue.java | 10 ++ .../org/apache/beam/sdk/values/PValueBase.java | 11 +- .../apache/beam/sdk/values/TaggedPValue.java| 42 +++ .../sdk/runners/TransformHierarchyTest.java | 23 +++- .../apache/beam/sdk/transforms/ParDoTest.java | 34 ++ .../beam/sdk/values/PCollectionListTest.java| 117 +++ .../beam/sdk/values/PCollectionTupleTest.java | 70 +++ 20 files changed, 449 insertions(+), 98 deletions(-) --
[2/2] incubator-beam git commit: Provide local tags in PInput, POutput expansions
Provide local tags in PInput, POutput expansions Output an ordered collection in PInput and POutput expansions. This provides information that is necessary to reconstruct a PInput or POutput from its expansion. Implement PCollectionList.equals, PCollectionTuple.equals Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34373c21 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34373c21 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34373c21 Branch: refs/heads/master Commit: 34373c21ed67696235d88ef40d50e31c77b84c33 Parents: 6a05d7f Author: Thomas Groh Authored: Tue Dec 6 11:03:52 2016 -0800 Committer: Thomas Groh Committed: Tue Dec 20 15:18:55 2016 -0800 -- .../beam/runners/direct/DirectGraphVisitor.java | 18 +-- .../beam/runners/direct/EvaluationContext.java | 7 +- .../direct/KeyedPValueTrackingVisitor.java | 16 ++- .../beam/runners/direct/WatermarkManager.java | 19 +-- .../apache/beam/runners/spark/SparkRunner.java | 13 ++- .../beam/sdk/runners/TransformHierarchy.java| 49 .../transforms/join/KeyedPCollectionTuple.java | 9 +- .../java/org/apache/beam/sdk/values/PBegin.java | 4 +- .../apache/beam/sdk/values/PCollectionList.java | 65 +++ .../beam/sdk/values/PCollectionTuple.java | 28 - .../java/org/apache/beam/sdk/values/PDone.java | 4 +- .../java/org/apache/beam/sdk/values/PInput.java | 4 +- .../org/apache/beam/sdk/values/POutput.java | 4 +- .../java/org/apache/beam/sdk/values/PValue.java | 10 ++ .../org/apache/beam/sdk/values/PValueBase.java | 11 +- .../apache/beam/sdk/values/TaggedPValue.java| 42 +++ .../sdk/runners/TransformHierarchyTest.java | 23 +++- .../apache/beam/sdk/transforms/ParDoTest.java | 34 ++ .../beam/sdk/values/PCollectionListTest.java| 117 +++ .../beam/sdk/values/PCollectionTupleTest.java | 70 +++ 20 files changed, 449 insertions(+), 98 deletions(-) -- 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java index 0283d03..425bbf1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; /** * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the @@ -79,14 +80,16 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { -toFinalize.removeAll(node.getInputs()); +for (TaggedPValue consumed : node.getInputs()) { + toFinalize.remove(consumed.getValue()); +} AppliedPTransform appliedTransform = getAppliedTransform(node); stepNames.put(appliedTransform, genStepName()); if (node.getInputs().isEmpty()) { rootTransforms.add(appliedTransform); } else { - for (PValue value : node.getInputs()) { -primitiveConsumers.put(value, appliedTransform); + for (TaggedPValue value : node.getInputs()) { +primitiveConsumers.put(value.getValue(), appliedTransform); } } } @@ -96,15 +99,12 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { toFinalize.add(value); AppliedPTransform appliedTransform = getAppliedTransform(producer); +if (value instanceof PCollectionView) { + views.add((PCollectionView) value); +} if (!producers.containsKey(value)) { producers.put(value, appliedTransform); } - if (value instanceof PCollectionView) { - 
views.add((PCollectionView) value); - } - if (!producers.containsKey(value)) { - producers.put(value, appliedTransform); - } } private AppliedPTransform getAppliedTransform(TransformHierarchy.Node node) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/s
[1/2] incubator-beam git commit: Add PTransformOverrideFactory to the Core SDK
Repository: incubator-beam Updated Branches: refs/heads/master 217c29bfc -> cc28f0cb4 Add PTransformOverrideFactory to the Core SDK This migrates PTransformOverrideFactory from the DirectRunner to the Core SDK, as part of BEAM-646. Add getOriginalToReplacements to provide a mapping from the original outputs to replaced outputs. This enables all replaced nodes to be rewired to output the original output. Migrate all DirectRunner Override Factories to the new PTransformOverrideFactory. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3f227a0a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3f227a0a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3f227a0a Branch: refs/heads/master Commit: 3f227a0ad18f425767e89f88d8a1c9fdcebdd80c Parents: 217c29b Author: Thomas Groh Authored: Mon Dec 5 16:01:57 2016 -0800 Committer: Thomas Groh Committed: Fri Dec 16 14:21:49 2016 -0800 -- ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 7 ++-- .../direct/DirectGroupByKeyOverrideFactory.java | 3 +- .../beam/runners/direct/DirectRunner.java | 7 +++- .../direct/PTransformOverrideFactory.java | 35 - .../direct/ParDoMultiOverrideFactory.java | 9 +++-- .../ParDoSingleViaMultiOverrideFactory.java | 11 +++--- .../direct/TestStreamEvaluatorFactory.java | 5 ++- .../runners/direct/ViewEvaluatorFactory.java| 4 +- .../direct/WriteWithShardingFactory.java| 6 ++- .../direct/WriteWithShardingFactoryTest.java| 4 +- .../beam/sdk/annotations/Experimental.java | 5 ++- .../sdk/runners/PTransformOverrideFactory.java | 41 12 files changed, 80 insertions(+), 57 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java index 1fa059c..ab4c114 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java @@ -18,7 +18,8 @@ package org.apache.beam.runners.direct; import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.SplittableParDo; +import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -30,10 +31,10 @@ import org.apache.beam.sdk.values.PCollection; class DirectGBKIntoKeyedWorkItemsOverrideFactory implements PTransformOverrideFactory< PCollection>, PCollection>, -SplittableParDo.GBKIntoKeyedWorkItems> { +GBKIntoKeyedWorkItems> { @Override public PTransform>, PCollection>> - override(SplittableParDo.GBKIntoKeyedWorkItems transform) { + getReplacementTransform(GBKIntoKeyedWorkItems transform) { return new DirectGroupByKey.DirectGroupByKeyOnly<>(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java index 9acf5e9..7cf3256 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; 
+import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; @@ -27,7 +28,7 @@ final class DirectGroupByKeyOverrideFactory implements PTransformOverrideFactory< PCollection>, PCollection>>, GroupByKey> { @Override - public PTransform>, PCollection>>> override( + public PTransform>, PCollection>>> getReplacementTransform( GroupByKey transform) { return new Dire
[2/2] incubator-beam git commit: This closes #1547
This closes #1547 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cc28f0cb Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cc28f0cb Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cc28f0cb Branch: refs/heads/master Commit: cc28f0cb4c44169f933475ae29a32599024d3a1f Parents: 217c29b 3f227a0 Author: Thomas Groh Authored: Fri Dec 16 14:21:50 2016 -0800 Committer: Thomas Groh Committed: Fri Dec 16 14:21:50 2016 -0800 -- ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 7 ++-- .../direct/DirectGroupByKeyOverrideFactory.java | 3 +- .../beam/runners/direct/DirectRunner.java | 7 +++- .../direct/PTransformOverrideFactory.java | 35 - .../direct/ParDoMultiOverrideFactory.java | 9 +++-- .../ParDoSingleViaMultiOverrideFactory.java | 11 +++--- .../direct/TestStreamEvaluatorFactory.java | 5 ++- .../runners/direct/ViewEvaluatorFactory.java| 4 +- .../direct/WriteWithShardingFactory.java| 6 ++- .../direct/WriteWithShardingFactoryTest.java| 4 +- .../beam/sdk/annotations/Experimental.java | 5 ++- .../sdk/runners/PTransformOverrideFactory.java | 41 12 files changed, 80 insertions(+), 57 deletions(-) --
[2/2] incubator-beam git commit: This closes #1625
This closes #1625 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c09fbd43 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c09fbd43 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c09fbd43 Branch: refs/heads/release-0.4.0-incubating Commit: c09fbd43b0cdaa57cffc1925c313dcf91390adc7 Parents: 5dcffe0 e864ac2 Author: Thomas Groh Authored: Thu Dec 15 11:12:01 2016 -0800 Committer: Thomas Groh Committed: Thu Dec 15 11:12:01 2016 -0800 -- .../main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Don't incorrectly log error in MetricsEnvironment
Repository: incubator-beam Updated Branches: refs/heads/release-0.4.0-incubating 5dcffe066 -> c09fbd43b Don't incorrectly log error in MetricsEnvironment Using getCurrentContainer() logs an error if metrics are not supported. This is because it acts as the common point of access for user code that reports metrics. It should not be used within setCurrentContainer(), because the first container being set will have a null previous-current-container, which will cause the error to be incorrectly logged. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e864ac21 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e864ac21 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e864ac21 Branch: refs/heads/release-0.4.0-incubating Commit: e864ac2141e31a930b85264506d06579ba6811de Parents: 5dcffe0 Author: bchambers Authored: Wed Dec 14 11:23:39 2016 -0800 Committer: Thomas Groh Committed: Thu Dec 15 11:12:00 2016 -0800 -- .../main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e864ac21/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java index 7c06cbf..5d7cb0b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java @@ -56,7 +56,7 @@ public class MetricsEnvironment { */ @Nullable public static MetricsContainer setCurrentContainer(@Nullable MetricsContainer container) { -MetricsContainer previous = getCurrentContainer(); +MetricsContainer previous = CONTAINER_FOR_THREAD.get(); if (container == null) { 
CONTAINER_FOR_THREAD.remove(); } else {
[1/2] incubator-beam git commit: [BEAM-1033] Retry Bigquery Verifier when Query Fails
Repository: incubator-beam Updated Branches: refs/heads/master 4927cc1ab -> 0b0a1b797 [BEAM-1033] Retry Bigquery Verifier when Query Fails Update Junit to 4.12 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b626f0e6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b626f0e6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b626f0e6 Branch: refs/heads/master Commit: b626f0e627af85b2aa01213587b4130932030166 Parents: 4927cc1 Author: Mark Liu Authored: Wed Nov 30 22:20:12 2016 -0800 Committer: Thomas Groh Committed: Wed Dec 14 12:44:47 2016 -0800 -- pom.xml | 2 +- .../beam/sdk/testing/BigqueryMatcher.java | 48 .../beam/sdk/testing/BigqueryMatcherTest.java | 82 ++-- .../main/resources/archetype-resources/pom.xml | 2 +- .../main/resources/archetype-resources/pom.xml | 2 +- 5 files changed, 76 insertions(+), 60 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b626f0e6/pom.xml -- diff --git a/pom.xml b/pom.xml index 4faa971..970547d 100644 --- a/pom.xml +++ b/pom.xml @@ -123,7 +123,7 @@ 2.7.2 3.0.1 2.4 -4.11 +4.12 1.9.5 4.1.3.Final 1.4.0.Final http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b626f0e6/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java index 9b8589a..8f752c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java @@ -39,6 +39,7 @@ import com.google.common.collect.Lists; import com.google.common.hash.HashCode; import com.google.common.hash.Hashing; import java.io.IOException; +import java.io.InterruptedIOException; import java.nio.charset.StandardCharsets; import java.util.Collection; import 
java.util.Collections; @@ -117,20 +118,23 @@ public class BigqueryMatcher extends TypeSafeMatcher response = queryWithRetries( bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff()); -} catch (Exception e) { +} catch (IOException | InterruptedException e) { + if (e instanceof InterruptedIOException) { +Thread.currentThread().interrupt(); + } throw new RuntimeException("Failed to fetch BigQuery data.", e); } -// validate BigQuery response -if (response == null || response.getRows() == null || response.getRows().isEmpty()) { +if (!response.getJobComplete()) { + // query job not complete, verification failed return false; -} - -// compute checksum -actualChecksum = generateHash(response.getRows()); -LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum); +} else { + // compute checksum + actualChecksum = generateHash(response.getRows()); + LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum); -return expectedChecksum.equals(actualChecksum); + return expectedChecksum.equals(actualChecksum); +} } @VisibleForTesting @@ -144,23 +148,35 @@ public class BigqueryMatcher extends TypeSafeMatcher .build(); } + @Nonnull @VisibleForTesting QueryResponse queryWithRetries(Bigquery bigqueryClient, QueryRequest queryContent, Sleeper sleeper, BackOff backOff) throws IOException, InterruptedException { IOException lastException = null; do { + if (lastException != null) { +LOG.warn("Retrying query ({}) after exception", queryContent.getQuery(), lastException); + } try { -return bigqueryClient.jobs().query(projectId, queryContent).execute(); +QueryResponse response = bigqueryClient.jobs().query(projectId, queryContent).execute(); +if (response != null) { + return response; +} else { + lastException = + new IOException("Expected valid response from query job, but received null."); +} } catch (IOException e) { // ignore and retry -LOG.warn("Ignore the error and retry the query."); lastException = e; } } 
while(BackOffUtils.next(sleeper, backOff)); -throw new IOException( + +throw new RuntimeException( String.format( -"Unable to get BigQuery response after retrying %d times", MAX_QUERY_RETRIES), +
[2/2] incubator-beam git commit: This closes #1479
This closes #1479 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0b0a1b79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0b0a1b79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0b0a1b79 Branch: refs/heads/master Commit: 0b0a1b79794598478033105e0582650fe17953ac Parents: 4927cc1 b626f0e Author: Thomas Groh Authored: Wed Dec 14 12:45:09 2016 -0800 Committer: Thomas Groh Committed: Wed Dec 14 12:45:09 2016 -0800 -- pom.xml | 2 +- .../beam/sdk/testing/BigqueryMatcher.java | 48 .../beam/sdk/testing/BigqueryMatcherTest.java | 82 ++-- .../main/resources/archetype-resources/pom.xml | 2 +- .../main/resources/archetype-resources/pom.xml | 2 +- 5 files changed, 76 insertions(+), 60 deletions(-) --
[1/2] incubator-beam git commit: This closes #1583
Repository: incubator-beam Updated Branches: refs/heads/master 5a51ace8d -> fdf07318f This closes #1583 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fdf07318 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fdf07318 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fdf07318 Branch: refs/heads/master Commit: fdf07318f6833bad4634b59e3676033288f0c4aa Parents: 5a51ace 4cbccee Author: Thomas Groh Authored: Wed Dec 14 11:29:29 2016 -0800 Committer: Thomas Groh Committed: Wed Dec 14 11:29:29 2016 -0800 -- .../beam/runners/direct/DirectGroupByKey.java | 36 +++- 1 file changed, 20 insertions(+), 16 deletions(-) --
[2/2] incubator-beam git commit: Implement GetDefaultOutputCoder in DirectGroupByKey
Implement GetDefaultOutputCoder in DirectGroupByKey This uses the standard Coder Inference path to set coders, rather than explicitly setting the output coders for intermediate PCollections. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4cbccee8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4cbccee8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4cbccee8 Branch: refs/heads/master Commit: 4cbccee8ee9a3b4235c6338fe49efc1f8a079812 Parents: 5a51ace Author: Thomas Groh Authored: Mon Dec 12 13:55:49 2016 -0800 Committer: Thomas Groh Committed: Wed Dec 14 11:29:29 2016 -0800 -- .../beam/runners/direct/DirectGroupByKey.java | 36 +++- 1 file changed, 20 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4cbccee8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java index 405d913..6c10bd2 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -46,9 +47,6 @@ class DirectGroupByKey @Override public PCollection>> expand(PCollection> input) { -@SuppressWarnings("unchecked") -KvCoder inputCoder = (KvCoder) input.getCoder(); - // This operation groups by the combination of 
key and window, // merging windows as needed, using the windows assigned to the // key/value input elements and the window merge operation of the @@ -61,19 +59,11 @@ class DirectGroupByKey // By default, implement GroupByKey via a series of lower-level operations. return input .apply(new DirectGroupByKeyOnly()) -.setCoder( -KeyedWorkItemCoder.of( -inputCoder.getKeyCoder(), -inputCoder.getValueCoder(), -inputWindowingStrategy.getWindowFn().windowCoder())) // Group each key's values by window, merging windows as needed. .apply( "GroupAlsoByWindow", -new DirectGroupAlsoByWindow(inputWindowingStrategy, outputWindowingStrategy)) - -.setCoder( -KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder(; +new DirectGroupAlsoByWindow(inputWindowingStrategy, outputWindowingStrategy)); } static final class DirectGroupByKeyOnly @@ -85,6 +75,16 @@ class DirectGroupByKey } DirectGroupByKeyOnly() {} + +@Override +protected Coder getDefaultOutputCoder( +@SuppressWarnings("unused") PCollection> input) +throws CannotProvideCoderException { + return KeyedWorkItemCoder.of( + GroupByKey.getKeyCoder(input.getCoder()), + GroupByKey.getInputValueCoder(input.getCoder()), + input.getWindowingStrategy().getWindowFn().windowCoder()); +} } static final class DirectGroupAlsoByWindow @@ -117,15 +117,19 @@ class DirectGroupByKey return kvCoder; } -public Coder getKeyCoder(Coder> inputCoder) { - return getKeyedWorkItemCoder(inputCoder).getKeyCoder(); -} - public Coder getValueCoder(Coder> inputCoder) { return getKeyedWorkItemCoder(inputCoder).getElementCoder(); } @Override +protected Coder getDefaultOutputCoder( +@SuppressWarnings("unused") PCollection> input) +throws CannotProvideCoderException { + KeyedWorkItemCoder inputCoder = getKeyedWorkItemCoder(input.getCoder()); + return KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder())); +} + +@Override public PCollection>> expand(PCollection> input) { return 
PCollection.createPrimitiveOutputInternal( input.getPipeline(), outputWindowingStrategy, input.isBounded());
[2/3] incubator-beam git commit: Add Tests for Kryo Serialization of URFBS
Add Tests for Kryo Serialization of URFBS Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/47cc2dca Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/47cc2dca Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/47cc2dca Branch: refs/heads/master Commit: 47cc2dca05daa4075093c414e13bf0cacaa77744 Parents: 3c2e550 Author: Thomas Groh Authored: Mon Dec 12 16:33:53 2016 -0800 Committer: Thomas Groh Committed: Mon Dec 12 18:17:08 2016 -0800 -- runners/core-java/pom.xml | 7 ++ .../UnboundedReadFromBoundedSourceTest.java | 97 ++-- 2 files changed, 94 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47cc2dca/runners/core-java/pom.xml -- diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml index 0e2b4b0..bab9d57 100644 --- a/runners/core-java/pom.xml +++ b/runners/core-java/pom.xml @@ -152,6 +152,13 @@ + + com.esotericsoftware.kryo + kryo + 2.21 + test + + org.apache.beam http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47cc2dca/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java -- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java index 7fd8807..8a1b70b 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java @@ -17,19 +17,28 @@ */ package org.apache.beam.runners.core; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static 
org.junit.Assert.assertTrue; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; import java.util.Random; @@ -44,11 +53,13 @@ import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.FileBasedSource; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Distinct; @@ -65,6 +76,7 @@ import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.objenesis.strategy.StdInstantiatorStrategy; /** * Unit tests for {@link UnboundedReadFromBoundedSource}. 
@@ -101,28 +113,93 @@ public class UnboundedReadFromBoundedSourceTest { PCollection output = p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements)); - // Count == numElements PAssert - .thatSingleton(output.apply("Count", Count.globally())) - .isEqualTo(numElements); +.thatSingleton(output.apply("Count", Count.globally())) +.isEqualTo(numElements); // Unique count == numElements PAssert - .thatSingleton(output.apply(Distinct.create()) - .apply("UniqueCount", Count.globally())) - .isEqualTo(numElements); +.thatSingleton(output.apply(Distinct.create()) +.apply("UniqueCount", Count.globally())) +.isEqualTo(numElements); // Min == 0 PAssert - .thatSingleton(output.apply("Min", Min.globally())) - .isEqualTo(0L); +.thatSingleton(output.apply("Min", Min.globally())) +.isEqual
[3/3] incubator-beam git commit: Add no-arg constructor for UnboundedReadFromBoundedSource
Add no-arg constructor for UnboundedReadFromBoundedSource This allows Kryo to work with the type, currently required by the Apex runner. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3c2e550a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3c2e550a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3c2e550a Branch: refs/heads/master Commit: 3c2e550aa7a56fe16f278cb353314a1ee51dbfe3 Parents: d9657ff Author: Kenneth Knowles Authored: Mon Dec 12 14:59:36 2016 -0800 Committer: Thomas Groh Committed: Mon Dec 12 18:17:08 2016 -0800 -- .../beam/runners/core/UnboundedReadFromBoundedSource.java | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c2e550a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java index f3f93e1..c804725 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -118,7 +118,12 @@ public class UnboundedReadFromBoundedSource extends PTransform extends UnboundedSource> { -private BoundedSource boundedSource; +@SuppressWarnings("unused") // for Kryo +private BoundedToUnboundedSourceAdapter() { + this.boundedSource = null; +} + +private final BoundedSource boundedSource; public BoundedToUnboundedSourceAdapter(BoundedSource boundedSource) { this.boundedSource = boundedSource;
[1/3] incubator-beam git commit: This closes #1586
Repository: incubator-beam Updated Branches: refs/heads/master d9657ffc3 -> 91cc606b4 This closes #1586 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/91cc606b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/91cc606b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/91cc606b Branch: refs/heads/master Commit: 91cc606b469b1166b2a2d41425eab63a4795b1e0 Parents: d9657ff 47cc2dc Author: Thomas Groh Authored: Mon Dec 12 18:17:08 2016 -0800 Committer: Thomas Groh Committed: Mon Dec 12 18:17:08 2016 -0800 -- runners/core-java/pom.xml | 7 ++ .../core/UnboundedReadFromBoundedSource.java| 7 +- .../UnboundedReadFromBoundedSourceTest.java | 97 ++-- 3 files changed, 100 insertions(+), 11 deletions(-) --
[GitHub] incubator-beam pull request #1586: [BEAM-1139] Add no-arg constructor for Un...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1586 [BEAM-1139] Add no-arg constructor for UnboundedReadFromBoundedSource Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This allows Kryo to work with the type, currently required by the Apex runner. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam take_over_1585 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1586.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1586 commit e88e9edfcca65066d3f40d0fa49ba7386e09652f Author: Kenneth Knowles Date: 2016-12-12T22:59:36Z Add no-arg constructor for UnboundedReadFromBoundedSource This allows Kryo to work with the type, currently required by the Apex runner. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #1583: Implement GetDefaultOutputCoder in Direct...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1583 Implement GetDefaultOutputCoder in DirectGroupByKey Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This uses the standard Coder Inference path to set coders, rather than explicitly setting the output coders for intermediate PCollections. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam coder_inference_direct_runner Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1583.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1583 commit 06796b2396ea5d56cd4c75e82a7f13a2d5ad52aa Author: Thomas Groh Date: 2016-12-12T21:55:49Z Implement GetDefaultOutputCoder in DirectGroupByKey This uses the standard Coder Inference path to set coders, rather than explicitly setting the output coders for intermediate PCollections. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: Stop expanding PValues in DirectRunner visitors
Stop expanding PValues in DirectRunner visitors A PValue always expands to itself, and these calls are unneccessary. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8ef74a74 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8ef74a74 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8ef74a74 Branch: refs/heads/master Commit: 8ef74a744327c40fbb05030fd7657db8a865cb94 Parents: 5d619e8 Author: Thomas Groh Authored: Fri Dec 9 15:52:15 2016 -0800 Committer: Thomas Groh Committed: Mon Dec 12 13:51:29 2016 -0800 -- .../beam/runners/direct/DirectGraphVisitor.java | 14 ++ .../runners/direct/KeyedPValueTrackingVisitor.java| 2 +- 2 files changed, 7 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ef74a74/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java index 4f38bce..0283d03 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java @@ -99,14 +99,12 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { if (!producers.containsKey(value)) { producers.put(value, appliedTransform); } -for (PValue expandedValue : value.expand()) { - if (expandedValue instanceof PCollectionView) { -views.add((PCollectionView) expandedValue); - } - if (!producers.containsKey(expandedValue)) { -producers.put(value, appliedTransform); - } -} + if (value instanceof PCollectionView) { + views.add((PCollectionView) value); + } + if (!producers.containsKey(value)) { + producers.put(value, appliedTransform); + } } private AppliedPTransform 
getAppliedTransform(TransformHierarchy.Node node) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ef74a74/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index 4161f9e..7f85169 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -84,7 +84,7 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor { @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { if (producesKeyedOutputs.contains(producer.getTransform().getClass())) { - keyedValues.addAll(value.expand()); + keyedValues.add(value); } }
[1/2] incubator-beam git commit: This closes #1570
Repository: incubator-beam Updated Branches: refs/heads/master 5d619e8e3 -> 59f1fb26a This closes #1570 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59f1fb26 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59f1fb26 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59f1fb26 Branch: refs/heads/master Commit: 59f1fb26a900b4e4cbdb4d16b94cfe646b1c5f7e Parents: 5d619e8 8ef74a7 Author: Thomas Groh Authored: Mon Dec 12 13:51:29 2016 -0800 Committer: Thomas Groh Committed: Mon Dec 12 13:51:29 2016 -0800 -- .../beam/runners/direct/DirectGraphVisitor.java | 14 ++ .../runners/direct/KeyedPValueTrackingVisitor.java| 2 +- 2 files changed, 7 insertions(+), 9 deletions(-) --
[GitHub] incubator-beam pull request #1582: [BEAM-646] Add Parameters to finishSpecif...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1582 [BEAM-646] Add Parameters to finishSpecifying Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This removes use of getProducingTransformInternal() in TypedPValue. Ensure that all nodes are finished specifying before a call to `Pipeline#traverseTopologically` or `PipelineRunner#run`. This ensures that all nodes are fully specified without requiring the `PipelineRunner` to do so explicitly. Use Coder Inference rather than explicitly setting Coders within DirectRunner overrides. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam parametered_finish_specifying Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1582.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1582 commit 05d3151de529e1db8ffe0e16ea03e4d2674b4696 Author: Thomas Groh Date: 2016-12-08T22:33:36Z Improve Coder Inference within DirectRunner Overrides This cleans up how coders are provided for DirectRunner overrides. Ensure that coders are provided within tests that use an empty Flatten. commit 97d81855b7dd54b0362121fe2b7d92aafb7d6594 Author: Thomas Groh Date: 2016-12-08T17:09:33Z Add Parameters to finishSpecifying Remove the need to use getProducingTransformInternal in TypedPValue. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: Migrate AppliedPTransform to use AutoValue
Migrate AppliedPTransform to use AutoValue Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8f52b5e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8f52b5e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8f52b5e1 Branch: refs/heads/master Commit: 8f52b5e1214061a9cbd2bd2c44f68c22102d9737 Parents: 98543e9 Author: Thomas Groh Authored: Fri Dec 9 15:34:25 2016 -0800 Committer: Thomas Groh Committed: Mon Dec 12 13:23:52 2016 -0800 -- .../beam/sdk/transforms/AppliedPTransform.java | 75 1 file changed, 15 insertions(+), 60 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f52b5e1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java index d80c116..77de54a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java @@ -17,8 +17,7 @@ */ package org.apache.beam.sdk.transforms; -import com.google.common.base.MoreObjects; -import com.google.common.base.Objects; +import com.google.auto.value.AutoValue; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; @@ -32,70 +31,26 @@ import org.apache.beam.sdk.values.POutput; * @param transform output type * @param transform type */ -public class AppliedPTransform +@AutoValue +public abstract class AppliedPTransform > { - private final String fullName; - private final InputT input; - private final OutputT output; - private final TransformT transform; - - private AppliedPTransform(String fullName, InputT input, OutputT output, TransformT transform) { -this.input = input; -this.output = output; -this.transform = transform; 
-this.fullName = fullName; + public static < + InputT extends PInput, + OutputT extends POutput, + TransformT extends PTransform> + AppliedPTransform of( + String fullName, InputT input, OutputT output, TransformT transform) { +return new AutoValue_AppliedPTransform( +fullName, input, output, transform); } - public static > - AppliedPTransform of( - String fullName, InputT input, OutputT output, TransformT transform) { -return new AppliedPTransform(fullName, input, output, transform); - } + public abstract String getFullName(); - public String getFullName() { -return fullName; - } + public abstract InputT getInput(); - public InputT getInput() { -return input; - } + public abstract OutputT getOutput(); - public OutputT getOutput() { -return output; - } - - public TransformT getTransform() { -return transform; - } - - @Override - public int hashCode() { -return Objects.hashCode(getFullName(), getInput(), getOutput(), getTransform()); - } - - @Override - public boolean equals(Object other) { -if (other instanceof AppliedPTransform) { - AppliedPTransform that = (AppliedPTransform) other; - return Objects.equal(this.getFullName(), that.getFullName()) - && Objects.equal(this.getInput(), that.getInput()) - && Objects.equal(this.getOutput(), that.getOutput()) - && Objects.equal(this.getTransform(), that.getTransform()); -} else { - return false; -} - } - - @Override - public String toString() { -return MoreObjects.toStringHelper(getClass()) -.add("fullName", getFullName()) -.add("input", getInput()) -.add("output", getOutput()) -.add("transform", getTransform()) -.toString(); - } + public abstract TransformT getTransform(); }
[1/2] incubator-beam git commit: This closes #1568
Repository: incubator-beam Updated Branches: refs/heads/master 98543e96f -> 5d619e8e3 This closes #1568 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5d619e8e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5d619e8e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5d619e8e Branch: refs/heads/master Commit: 5d619e8e305f5da259ab128d67a06e59ea618145 Parents: 98543e9 8f52b5e Author: Thomas Groh Authored: Mon Dec 12 13:23:52 2016 -0800 Committer: Thomas Groh Committed: Mon Dec 12 13:23:52 2016 -0800 -- .../beam/sdk/transforms/AppliedPTransform.java | 75 1 file changed, 15 insertions(+), 60 deletions(-) --
[GitHub] incubator-beam pull request #1570: Stop expanding PValues in DirectRunner vi...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1570 Stop expanding PValues in DirectRunner visitors Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- A PValue always expands to itself, and these calls are unnecessary. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam rm_unneeded_expands Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1570.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1570 commit 314e3da822ba59f8ac7133cbf47c61fe429a7361 Author: Thomas Groh Date: 2016-12-09T23:52:15Z Stop expanding PValues in DirectRunner visitors A PValue always expands to itself, and these calls are unnecessary. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #1569: [BEAM-646] Provide local tags in PInput, ...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1569 [BEAM-646] Provide local tags in PInput, POutput expansions Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- Output an ordered collection in PInput and POutput expansions. This provides information that is necessary to reconstruct a PInput or POutput from its expansion. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam tagged_pvalues_expansions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1569.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1569 commit f6d826115cdc4ede1a3c5bc8fc86aac3118f2902 Author: Thomas Groh Date: 2016-12-06T19:03:52Z Provide local tags in PInput, POutput expansions Output an ordered collection in PInput and POutput expansions. This provides information that is necessary to reconstruct a PInput or POutput from its expansion. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #1568: Migrate AppliedPTransform to use AutoValu...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1568 Migrate AppliedPTransform to use AutoValue Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- R: @kennknowles You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam autovalue_applied_pt Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1568.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1568 commit 615605665b5f421e870c1fa8c0e37b9e4850fd90 Author: Thomas Groh Date: 2016-12-09T23:34:25Z Migrate AppliedPTransform to use AutoValue --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/4] incubator-beam-site git commit: [BEAM-835] Added IntelliJ instructions to the contribution guide
Repository: incubator-beam-site Updated Branches: refs/heads/asf-site 37adeb9ba -> 59f789b97 [BEAM-835] Added IntelliJ instructions to the contribution guide Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/33963196 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/33963196 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/33963196 Branch: refs/heads/asf-site Commit: 3396319690ab8a9e0bd1ce53cfd3a5ef78d40a3e Parents: 37adeb9 Author: minudika Authored: Fri Nov 25 18:08:51 2016 +0530 Committer: Thomas Groh Committed: Fri Dec 9 15:16:08 2016 -0800 -- src/contribute/contribution-guide.md | 17 +++-- 1 file changed, 15 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/33963196/src/contribute/contribution-guide.md -- diff --git a/src/contribute/contribution-guide.md b/src/contribute/contribution-guide.md index 1db04cc..3eb670f 100644 --- a/src/contribute/contribution-guide.md +++ b/src/contribute/contribution-guide.md @@ -97,8 +97,21 @@ Depending on your preferred development environment, you may need to prepare it # IntelliJ -_This documentation is incomplete. If you are an Intellij user, please contribute instructions! -See [BEAM-835](https://issues.apache.org/jira/browse/BEAM-835)._ +## Enable Annotation Processing + +To configure annotation processing in InteliJ Idea, + +1. Go to File -> Settings +Settings dialog window will be appeared. +2. Select Execution,Deployment,Deployment -> Compiler -> Annotation processors +3. Select, + * Enable annotation processing + * Obtain processors from project classpath + * Store generated sources relative to : _Module content root_ +4. Provide, + * Production sources directory : _target/generated-sources/annotations_ + * Test sources directory : _target/generated-test-sources/test-annotations_ +5. Finally click on apply and you are good to go. 
## Checkstyle IntelliJ supports checkstyle within the IDE using the Checkstyle-IDEA plugin.
[2/4] incubator-beam-site git commit: Reword IntelliJ Annotation Processor section
Reword IntelliJ Annotation Processor section Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/01ec343e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/01ec343e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/01ec343e Branch: refs/heads/asf-site Commit: 01ec343e58a8bef3d00e80525f252ef4896adede Parents: 3396319 Author: Thomas Groh Authored: Fri Dec 9 15:01:59 2016 -0800 Committer: Thomas Groh Committed: Fri Dec 9 15:16:09 2016 -0800 -- src/contribute/contribution-guide.md | 25 +++-- 1 file changed, 11 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/01ec343e/src/contribute/contribution-guide.md -- diff --git a/src/contribute/contribution-guide.md b/src/contribute/contribution-guide.md index 3eb670f..6be6bf3 100644 --- a/src/contribute/contribution-guide.md +++ b/src/contribute/contribution-guide.md @@ -98,20 +98,17 @@ Depending on your preferred development environment, you may need to prepare it # IntelliJ ## Enable Annotation Processing - -To configure annotation processing in InteliJ Idea, - -1. Go to File -> Settings -Settings dialog window will be appeared. -2. Select Execution,Deployment,Deployment -> Compiler -> Annotation processors -3. Select, - * Enable annotation processing - * Obtain processors from project classpath - * Store generated sources relative to : _Module content root_ -4. Provide, - * Production sources directory : _target/generated-sources/annotations_ - * Test sources directory : _target/generated-test-sources/test-annotations_ -5. Finally click on apply and you are good to go. +To configure annotation processing in IntelliJ: + +1. Open Annotation Processors Settings dialog box by going to Settings -> Build, Execution, Deployment -> Compiler -> Annotation Processors. +1. 
Select the following buttons: + * "Enable annotation processing" + * "Obtain processors from project classpath" + * "Store generated sources relative to: _Module content root_" +1. Set the generated source directories to be equal to the Maven directories: + * Set "Production sources directory:" to "target/generated-sources/annotations". + * Set "Test sources directory:" to "target/generated-test-sources/test-annotations". +1. Click "OK". ## Checkstyle IntelliJ supports checkstyle within the IDE using the Checkstyle-IDEA plugin.
[3/4] incubator-beam-site git commit: Regenerate Content
Regenerate Content Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/c676efe7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/c676efe7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/c676efe7 Branch: refs/heads/asf-site Commit: c676efe7d7032a89eec19400628c0b40bb2b12ab Parents: 01ec343 Author: Thomas Groh Authored: Fri Dec 9 15:05:21 2016 -0800 Committer: Thomas Groh Committed: Fri Dec 9 15:16:09 2016 -0800 -- .../contribute/contribution-guide/index.html| 23 ++-- 1 file changed, 21 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/c676efe7/content/contribute/contribution-guide/index.html -- diff --git a/content/contribute/contribution-guide/index.html b/content/contribute/contribution-guide/index.html index 87d5f74..7d7b7c9 100644 --- a/content/contribute/contribution-guide/index.html +++ b/content/contribute/contribution-guide/index.html @@ -161,6 +161,7 @@ Clone the repository locally [Optional] IDE Setup IntelliJ + Enable Annotation Processing Checkstyle @@ -301,8 +302,26 @@ $ cd incubator-beam IntelliJ -This documentation is incomplete. If you are an Intellij user, please contribute instructions! -See https://issues.apache.org/jira/browse/BEAM-835";>BEAM-835. +Enable Annotation Processing +To configure annotation processing in IntelliJ: + + + Open Annotation Processors Settings dialog box by going to Settings -> Build, Execution, Deployment -> Compiler -> Annotation Processors. + Select the following buttons: + + “Enable annotation processing” + “Obtain processors from project classpath” + “Store generated sources relative to: Module content root” + + + Set the generated source directories to be equal to the Maven directories: + + Set “Production sources directory:” to “target/generated-sources/annotations”. 
+ Set “Test sources directory:” to “target/generated-test-sources/test-annotations”. + + + Click “OK”. + Checkstyle IntelliJ supports checkstyle within the IDE using the Checkstyle-IDEA plugin.
[4/4] incubator-beam-site git commit: This closes #95
This closes #95 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/59f789b9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/59f789b9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/59f789b9 Branch: refs/heads/asf-site Commit: 59f789b97d5c03552d571afee6eeb1e3ba12 Parents: 37adeb9 c676efe Author: Thomas Groh Authored: Fri Dec 9 15:16:20 2016 -0800 Committer: Thomas Groh Committed: Fri Dec 9 15:16:20 2016 -0800 -- .../contribute/contribution-guide/index.html| 23 ++-- src/contribute/contribution-guide.md| 14 ++-- 2 files changed, 33 insertions(+), 4 deletions(-) --
[GitHub] incubator-beam pull request #1567: Copy Runner Profiles to Java8 Archetype P...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1567 Copy Runner Profiles to Java8 Archetype POM Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This enables Java8 examples to be run cross-runner You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam ohfouroh-rc1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1567.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1567 commit 65dd62cd9b1d6f7925ee60b73a5a3fac1e4b1bf1 Author: Thomas Groh Date: 2016-12-09T19:36:22Z Copy Runner Profiles to Java8 Archetype POM This enables Java8 examples to be run cross-runner --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #1563: Enable the DirectRunner by default in Exa...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1563 Enable the DirectRunner by default in Examples Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This ensures a runner will be on the classpath if no profile is specified. This matches the generated examples with the quickstart. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam ohfouroh-rc1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1563.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1563 commit f83370d69d193818e5be9ff613498f9f3296a658 Author: Thomas Groh Date: 2016-12-09T18:52:46Z Enable the DirectRunner by default in Examples Archetype This ensures a runner will be on the classpath if no profile is specified. This matches the generated examples with the quickstart. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: This closes #1525
This closes #1525 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/40bd2760 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/40bd2760 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/40bd2760 Branch: refs/heads/master Commit: 40bd27602ebe2269ccaba2685addb8e5e3ba533e Parents: 409b5df bf1fba4 Author: Thomas Groh Authored: Thu Dec 8 17:17:56 2016 -0800 Committer: Thomas Groh Committed: Thu Dec 8 17:17:56 2016 -0800 -- .../beam/runners/core/SplittableParDo.java | 9 ++- .../org/apache/beam/sdk/transforms/ParDo.java | 8 ++- .../apache/beam/sdk/transforms/ParDoTest.java | 6 +- .../beam/sdk/transforms/SplittableDoFnTest.java | 58 +++- 4 files changed, 73 insertions(+), 8 deletions(-) --
[1/2] incubator-beam git commit: Fix a bug in SplittableDoFn Checkpointing
Repository: incubator-beam Updated Branches: refs/heads/master 409b5dfcf -> 40bd27602 Fix a bug in SplittableDoFn Checkpointing Call checkpoint() only once if the SDF emits output several times per claim call. Calling checkpoint multiple times would clobber an existing checkpoint, and the second call would only ever return an empty residual, losing all of the initial residual. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf1fba45 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf1fba45 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf1fba45 Branch: refs/heads/master Commit: bf1fba450e6b5fd6c98d006b381472eee8db7b72 Parents: 409b5df Author: Eugene Kirpichov Authored: Tue Dec 6 18:00:03 2016 -0800 Committer: Thomas Groh Committed: Thu Dec 8 17:16:30 2016 -0800 -- .../beam/runners/core/SplittableParDo.java | 9 ++- .../org/apache/beam/sdk/transforms/ParDo.java | 8 ++- .../apache/beam/sdk/transforms/ParDoTest.java | 6 +- .../beam/sdk/transforms/SplittableDoFnTest.java | 58 +++- 4 files changed, 73 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf1fba45/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 0bf882b..8a9bfcd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -590,9 +590,14 @@ public class SplittableParDo } private void noteOutput() { - if (++numOutputs >= MAX_OUTPUTS_PER_BUNDLE) { + // Take the checkpoint only if it hasn't been taken yet, because: + // 1) otherwise we'd lose the previous checkpoint stored in residualRestrictionHolder + // 2) 
it's not allowed to checkpoint a RestrictionTracker twice, since the first call + // by definition already maximally narrows its restriction, so a second checkpoint would + // have produced a useless empty residual restriction anyway. + if (++numOutputs >= MAX_OUTPUTS_PER_BUNDLE && residualRestrictionHolder[0] == null) { // Request a checkpoint. The fn *may* produce more output, but hopefully not too much. -residualRestrictionHolder[0] = tracker.checkpoint(); +residualRestrictionHolder[0] = checkNotNull(tracker.checkpoint()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf1fba45/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index e60c536..167f5fa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -787,7 +787,9 @@ public class ParDo { @Override public PCollection expand(PCollection input) { checkArgument( - !isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner"); + !isSplittable(getOldFn()), + "%s does not support Splittable DoFn", + input.getPipeline().getOptions().getRunner().getName()); validateWindowType(input, fn); return PCollection.createPrimitiveOutputInternal( input.getPipeline(), @@ -1044,7 +1046,9 @@ public class ParDo { @Override public PCollectionTuple expand(PCollection input) { checkArgument( - !isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner"); + !isSplittable(getOldFn()), + "%s does not support Splittable DoFn", + input.getPipeline().getOptions().getRunner().getName()); validateWindowType(input, fn); PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal( input.getPipeline(), 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf1fba45/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 3b2..2d118e4 100644 --- a/sdks/java/core/src/test/java/org/apache/be
[GitHub] incubator-beam pull request #1547: [BEAM-646] Add PTransformOverrideFactory ...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1547 [BEAM-646] Add PTransformOverrideFactory to the Core SDK Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This migrates PTransformOverrideFactory from the DirectRunner to the Core SDK, as part of BEAM-646. Migrate all DirectRunner Override Factories to the new PTransformOverrideFactory. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam override_factory_in_core Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1547.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1547 commit ad7aa03e12694bceb29906d2bb9df1ce009a1df2 Author: Thomas Groh Date: 2016-12-06T00:01:57Z Add PTransformOverrideFactory to the Core SDK This migrates PTransformOverrideFactory from the DirectRunner to the Core SDK, as part of BEAM-646. Add getOriginalToReplacements to provide a mapping from the original outputs to replaced outputs. This enables all replaced nodes to be rewired to output the original output. Migrate all DirectRunner Override Factories to the new PTransformOverrideFactory. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #1442: [BEAM-646] Add Replacement Methods to Tra...
Github user tgroh closed the pull request at: https://github.com/apache/incubator-beam/pull/1442 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: This closes #1511
This closes #1511 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5b31a369 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5b31a369 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5b31a369 Branch: refs/heads/master Commit: 5b31a369962907e257de8019fbf6cde4c615b1c0 Parents: ae52ec1 55d333b Author: Thomas Groh Authored: Wed Dec 7 09:14:38 2016 -0800 Committer: Thomas Groh Committed: Wed Dec 7 09:14:38 2016 -0800 -- .../apex/translation/TranslationContext.java| 4 +-- .../beam/runners/direct/DirectGraphVisitor.java | 9 +++ .../direct/KeyedPValueTrackingVisitor.java | 2 +- .../FlinkBatchPipelineTranslator.java | 4 +-- .../FlinkStreamingPipelineTranslator.java | 7 ++ .../dataflow/DataflowPipelineTranslator.java| 3 +-- .../apache/beam/runners/spark/SparkRunner.java | 17 +++-- .../beam/sdk/runners/TransformHierarchy.java| 26 +++- .../sdk/runners/TransformHierarchyTest.java | 13 -- 9 files changed, 38 insertions(+), 47 deletions(-) --
[1/2] incubator-beam git commit: Only provide expanded Inputs and Outputs
Repository: incubator-beam Updated Branches: refs/heads/master ae52ec1bc -> 5b31a3699 Only provide expanded Inputs and Outputs This removes PInput and POutput from the immediate API Surface of TransformHierarchy.Node, and forces Pipeline Visitors to access only the expanded version of the output. This is part of the move towards the runner-agnostic representation of a graph. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55d333bf Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55d333bf Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55d333bf Branch: refs/heads/master Commit: 55d333bff68809ff1a9154491ace02d2d16e3b85 Parents: ae52ec1 Author: Thomas Groh Authored: Mon Dec 5 14:29:05 2016 -0800 Committer: Thomas Groh Committed: Wed Dec 7 09:14:18 2016 -0800 -- .../apex/translation/TranslationContext.java| 4 +-- .../beam/runners/direct/DirectGraphVisitor.java | 9 +++ .../direct/KeyedPValueTrackingVisitor.java | 2 +- .../FlinkBatchPipelineTranslator.java | 4 +-- .../FlinkStreamingPipelineTranslator.java | 7 ++ .../dataflow/DataflowPipelineTranslator.java| 3 +-- .../apache/beam/runners/spark/SparkRunner.java | 17 +++-- .../beam/sdk/runners/TransformHierarchy.java| 26 +++- .../sdk/runners/TransformHierarchyTest.java | 13 -- 9 files changed, 38 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java -- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java index 259afbd..3bf01a8 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java +++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java @@ -35,7 +35,6 @@ import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.PCollection; @@ -72,8 +71,7 @@ class TranslationContext { } public void setCurrentTransform(TransformHierarchy.Node treeNode) { -this.currentTransform = AppliedPTransform.of(treeNode.getFullName(), -treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform()); +this.currentTransform = treeNode.toAppliedPTransform(); } public ApexPipelineOptions getPipelineOptions() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java index cd9d120..4f38bce 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java @@ -79,13 +79,13 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { -toFinalize.removeAll(node.getInput().expand()); +toFinalize.removeAll(node.getInputs()); AppliedPTransform appliedTransform = getAppliedTransform(node); stepNames.put(appliedTransform, genStepName()); -if (node.getInput().expand().isEmpty()) { +if (node.getInputs().isEmpty()) { rootTransforms.add(appliedTransform); 
} else { - for (PValue value : node.getInput().expand()) { + for (PValue value : node.getInputs()) { primitiveConsumers.put(value, appliedTransform); } } @@ -111,8 +111,7 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { private AppliedPTransform getAppliedTransform(TransformHierarchy.Node node) { @SuppressWarnings({"rawtypes", "unchecked"}) -AppliedPTransform application = AppliedPTransform.of( -node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform()); +AppliedPTransform application = node.toAppliedPTrans
[1/2] incubator-beam git commit: This closes #1527
Repository: incubator-beam Updated Branches: refs/heads/master 02bb8c375 -> 1526184ae This closes #1527 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1526184a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1526184a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1526184a Branch: refs/heads/master Commit: 1526184ae8be1f8ae6863f830653204157a584cd Parents: 02bb8c3 b2d7223 Author: Thomas Groh Authored: Wed Dec 7 08:51:02 2016 -0800 Committer: Thomas Groh Committed: Wed Dec 7 08:51:02 2016 -0800 -- .../java/org/apache/beam/runners/core/DoFnRunner.java | 14 +++--- 1 file changed, 7 insertions(+), 7 deletions(-) --
[2/2] incubator-beam git commit: Port most of DoFnRunner Javadoc to new DoFn
Port most of DoFnRunner Javadoc to new DoFn Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b2d72237 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b2d72237 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b2d72237 Branch: refs/heads/master Commit: b2d72237b592e1dcb5cca30f5cbc9a11d2890c0f Parents: 02bb8c3 Author: Kenneth Knowles Authored: Tue Dec 6 15:20:28 2016 -0800 Committer: Thomas Groh Committed: Wed Dec 7 08:51:02 2016 -0800 -- .../java/org/apache/beam/runners/core/DoFnRunner.java | 14 +++--- 1 file changed, 7 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b2d72237/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java index aac8e8f..501667e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java @@ -18,29 +18,29 @@ package org.apache.beam.runners.core; import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; /** - * An wrapper interface that represents the execution of a {@link OldDoFn}. + * An wrapper interface that represents the execution of a {@link DoFn}. */ public interface DoFnRunner { /** - * Prepares and calls {@link OldDoFn#startBundle}. + * Prepares and calls a {@link DoFn DoFn's} {@link DoFn.StartBundle @StartBundle} method. 
*/ void startBundle(); /** - * Calls {@link OldDoFn#processElement} with a {@link ProcessContext} containing the current - * element. + * Calls a {@link DoFn DoFn's} {@link DoFn.ProcessElement @ProcessElement} method with a + * {@link DoFn.ProcessContext} containing the provided element. */ void processElement(WindowedValue elem); /** - * Calls {@link OldDoFn#finishBundle} and performs additional tasks, such as - * flushing in-memory states. + * Calls a {@link DoFn DoFn's} {@link DoFn.FinishBundle @FinishBundle} method and performs + * additional tasks, such as flushing in-memory states. */ void finishBundle();
[2/4] incubator-beam git commit: Add DirectGraphs to DirectRunner Tests
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java -- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index 076e0fb..eb4d0cd 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -31,7 +31,6 @@ import java.io.Serializable; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; -import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.runners.direct.CommittedResult.OutputType; @@ -63,7 +62,6 @@ import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TimestampedValue; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; @@ -116,33 +114,8 @@ public class WatermarkManagerTest implements Serializable { PCollectionList preFlatten = PCollectionList.of(createdInts).and(intsToFlatten); flattened = preFlatten.apply("flattened", Flatten.pCollections()); -Collection> rootTransforms = -ImmutableList.>of( -createdInts.getProducingTransformInternal(), -intsToFlatten.getProducingTransformInternal()); - -Map>> consumers = new HashMap<>(); -consumers.put( -createdInts, -ImmutableList.>of(filtered.getProducingTransformInternal(), -keyed.getProducingTransformInternal(), flattened.getProducingTransformInternal())); -consumers.put( -filtered, -Collections.>singleton( -filteredTimesTwo.getProducingTransformInternal())); -consumers.put(filteredTimesTwo, Collections.>emptyList()); 
-consumers.put(keyed, Collections.>emptyList()); - -consumers.put( -intsToFlatten, -Collections.>singleton( -flattened.getProducingTransformInternal())); -consumers.put(flattened, Collections.>emptyList()); - clock = MockClock.fromInstant(new Instant(1000)); -DirectGraphVisitor visitor = new DirectGraphVisitor(); -p.traverseTopologically(visitor); -graph = visitor.getGraph(); +graph = DirectGraphs.getGraph(p); manager = WatermarkManager.create(clock, graph); bundleFactory = ImmutableListBundleFactory.create(); @@ -155,7 +128,7 @@ public class WatermarkManagerTest implements Serializable { @Test public void getWatermarkForUntouchedTransform() { TransformWatermarks watermarks = -manager.getWatermarks(createdInts.getProducingTransformInternal()); +manager.getWatermarks(graph.getProducer(createdInts)); assertThat(watermarks.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE)); assertThat(watermarks.getOutputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE)); @@ -170,13 +143,13 @@ public class WatermarkManagerTest implements Serializable { CommittedBundle output = multiWindowedBundle(createdInts, 1); manager.updateWatermarks(null, TimerUpdate.empty(), -result(createdInts.getProducingTransformInternal(), +result(graph.getProducer(createdInts), null, Collections.>singleton(output)), new Instant(8000L)); manager.refreshAll(); TransformWatermarks updatedSourceWatermark = -manager.getWatermarks(createdInts.getProducingTransformInternal()); +manager.getWatermarks(graph.getProducer(createdInts)); assertThat(updatedSourceWatermark.getOutputWatermark(), equalTo(new Instant(8000L))); } @@ -191,7 +164,7 @@ public class WatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), -result(intsToFlatten.getProducingTransformInternal(), +result(graph.getProducer(intsToFlatten), null, Collections.>singleton(secondPcollectionBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -199,7 +172,7 @@ public class WatermarkManagerTest 
implements Serializable { // We didn't do anything for the first source, so we shouldn't have progressed the watermark TransformWatermarks firstSourceWatermark = -manager.getWatermarks(createdInts.getProducingTransformInternal()); +manager.getWatermarks(graph.getProducer(createdInts)); assertThat( firstSourceWatermark.getOutputWatermark(), not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE))); @@ -207,14 +180,14 @@ public class WatermarkManagerTest implements Serializab
[1/4] incubator-beam git commit: Remove getProducingTransformInternal from DirectGraphVisitorTest
Repository: incubator-beam Updated Branches: refs/heads/master 077d9118d -> ca6ab6c68 Remove getProducingTransformInternal from DirectGraphVisitorTest Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ec1eff38 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ec1eff38 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ec1eff38 Branch: refs/heads/master Commit: ec1eff387a711039801289c8f59c4240b1f1d007 Parents: d6c6ad3 Author: Thomas Groh Authored: Fri Dec 2 14:26:04 2016 -0800 Committer: Thomas Groh Committed: Tue Dec 6 10:46:39 2016 -0800 -- .../runners/direct/DirectGraphVisitorTest.java | 70 +--- 1 file changed, 47 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec1eff38/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java -- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java index fb84de8..5ad278b 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java @@ -19,25 +19,34 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import com.google.common.collect.Iterables; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.testing.TestPipeline; 
import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; @@ -48,7 +57,6 @@ import org.junit.runners.JUnit4; /** * Tests for {@link DirectGraphVisitor}. */ -// TODO: Replace uses of getProducing @RunWith(JUnit4.class) public class DirectGraphVisitorTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); @@ -81,26 +89,36 @@ public class DirectGraphVisitorTest implements Serializable { @Test public void getRootTransformsContainsPBegins() { PCollection created = p.apply(Create.of("foo", "bar")); -PCollection counted = p.apply(CountingInput.upTo(1234L)); +PCollection counted = p.apply(Read.from(CountingSource.upTo(1234L))); PCollection unCounted = p.apply(CountingInput.unbounded()); p.traverseTopologically(visitor); +DirectGraph graph = visitor.getGraph(); +assertThat(graph.getRootTransforms(), hasSize(3)); +List> unapplied = new ArrayList<>(); assertThat( -visitor.getGraph().getRootTransforms(), +graph.getRootTransforms(), Matchers.>containsInAnyOrder( -created.getProducingTransformInternal(), -counted.getProducingTransformInternal(), -unCounted.getProducingTransformInternal())); +graph.getProducer(created), graph.getProducer(counted), graph.getProducer(unCounted))); +for (AppliedPTransform 
root : graph.getRootTransforms()) { + assertTrue(root.getInput() instanceof PBegin); + assertThat(root.getOutput(), Matchers.isOneOf(created, counted, unCounted)); +} } @Test public void getRootTransformsContainsEmptyFlatten() { -PCollection empty = -PCollectionList.empty(p).apply(Flatten.pCollections()); +FlattenPCollectionList flatten = Flatten.pCollections(); +PCollectionList emptyList = PCollectionList.empty(p); +PCollection empty = emptyList.apply(flatten); p.traverseTopologically(visitor); +DirectGraph graph = visitor.getGraph
[4/4] incubator-beam git commit: This closes #1495
This closes #1495 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ca6ab6c6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ca6ab6c6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ca6ab6c6 Branch: refs/heads/master Commit: ca6ab6c68d807765ab8e80273dd9d6f99aeaaf4a Parents: 077d911 ec1eff3 Author: Thomas Groh Authored: Tue Dec 6 10:46:40 2016 -0800 Committer: Thomas Groh Committed: Tue Dec 6 10:46:40 2016 -0800 -- .../direct/BoundedReadEvaluatorFactoryTest.java | 18 +- .../runners/direct/DirectGraphVisitorTest.java | 69 -- .../beam/runners/direct/DirectGraphs.java | 35 +++ .../runners/direct/EvaluationContextTest.java | 82 --- .../direct/FlattenEvaluatorFactoryTest.java | 15 +- .../direct/GroupByKeyEvaluatorFactoryTest.java | 2 +- .../GroupByKeyOnlyEvaluatorFactoryTest.java | 3 +- .../ImmutabilityEnforcementFactoryTest.java | 2 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 3 +- .../StatefulParDoEvaluatorFactoryTest.java | 4 +- .../runners/direct/StepTransformResultTest.java | 2 +- .../direct/TestStreamEvaluatorFactoryTest.java | 14 +- .../runners/direct/TransformExecutorTest.java | 9 +- .../UnboundedReadEvaluatorFactoryTest.java | 24 +- .../direct/ViewEvaluatorFactoryTest.java| 4 +- .../direct/WatermarkCallbackExecutorTest.java | 6 +- .../runners/direct/WatermarkManagerTest.java| 237 --- 17 files changed, 292 insertions(+), 237 deletions(-) --
[3/4] incubator-beam git commit: Add DirectGraphs to DirectRunner Tests
Add DirectGraphs to DirectRunner Tests Add getGraph(Pipeline) and getProducer(PValue), which use the DirectGraphVisitor and DirectGraph methods to provide access to the producing AppliedPTransform. Remove getProducingTransformInternal from everywhere except DirectGraphVisitorTest Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6c6ad37 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6c6ad37 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6c6ad37 Branch: refs/heads/master Commit: d6c6ad37149622e4d35af39727cdf774e6263d1e Parents: 077d911 Author: Thomas Groh Authored: Fri Dec 2 10:56:36 2016 -0800 Committer: Thomas Groh Committed: Tue Dec 6 10:46:39 2016 -0800 -- .../direct/BoundedReadEvaluatorFactoryTest.java | 18 +- .../runners/direct/DirectGraphVisitorTest.java | 1 + .../beam/runners/direct/DirectGraphs.java | 35 +++ .../runners/direct/EvaluationContextTest.java | 82 --- .../direct/FlattenEvaluatorFactoryTest.java | 15 +- .../direct/GroupByKeyEvaluatorFactoryTest.java | 2 +- .../GroupByKeyOnlyEvaluatorFactoryTest.java | 3 +- .../ImmutabilityEnforcementFactoryTest.java | 2 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 3 +- .../StatefulParDoEvaluatorFactoryTest.java | 4 +- .../runners/direct/StepTransformResultTest.java | 2 +- .../direct/TestStreamEvaluatorFactoryTest.java | 14 +- .../runners/direct/TransformExecutorTest.java | 9 +- .../UnboundedReadEvaluatorFactoryTest.java | 24 +- .../direct/ViewEvaluatorFactoryTest.java| 4 +- .../direct/WatermarkCallbackExecutorTest.java | 6 +- .../runners/direct/WatermarkManagerTest.java| 237 --- 17 files changed, 246 insertions(+), 215 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java -- diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index b1ff689..acb1444 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -80,6 +80,7 @@ public class BoundedReadEvaluatorFactoryTest { private BoundedReadEvaluatorFactory factory; @Mock private EvaluationContext context; private BundleFactory bundleFactory; + private AppliedPTransform longsProducer; @Before public void setup() { @@ -92,6 +93,7 @@ public class BoundedReadEvaluatorFactoryTest { new BoundedReadEvaluatorFactory( context, Long.MAX_VALUE /* minimum size for dynamic splits */); bundleFactory = ImmutableListBundleFactory.create(); +longsProducer = DirectGraphs.getProducer(longs); } @Test @@ -102,11 +104,11 @@ public class BoundedReadEvaluatorFactoryTest { Collection> initialInputs = new BoundedReadEvaluatorFactory.InputProvider(context) -.getInitialInputs(longs.getProducingTransformInternal(), 1); +.getInitialInputs(longsProducer, 1); List> outputs = new ArrayList<>(); for (CommittedBundle shardBundle : initialInputs) { TransformEvaluator evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null); + factory.forApplication(longsProducer, null); for (WindowedValue shard : shardBundle.getElements()) { evaluator.processElement((WindowedValue) shard); } @@ -141,7 +143,7 @@ public class BoundedReadEvaluatorFactoryTest { } PCollection read = TestPipeline.create().apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems))); -AppliedPTransform transform = read.getProducingTransformInternal(); +AppliedPTransform transform = DirectGraphs.getProducer(read); Collection> unreadInputs = new BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 
1); @@ -191,7 +193,7 @@ public class BoundedReadEvaluatorFactoryTest { PCollection read = TestPipeline.create() .apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L)))); -AppliedPTransform transform = read.getProducingTransformInternal(); +AppliedPTransform transform = DirectGraphs.getProducer(read); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle())
[GitHub] incubator-beam pull request #1511: [BEAM-115] Only provide expanded Inputs a...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1511 [BEAM-115] Only provide expanded Inputs and Outputs Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This removes PInput and POutput from the API Surface of TransformHierarchy.Node, at least directly, and forces Pipeline Visitors to access only the expanded version of the output. This is part of the move towards the runner-agnostic representation of a graph. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam transform_hierarchy_always_expanded Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1511.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1511 commit aff19c18efb45747edca1b903f024a74c1939a34 Author: Thomas Groh Date: 2016-12-05T22:29:05Z Only provide expanded Inputs and Outputs This removes PInput and POutput from the API Surface of TransformHierarchy.Node, at least directly, and forces Pipeline Visitors to access only the expanded version of the output. This is part of the move towards the runner-agnostic representation of a graph. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #1495: [BEAM-646] Add DirectTestUtils
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1495 [BEAM-646] Add DirectTestUtils Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- Add getGraph(Pipeline) and getProducer(PValue), which use the DirectGraphVisitor and DirectGraph methods to provide access to the producing AppliedPTransform. Remove getProducingTransformInternal from everywhere except DirectGraphVisitorTest This removes all remaining uses of `PValue.getProducingTransformInternal` from the `DirectRunner` You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam producers_consumers_as_datastructure Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1495.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1495 commit 5dc2386be460e004108aeaf89e0b5a5e3c3e50bd Author: Thomas Groh Date: 2016-12-02T18:56:36Z Add DirectTestUtils Add getGraph(Pipeline) and getProducer(PValue), which use the DirectGraphVisitor and DirectGraph methods to provide access to the producing AppliedPTransform.
Remove getProducingTransformInternal from everywhere except DirectGraphVisitorTest commit a7863cfbdfc8d2289fccdf5499fd245928a11a7d Author: Thomas Groh Date: 2016-12-02T22:26:04Z Remove getProducingTransformInternal from DirectGraphVisitorTest --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/3] incubator-beam git commit: Rename ConsumerTrackingPipelineVisitor to DirectGraphVisitor
Rename ConsumerTrackingPipelineVisitor to DirectGraphVisitor Reduce visibility of Visitor. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/662416a4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/662416a4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/662416a4 Branch: refs/heads/master Commit: 662416a4e176cca252c0d6fde1bf4252aeaa56c0 Parents: 8162cd2 Author: Thomas Groh Authored: Fri Dec 2 10:07:05 2016 -0800 Committer: Thomas Groh Committed: Fri Dec 2 14:02:25 2016 -0800 -- .../direct/ConsumerTrackingPipelineVisitor.java | 145 --- .../beam/runners/direct/DirectGraphVisitor.java | 145 +++ .../beam/runners/direct/DirectRunner.java | 8 +- .../ConsumerTrackingPipelineVisitorTest.java| 239 --- .../runners/direct/DirectGraphVisitorTest.java | 239 +++ .../runners/direct/EvaluationContextTest.java | 6 +- .../ImmutabilityCheckingBundleFactoryTest.java | 2 +- .../runners/direct/WatermarkManagerTest.java| 8 +- 8 files changed, 396 insertions(+), 396 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java deleted file mode 100644 index b9e77c5..000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineVisitor; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; - -/** - * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the - * {@link Pipeline}. This is used to schedule consuming {@link PTransform PTransforms} to consume - * input after the upstream transform has produced and committed output. 
- */ -public class ConsumerTrackingPipelineVisitor extends PipelineVisitor.Defaults { - private Map> producers = new HashMap<>(); - - private ListMultimap> primitiveConsumers = - ArrayListMultimap.create(); - - private Set> views = new HashSet<>(); - private Set> rootTransforms = new HashSet<>(); - private Map, String> stepNames = new HashMap<>(); - private Set toFinalize = new HashSet<>(); - private int numTransforms = 0; - private boolean finalized = false; - - @Override - public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { -checkState( -!finalized, -"Attempting to traverse a pipeline (node %s) with a %s " -+ "which has already visited a Pipeline and is finalized", -node.getFullName(), -ConsumerTrackingPipelineVisitor.class.getSimpleName()); -return CompositeBehavior.ENTER_TRANSFORM; - } - - @Override - public void leaveCompositeTransform(TransformHierarchy.Node node) { -checkState( -!finalized, -"Attempting to traverse a pipeline (node %s) with a %s which is already finalized", -
[3/3] incubator-beam git commit: This closes #1487
This closes #1487 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1abbb900 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1abbb900 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1abbb900 Branch: refs/heads/master Commit: 1abbb9007e83fc64f1bb61ff4593f37c6c386545 Parents: 8cb2689 662416a Author: Thomas Groh Authored: Fri Dec 2 14:02:25 2016 -0800 Committer: Thomas Groh Committed: Fri Dec 2 14:02:25 2016 -0800 -- .../direct/ConsumerTrackingPipelineVisitor.java | 173 --- .../apache/beam/runners/direct/DirectGraph.java | 89 ++ .../beam/runners/direct/DirectGraphVisitor.java | 145 ++ .../beam/runners/direct/DirectRunner.java | 35 +-- .../beam/runners/direct/EvaluationContext.java | 76 ++--- .../direct/ExecutorServiceParallelExecutor.java | 15 +- .../ImmutabilityCheckingBundleFactory.java | 21 +- .../beam/runners/direct/WatermarkManager.java | 50 ++-- .../ConsumerTrackingPipelineVisitorTest.java| 287 --- .../runners/direct/DirectGraphVisitorTest.java | 239 +++ .../runners/direct/EvaluationContextTest.java | 29 +- .../ImmutabilityCheckingBundleFactoryTest.java | 6 +- .../runners/direct/WatermarkManagerTest.java| 23 +- 13 files changed, 575 insertions(+), 613 deletions(-) --
[1/3] incubator-beam git commit: Stop using Maps of Transforms in the DirectRunner
Repository: incubator-beam Updated Branches: refs/heads/master 8cb2689f8 -> 1abbb9007 Stop using Maps of Transforms in the DirectRunner Instead, add a "DirectGraph" class, which adds a layer of indirection to all lookup methods. Remove all remaining uses of getProducingTransformInternal, and instead use DirectGraph methods to obtain the producing transform. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8162cd29 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8162cd29 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8162cd29 Branch: refs/heads/master Commit: 8162cd29d97ef307b6fac588f453e4e39d70fca7 Parents: 8cb2689 Author: Thomas Groh Authored: Thu Dec 1 15:39:30 2016 -0800 Committer: Thomas Groh Committed: Fri Dec 2 14:02:24 2016 -0800 -- .../direct/ConsumerTrackingPipelineVisitor.java | 108 +++ .../apache/beam/runners/direct/DirectGraph.java | 89 +++ .../beam/runners/direct/DirectRunner.java | 31 +++--- .../beam/runners/direct/EvaluationContext.java | 76 - .../direct/ExecutorServiceParallelExecutor.java | 15 +-- .../ImmutabilityCheckingBundleFactory.java | 21 ++-- .../beam/runners/direct/WatermarkManager.java | 50 - .../ConsumerTrackingPipelineVisitorTest.java| 98 + .../runners/direct/EvaluationContextTest.java | 25 ++--- .../ImmutabilityCheckingBundleFactoryTest.java | 6 +- .../runners/direct/WatermarkManagerTest.java| 23 ++-- 11 files changed, 252 insertions(+), 290 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8162cd29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java index acfad16..b9e77c5 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java @@ -19,8 +19,8 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; -import java.util.ArrayList; -import java.util.Collection; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; /** @@ -41,9 +42,13 @@ import org.apache.beam.sdk.values.PValue; * input after the upstream transform has produced and committed output. 
*/ public class ConsumerTrackingPipelineVisitor extends PipelineVisitor.Defaults { - private Map>> valueToConsumers = new HashMap<>(); - private Collection> rootTransforms = new ArrayList<>(); - private Collection> views = new ArrayList<>(); + private Map> producers = new HashMap<>(); + + private ListMultimap> primitiveConsumers = + ArrayListMultimap.create(); + + private Set> views = new HashSet<>(); + private Set> rootTransforms = new HashSet<>(); private Map, String> stepNames = new HashMap<>(); private Set toFinalize = new HashSet<>(); private int numTransforms = 0; @@ -81,81 +86,38 @@ public class ConsumerTrackingPipelineVisitor extends PipelineVisitor.Defaults { rootTransforms.add(appliedTransform); } else { for (PValue value : node.getInput().expand()) { -valueToConsumers.get(value).add(appliedTransform); +primitiveConsumers.put(value, appliedTransform); } } } - private AppliedPTransform getAppliedTransform(TransformHierarchy.Node node) { -@SuppressWarnings({"rawtypes", "unchecked"}) -AppliedPTransform application = AppliedPTransform.of( -node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform()); -return application; - } - - @Override + @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { toFinalize.add(value); + +AppliedPTransform appliedTransform = getAppliedTransform(producer); +if (!producers.containsKey(value)) { + producers.put(value, appliedTransform); +} for (PValue expandedValue : value.expand()) { - valueToConsumers.put(expandedValue, new
[1/2] incubator-beam git commit: Explicitly Throw in TransformExecutorTest
Repository: incubator-beam Updated Branches: refs/heads/master 37e891fe9 -> 8cb2689f8 Explicitly Throw in TransformExecutorTest Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4ee8b73 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4ee8b73 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4ee8b73 Branch: refs/heads/master Commit: b4ee8b730bffb31ee1178303f1dbd5058eb22a11 Parents: 37e891f Author: Thomas Groh Authored: Fri Dec 2 10:56:15 2016 -0800 Committer: Thomas Groh Committed: Fri Dec 2 13:58:38 2016 -0800 -- .../runners/direct/TransformExecutorTest.java | 184 ++- 1 file changed, 97 insertions(+), 87 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4ee8b73/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java -- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index 85eff65..08b1e18 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -37,13 +37,10 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.WindowedValue; import 
org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -63,7 +60,9 @@ import org.mockito.MockitoAnnotations; public class TransformExecutorTest { @Rule public ExpectedException thrown = ExpectedException.none(); private PCollection created; - private PCollection> downstream; + + private AppliedPTransform createdProducer; + private AppliedPTransform downstreamProducer; private CountDownLatch evaluatorCompleted; @@ -88,15 +87,17 @@ public class TransformExecutorTest { TestPipeline p = TestPipeline.create(); created = p.apply(Create.of("foo", "spam", "third")); -downstream = created.apply(WithKeys.of(3)); +PCollection> downstream = created.apply(WithKeys.of(3)); + +createdProducer = created.getProducingTransformInternal(); +downstreamProducer = downstream.getProducingTransformInternal(); when(evaluationContext.getMetrics()).thenReturn(metrics); } @Test public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception { -final TransformResult result = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); +final TransformResult result = StepTransformResult.withoutHold(createdProducer).build(); final AtomicBoolean finishCalled = new AtomicBoolean(false); TransformEvaluator evaluator = new TransformEvaluator() { @@ -112,8 +113,7 @@ public class TransformExecutorTest { } }; -when(registry.forApplication(created.getProducingTransformInternal(), null)) -.thenReturn(evaluator); +when(registry.forApplication(createdProducer, null)).thenReturn(evaluator); TransformExecutor executor = TransformExecutor.create( @@ -121,7 +121,7 @@ public class TransformExecutorTest { registry, Collections.emptyList(), null, -created.getProducingTransformInternal(), +createdProducer, completionCallback, transformEvaluationState); executor.run(); @@ -133,7 +133,7 @@ public class TransformExecutorTest { @Test public void nullTransformEvaluatorTerminates() throws Exception { 
-when(registry.forApplication(created.getProducingTransformInternal(), null)).thenReturn(null); +when(registry.forApplication(createdProducer, null)).thenReturn(null); TransformExecutor executor = TransformExecutor.create( @@ -141,7 +141,7 @@ public class TransformExecutorTest { registry, Collections.emptyList(), null, -created.getProducingTransformInternal(), +createdProducer, completionCallback, transformEvaluationState); executo
[2/2] incubator-beam git commit: This closes #1490
This closes #1490 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8cb2689f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8cb2689f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8cb2689f Branch: refs/heads/master Commit: 8cb2689f8952a73a4e855a03f98c1d5bec8181fb Parents: 37e891f b4ee8b7 Author: Thomas Groh Authored: Fri Dec 2 13:58:39 2016 -0800 Committer: Thomas Groh Committed: Fri Dec 2 13:58:39 2016 -0800 -- .../runners/direct/TransformExecutorTest.java | 184 ++- 1 file changed, 97 insertions(+), 87 deletions(-) --
[2/3] incubator-beam git commit: Move Towards removing WindowedValue from SDK
Move Towards removing WindowedValue from SDK - Introduces ValueInSingleWindow for purposes of PAssert - Uses ValueInSingleWindow inside DoFnTester - Moves WindowMatchers{,Test} to runners-core After this commit, WindowedValue does not appear in any SDK APIs used by Pipeline authors. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9891234 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9891234 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9891234 Branch: refs/heads/master Commit: d989123424a54699ecb47ba6c0a4e437316cabce Parents: 0fb5610 Author: Eugene Kirpichov Authored: Mon Oct 31 15:46:25 2016 -0700 Committer: Thomas Groh Committed: Fri Dec 2 13:16:04 2016 -0800 -- .../beam/runners/core/ReduceFnRunnerTest.java | 5 +- .../beam/runners/core/SplittableParDoTest.java | 38 ++-- .../beam/runners/core/WindowMatchers.java | 204 +++ .../beam/runners/core/WindowMatchersTest.java | 82 .../direct/WindowEvaluatorFactoryTest.java | 4 +- .../apache/beam/sdk/testing/GatherAllPanes.java | 88 .../org/apache/beam/sdk/testing/PAssert.java| 77 +++ .../apache/beam/sdk/testing/PaneExtractors.java | 55 +++-- .../beam/sdk/testing/ValueInSingleWindow.java | 134 .../apache/beam/sdk/transforms/DoFnTester.java | 58 +++--- .../apache/beam/sdk/util/GatherAllPanes.java| 86 .../apache/beam/sdk/util/IdentityWindowFn.java | 2 +- .../org/apache/beam/sdk/WindowMatchers.java | 204 --- .../org/apache/beam/sdk/WindowMatchersTest.java | 82 .../beam/sdk/testing/GatherAllPanesTest.java| 140 + .../beam/sdk/testing/PaneExtractorsTest.java| 133 ++-- .../testing/ValueInSingleWindowCoderTest.java | 51 + .../beam/sdk/util/GatherAllPanesTest.java | 143 - 18 files changed, 893 insertions(+), 693 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java -- diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 20eb08b..ba57567 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.core; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue; -import static org.apache.beam.sdk.WindowMatchers.isWindowedValue; +import static org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue; +import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; @@ -39,7 +39,6 @@ import com.google.common.collect.Iterables; import java.util.Iterator; import java.util.List; import org.apache.beam.runners.core.triggers.TriggerStateMachine; -import org.apache.beam.sdk.WindowMatchers; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java -- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index 990d892..b13d839 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import 
org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.ValueInSingleWindow; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; @@ -142,14 +143,15 @@ public class SplittableParDoTest { PCollection.IsBounded.BOUNDED, makeBoundedCollection(pipeline) .apply("bounded to bounded", new
[1/3] incubator-beam git commit: Move Towards removing WindowedValue from SDK
Repository: incubator-beam Updated Branches: refs/heads/master 0fb561068 -> a0884492a http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java index ef501d4..7df2f89 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java @@ -23,10 +23,10 @@ import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.util.WindowedValue; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; @@ -34,32 +34,33 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for {@link PaneExtractors}. - */ +/** Tests for {@link PaneExtractors}. 
*/ @RunWith(JUnit4.class) public class PaneExtractorsTest { @Rule public ExpectedException thrown = ExpectedException.none(); @Test public void onlyPaneNoFiring() { -SerializableFunction>, Iterable> extractor = +SerializableFunction>, Iterable> extractor = PaneExtractors.onlyPane(); -Iterable> noFiring = +Iterable> noFiring = ImmutableList.of( -WindowedValue.valueInGlobalWindow(9), WindowedValue.valueInEmptyWindows(19)); +ValueInSingleWindow.of( +9, BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), +ValueInSingleWindow.of( +19, BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); assertThat(extractor.apply(noFiring), containsInAnyOrder(9, 19)); } @Test public void onlyPaneOnlyOneFiring() { -SerializableFunction>, Iterable> extractor = +SerializableFunction>, Iterable> extractor = PaneExtractors.onlyPane(); -Iterable> onlyFiring = +Iterable> onlyFiring = ImmutableList.of( -WindowedValue.of( +ValueInSingleWindow.of( 2, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING), -WindowedValue.of( +ValueInSingleWindow.of( 1, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING)); assertThat(extractor.apply(onlyFiring), containsInAnyOrder(2, 1)); @@ -67,21 +68,21 @@ public class PaneExtractorsTest { @Test public void onlyPaneMultiplePanesFails() { -SerializableFunction>, Iterable> extractor = +SerializableFunction>, Iterable> extractor = PaneExtractors.onlyPane(); -Iterable> multipleFiring = +Iterable> multipleFiring = ImmutableList.of( -WindowedValue.of( +ValueInSingleWindow.of( 4, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(true, false, Timing.EARLY)), -WindowedValue.of( +ValueInSingleWindow.of( 2, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)), -WindowedValue.of( +ValueInSingleWindow.of( 1, new Instant(0L), GlobalWindow.INSTANCE, @@ -94,16 +95,16 @@ public class PaneExtractorsTest { @Test public void 
onTimePane() { -SerializableFunction>, Iterable> extractor = +SerializableFunction>, Iterable> extractor = PaneExtractors.onTimePane(); -Iterable> onlyOnTime = +Iterable> onlyOnTime = ImmutableList.of( -WindowedValue.of( +ValueInSingleWindow.of( 4, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)), -WindowedValue.of( +ValueInSingleWindow.of( 2, new Instant(0L), GlobalWindow.INSTANCE, @@ -114,26 +115,26 @@ public class PaneExtractorsTest { @Test public void onTimePaneOnlyEarlyAndLate() { -SerializableFunction>, Iterable> extractor = +SerializableFunction>, Iterable> extractor = PaneExtractors.onTimePane(); -Iterable> onlyOnTime = +Iterable> onlyOnTime = Immutable
[3/3] incubator-beam git commit: This closes #1260
This closes #1260 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0884492 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0884492 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0884492 Branch: refs/heads/master Commit: a0884492a5c037a7fe58a1f011db560f9c228ae0 Parents: 0fb5610 d989123 Author: Thomas Groh Authored: Fri Dec 2 13:16:22 2016 -0800 Committer: Thomas Groh Committed: Fri Dec 2 13:16:22 2016 -0800 -- .../beam/runners/core/ReduceFnRunnerTest.java | 5 +- .../beam/runners/core/SplittableParDoTest.java | 38 ++-- .../beam/runners/core/WindowMatchers.java | 204 +++ .../beam/runners/core/WindowMatchersTest.java | 82 .../direct/WindowEvaluatorFactoryTest.java | 4 +- .../apache/beam/sdk/testing/GatherAllPanes.java | 88 .../org/apache/beam/sdk/testing/PAssert.java| 77 +++ .../apache/beam/sdk/testing/PaneExtractors.java | 55 +++-- .../beam/sdk/testing/ValueInSingleWindow.java | 134 .../apache/beam/sdk/transforms/DoFnTester.java | 58 +++--- .../apache/beam/sdk/util/GatherAllPanes.java| 86 .../apache/beam/sdk/util/IdentityWindowFn.java | 2 +- .../org/apache/beam/sdk/WindowMatchers.java | 204 --- .../org/apache/beam/sdk/WindowMatchersTest.java | 82 .../beam/sdk/testing/GatherAllPanesTest.java| 140 + .../beam/sdk/testing/PaneExtractorsTest.java| 133 ++-- .../testing/ValueInSingleWindowCoderTest.java | 51 + .../beam/sdk/util/GatherAllPanesTest.java | 143 - 18 files changed, 893 insertions(+), 693 deletions(-) --
[GitHub] incubator-beam pull request #1490: Explicitly Throw in TransformExecutorTest
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1490 Explicitly Throw in TransformExecutorTest Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- Minor cleanup to stop using `ImmutabilityCheckingEnforcement` and instead to use one that throws when specified. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam explicitly_throw_transform_executor_test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1490.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1490 commit 4971af757f98c7a8419834afd1b9df3bc13ff800 Author: Thomas Groh Date: 2016-12-02T18:56:15Z Explicitly Throw in TransformExecutorTest --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #1487: Stop using Maps of Transforms in the Dire...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1487 Stop using Maps of Transforms in the DirectRunner Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- Instead, add a "DirectGraph" class, which adds a layer of indirection to all lookup methods. Remove all remaining uses of getProducingTransformInternal, and instead use DirectGraph methods to obtain the producing transform. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam producers_consumers_as_datastructure Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1487.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1487 commit 4f54fa66117f8f6e73827be68dc4b7000b9d1a90 Author: Thomas Groh Date: 2016-12-01T23:39:30Z Stop using Maps of Transforms in the DirectRunner Instead, add a "DirectGraph" class, which adds a layer of indirection to all lookup methods. Remove all remaining uses of getProducingTransformInternal, and instead use DirectGraph methods to obtain the producing transform. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/3] incubator-beam git commit: This closes #1484
Repository: incubator-beam Updated Branches: refs/heads/master 24fab9f53 -> 63491bf21 This closes #1484 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63491bf2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63491bf2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63491bf2 Branch: refs/heads/master Commit: 63491bf211d8b2f23f6b7db1375a733c4332b850 Parents: 24fab9f 7c5c791 Author: Thomas Groh Authored: Thu Dec 1 14:34:21 2016 -0800 Committer: Thomas Groh Committed: Thu Dec 1 14:34:21 2016 -0800 -- .../translation/ApexPipelineTranslator.java | 12 +- .../apex/translation/TranslationContext.java| 6 +- .../direct/ConsumerTrackingPipelineVisitor.java | 12 +- .../runners/direct/DisplayDataValidator.java| 6 +- .../direct/KeyedPValueTrackingVisitor.java | 10 +- .../apache/beam/runners/flink/FlinkRunner.java | 12 +- .../FlinkBatchPipelineTranslator.java | 14 +- .../FlinkStreamingPipelineTranslator.java | 16 +- .../PipelineTranslationOptimizer.java | 10 +- .../dataflow/DataflowPipelineTranslator.java| 8 +- .../beam/runners/dataflow/DataflowRunner.java | 10 +- .../runners/dataflow/DataflowRunnerTest.java| 4 +- .../dataflow/RecordingPipelineVisitor.java | 6 +- .../apache/beam/runners/spark/SparkRunner.java | 21 +- .../beam/sdk/AggregatorPipelineExtractor.java | 6 +- .../main/java/org/apache/beam/sdk/Pipeline.java | 17 +- .../beam/sdk/runners/TransformHierarchy.java| 244 +++- .../beam/sdk/runners/TransformTreeNode.java | 282 --- .../sdk/AggregatorPipelineExtractorTest.java| 20 +- .../sdk/runners/TransformHierarchyTest.java | 26 +- .../beam/sdk/runners/TransformTreeTest.java | 8 +- .../display/DisplayDataEvaluator.java | 8 +- 22 files changed, 344 insertions(+), 414 deletions(-) --
[3/3] incubator-beam git commit: Migrate TransformTreeNode to an Inner Class
Migrate TransformTreeNode to an Inner Class TransformTreeNode requires access to the hierarchy it is contained within, and generally cannot be separated from TransformHierarchy. It is primarily an implementation detail of TransformHierarchy, so can be relocated to within it. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/569e8d70 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/569e8d70 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/569e8d70 Branch: refs/heads/master Commit: 569e8d7085cf4e6effd379f23716202c6c5daf52 Parents: 24fab9f Author: Thomas Groh Authored: Thu Dec 1 13:19:14 2016 -0800 Committer: Thomas Groh Committed: Thu Dec 1 14:34:21 2016 -0800 -- .../translation/ApexPipelineTranslator.java | 12 +- .../apex/translation/TranslationContext.java| 6 +- .../direct/ConsumerTrackingPipelineVisitor.java | 12 +- .../runners/direct/DisplayDataValidator.java| 6 +- .../direct/KeyedPValueTrackingVisitor.java | 10 +- .../apache/beam/runners/flink/FlinkRunner.java | 12 +- .../FlinkBatchPipelineTranslator.java | 14 +- .../FlinkStreamingPipelineTranslator.java | 16 +- .../PipelineTranslationOptimizer.java | 10 +- .../dataflow/DataflowPipelineTranslator.java| 8 +- .../beam/runners/dataflow/DataflowRunner.java | 10 +- .../runners/dataflow/DataflowRunnerTest.java| 4 +- .../dataflow/RecordingPipelineVisitor.java | 6 +- .../apache/beam/runners/spark/SparkRunner.java | 21 +- .../beam/sdk/AggregatorPipelineExtractor.java | 6 +- .../main/java/org/apache/beam/sdk/Pipeline.java | 17 +- .../beam/sdk/runners/TransformHierarchy.java| 243 +++- .../beam/sdk/runners/TransformTreeNode.java | 282 --- .../sdk/AggregatorPipelineExtractorTest.java| 20 +- .../sdk/runners/TransformHierarchyTest.java | 26 +- .../beam/sdk/runners/TransformTreeTest.java | 8 +- .../display/DisplayDataEvaluator.java | 8 +- 22 files changed, 343 insertions(+), 414 deletions(-) -- 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java -- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java index d38faf7..8d6db84 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java @@ -19,17 +19,15 @@ package org.apache.beam.runners.apex.translation; import com.datatorrent.api.DAG; - import java.util.HashMap; import java.util.Map; - import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView; import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator; import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; @@ -84,18 +82,18 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { } @Override - public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { LOG.debug("entering composite transform {}", node.getTransform()); return CompositeBehavior.ENTER_TRANSFORM; } @Override - public void leaveCompositeTransform(TransformTreeNode node) { + public void leaveCompositeTransform(TransformHierarchy.Node node) { LOG.debug("leaving composite transform {}", node.getTransform()); } 
@Override - public void visitPrimitiveTransform(TransformTreeNode node) { + public void visitPrimitiveTransform(TransformHierarchy.Node node) { LOG.debug("visiting transform {}", node.getTransform()); PTransform transform = node.getTransform(); TransformTranslator translator = getTransformTranslator(transform.getClass()); @@ -108,7 +106,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { } @Override - public void visitValue(PValue value, TransformTreeNode producer) { + public void visitValue(PValue value,
[2/3] incubator-beam git commit: Reduce the visibility of TransformHierarchy Node Mutators
Reduce the visibility of TransformHierarchy Node Mutators These mutators should not be accessible when visiting the nodes. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c5c7910 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c5c7910 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c5c7910 Branch: refs/heads/master Commit: 7c5c7910c0b8cba8623a1c49fc24c51ea691dac3 Parents: 569e8d7 Author: Thomas Groh Authored: Thu Dec 1 13:22:11 2016 -0800 Committer: Thomas Groh Committed: Thu Dec 1 14:34:21 2016 -0800 -- .../java/org/apache/beam/sdk/runners/TransformHierarchy.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c5c7910/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index 662acc1..e9829cc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -263,7 +263,7 @@ public class TransformHierarchy { /** * Adds an output to the transform node. */ -public void setOutput(POutput output) { +private void setOutput(POutput output) { checkState(!finishedSpecifying); checkState( this.output == null, "Tried to specify more than one output for %s", getFullName()); @@ -304,13 +304,14 @@ public class TransformHierarchy { return AppliedPTransform.of( getFullName(), getInput(), getOutput(), (PTransform) getTransform()); } + /** * Visit the transform node. * * Provides an ordered visit of the input values, the primitive transform (or child nodes for * composite transforms), then the output values. 
*/ -public void visit(PipelineVisitor visitor, Set visitedValues) { +private void visit(PipelineVisitor visitor, Set visitedValues) { if (!finishedSpecifying) { finishSpecifying(); } @@ -352,7 +353,7 @@ public class TransformHierarchy { * * All inputs are finished first, then the transform, then all outputs. */ -public void finishSpecifying() { +private void finishSpecifying() { if (finishedSpecifying) { return; }
[1/2] incubator-beam git commit: Improve Splittable DoFn
Repository: incubator-beam Updated Branches: refs/heads/master fd4b631f1 -> 24fab9f53 Improve Splittable DoFn Makes Splittable DoFn be more like a real DoFn: - Adds support for side inputs and outputs to SDF - Teaches `ProcessFn` to work with exploded windows inside the `KeyedWorkItem`. It works with them by un-exploding the windows in the `Iterable>` into a single `WindowedValue`, since the values and timestamps are guaranteed to be the same. Makes SplittableParDo.ProcessFn not use the (now unavailable) OldDoFn state and timers API: - Makes `ProcessFn` be a primitive transform with its own `ParDoEvaluator`. As a nice side effect, this enables the runner to provide additional hooks into it - e.g. for giving the runner access to the restriction tracker (in later PRs) - For consistency, moves declaration of `GBKIntoKeyedWorkItems` primitive transform into `SplittableParDo`, alongside the `SplittableProcessElements` transform - Preserves compressed representation of `WindowedValue`'s in `PushbackSideInputDoFnRunner` - Uses OutputWindowedValue in SplittableParDo.ProcessFn Proper lifecycle management for wrapped fn. - Caches underlying fn using DoFnLifecycleManager, so its @Setup and @Teardown methods are called. - Calls @StartBundle and @FinishBundle methods on the underlying fn explicitly. Output from them is prohibited, since an SDF is only allowed to output after a successful RestrictionTracker.tryClaim. It's possible that an SDF should not be allowed to have StartBundle/FinishBundle methods at all, but I'm not sure. 
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87ff5ac3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87ff5ac3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87ff5ac3 Branch: refs/heads/master Commit: 87ff5ac36bb9cc62fa4864ffa7b5a5e495b9a4a1 Parents: fd4b631 Author: Eugene Kirpichov Authored: Wed Oct 26 16:05:01 2016 -0700 Committer: Thomas Groh Committed: Thu Dec 1 14:15:55 2016 -0800 -- .../core/ElementAndRestrictionCoder.java| 8 + .../runners/core/GBKIntoKeyedWorkItems.java | 55 --- .../beam/runners/core/SplittableParDo.java | 378 +++ .../beam/runners/core/SplittableParDoTest.java | 134 +-- ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 41 +- .../beam/runners/direct/DirectGroupByKey.java | 2 +- .../beam/runners/direct/DirectRunner.java | 8 +- .../runners/direct/DoFnLifecycleManager.java| 4 +- .../beam/runners/direct/ParDoEvaluator.java | 26 +- .../runners/direct/ParDoEvaluatorFactory.java | 63 +++- .../direct/ParDoMultiOverrideFactory.java | 2 +- ...littableProcessElementsEvaluatorFactory.java | 144 +++ .../direct/TransformEvaluatorRegistry.java | 5 + .../beam/runners/direct/SplittableDoFnTest.java | 194 +- .../org/apache/beam/sdk/transforms/DoFn.java| 12 + .../apache/beam/sdk/transforms/DoFnTester.java | 51 ++- .../sdk/util/state/TimerInternalsFactory.java | 36 ++ 17 files changed, 905 insertions(+), 258 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java index 6dec8e2..64c1e14 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java +++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java @@ -64,4 +64,12 @@ public class ElementAndRestrictionCoder RestrictionT value = restrictionCoder.decode(inStream, context); return ElementAndRestriction.of(key, value); } + + public Coder getElementCoder() { +return elementCoder; + } + + public Coder getRestrictionCoder() { +return restrictionCoder; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java deleted file mode 100644 index 304e349..000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more co
[2/2] incubator-beam git commit: This closes #1261
This closes #1261 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24fab9f5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24fab9f5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24fab9f5 Branch: refs/heads/master Commit: 24fab9f53a8b3a7ef5fb35195dbe9417bbcc4101 Parents: fd4b631 87ff5ac Author: Thomas Groh Authored: Thu Dec 1 14:16:58 2016 -0800 Committer: Thomas Groh Committed: Thu Dec 1 14:16:58 2016 -0800 -- .../core/ElementAndRestrictionCoder.java| 8 + .../runners/core/GBKIntoKeyedWorkItems.java | 55 --- .../beam/runners/core/SplittableParDo.java | 378 +++ .../beam/runners/core/SplittableParDoTest.java | 134 +-- ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 41 +- .../beam/runners/direct/DirectGroupByKey.java | 2 +- .../beam/runners/direct/DirectRunner.java | 8 +- .../runners/direct/DoFnLifecycleManager.java| 4 +- .../beam/runners/direct/ParDoEvaluator.java | 26 +- .../runners/direct/ParDoEvaluatorFactory.java | 63 +++- .../direct/ParDoMultiOverrideFactory.java | 2 +- ...littableProcessElementsEvaluatorFactory.java | 144 +++ .../direct/TransformEvaluatorRegistry.java | 5 + .../beam/runners/direct/SplittableDoFnTest.java | 194 +- .../org/apache/beam/sdk/transforms/DoFn.java| 12 + .../apache/beam/sdk/transforms/DoFnTester.java | 51 ++- .../sdk/util/state/TimerInternalsFactory.java | 36 ++ 17 files changed, 905 insertions(+), 258 deletions(-) --
[GitHub] incubator-beam pull request #1484: Migrate TransformTreeNode to an Inner Cla...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1484 Migrate TransformTreeNode to an Inner Class Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam transform_hierarchy_maintenance_internally Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1484.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1484 commit 1fab0da2bdfb7675fc3aa5f0faf81e2dcfe867fb Author: Thomas Groh Date: 2016-12-01T21:19:14Z Migrate TransformTreeNode to an Inner Class TransformTreeNode requires access to the hierarchy it is contained within, and generally cannot be separated from TransformHierarchy. It is primarily an implementation detail of TransformHierarchy, so can be relocated to within it. commit 3d44a4a0709fbe1ebb25d0dea7f927415fa370a1 Author: Thomas Groh Date: 2016-12-01T21:22:11Z Reduce the visibility of TransformHierarchy Node Mutators These mutators should not be accessible when visiting the nodes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] incubator-beam git commit: Move TransformHierarchy Maintenance into it
Repository: incubator-beam Updated Branches: refs/heads/master 0c875ba70 -> 48130f718 Move TransformHierarchy Maintenance into it This reduces the complexity of Pipeline.applyInternal by keeping the responsibilities to passing a node into the Transform Hierarchy, enforcing name uniqueness, and causing the runner to expand the PTransform. This logic is moved to the appropriate application sites. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ab1f1ad0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ab1f1ad0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ab1f1ad0 Branch: refs/heads/master Commit: ab1f1ad012bc559cdb099319a516e4437eed2825 Parents: 0c875ba Author: Thomas Groh Authored: Tue Nov 29 14:29:47 2016 -0800 Committer: Thomas Groh Committed: Thu Dec 1 12:55:25 2016 -0800 -- .../direct/KeyedPValueTrackingVisitor.java | 2 +- .../DataflowPipelineTranslatorTest.java | 2 +- .../main/java/org/apache/beam/sdk/Pipeline.java | 117 +++- .../beam/sdk/runners/TransformHierarchy.java| 126 - .../beam/sdk/runners/TransformTreeNode.java | 165 + .../sdk/runners/TransformHierarchyTest.java | 180 ++- .../beam/sdk/runners/TransformTreeTest.java | 4 +- 7 files changed, 340 insertions(+), 256 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index 7c4376a..47b0857 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -74,7 +74,7 @@ class KeyedPValueTrackingVisitor 
implements PipelineVisitor { if (node.isRootNode()) { finalized = true; } else if (producesKeyedOutputs.contains(node.getTransform().getClass())) { - keyedValues.addAll(node.getExpandedOutputs()); + keyedValues.addAll(node.getOutput().expand()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java -- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index c925454..95c7132 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -669,7 +669,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { PCollection input = p.begin() .apply(Create.of(1, 2, 3)); -thrown.expect(IllegalStateException.class); +thrown.expect(IllegalArgumentException.class); input.apply(new PartiallyBoundOutputCreator()); Assert.fail("Failure expected from use of partially bound output"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index 9edf496..c8a4439 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -17,10 +17,11 @@ */ package org.apache.beam.sdk; +import static com.google.common.base.Preconditions.checkState; + import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; -import java.util.List; import 
java.util.Map; import java.util.Set; import org.apache.beam.sdk.coders.CoderRegistry; @@ -31,7 +32,6 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transfo
[2/2] incubator-beam git commit: This closes #1469
This closes #1469 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/48130f71 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/48130f71 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/48130f71 Branch: refs/heads/master Commit: 48130f718d019d6928c464e6f7ad90cd510b62d2 Parents: 0c875ba ab1f1ad Author: Thomas Groh Authored: Thu Dec 1 12:55:26 2016 -0800 Committer: Thomas Groh Committed: Thu Dec 1 12:55:26 2016 -0800 -- .../direct/KeyedPValueTrackingVisitor.java | 2 +- .../DataflowPipelineTranslatorTest.java | 2 +- .../main/java/org/apache/beam/sdk/Pipeline.java | 117 +++- .../beam/sdk/runners/TransformHierarchy.java| 126 - .../beam/sdk/runners/TransformTreeNode.java | 165 + .../sdk/runners/TransformHierarchyTest.java | 180 ++- .../beam/sdk/runners/TransformTreeTest.java | 4 +- 7 files changed, 340 insertions(+), 256 deletions(-) --
[2/2] incubator-beam git commit: Preserves compressed windows in PushbackSideInputDoFnRunner
Preserves compressed windows in PushbackSideInputDoFnRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/38f0b11c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/38f0b11c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/38f0b11c Branch: refs/heads/master Commit: 38f0b11cc9028cf347e3c96b6e6116e5a5a9972d Parents: 565e99f Author: Eugene Kirpichov Authored: Wed Nov 30 14:28:51 2016 -0800 Committer: Thomas Groh Committed: Wed Nov 30 16:26:33 2016 -0800 -- .../core/PushbackSideInputDoFnRunner.java | 20 .../core/PushbackSideInputDoFnRunnerTest.java | 18 +++--- 2 files changed, 27 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/38f0b11c/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java index 8c169da..460154d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java @@ -74,17 +74,29 @@ public class PushbackSideInputDoFnRunner implements DoFnRunner< processElement(elem); return Collections.emptyList(); } -ImmutableList.Builder> pushedBack = ImmutableList.builder(); +ImmutableList.Builder readyWindowsBuilder = ImmutableList.builder(); +ImmutableList.Builder pushedBackWindowsBuilder = ImmutableList.builder(); for (WindowedValue windowElem : elem.explodeWindows()) { BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows()); if (isReady(mainInputWindow)) { -processElement(windowElem); +readyWindowsBuilder.add(mainInputWindow); } else { notReadyWindows.add(mainInputWindow); 
-pushedBack.add(windowElem); +pushedBackWindowsBuilder.add(mainInputWindow); } } -return pushedBack.build(); +ImmutableList readyWindows = readyWindowsBuilder.build(); +ImmutableList pushedBackWindows = pushedBackWindowsBuilder.build(); +if (!readyWindows.isEmpty()) { + processElement( + WindowedValue.of( + elem.getValue(), elem.getTimestamp(), readyWindows, elem.getPane())); +} +return pushedBackWindows.isEmpty() +? ImmutableList.>of() +: ImmutableList.of( +WindowedValue.of( +elem.getValue(), elem.getTimestamp(), pushedBackWindows, elem.getPane())); } private boolean isReady(BoundedWindow mainInputWindow) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/38f0b11c/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java -- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java index 59a7c92..f8f4604 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java @@ -17,9 +17,9 @@ */ package org.apache.beam.runners.core; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; @@ -27,7 +27,6 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; - import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Sum; @@ -131,7 +130,7 @@ public class PushbackSideInputDoFnRunnerTest { 
PaneInfo.ON_TIME_AND_ONLY_FIRING); Iterable> multiWindowPushback = runner.processElementInReadyWindows(multiWindow); -assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows())); +assertThat(multiWindowPushback, contains(multiWindow)); assertThat(underlying.inputElems, Matchers.>emptyIterable()); } @@ -162,9 +161,14 @@ public class PushbackSideInputDoFnRunnerTest { assertThat( multiWindowPushback,
[1/2] incubator-beam git commit: This closes #1471
Repository: incubator-beam Updated Branches: refs/heads/master 565e99fbf -> a20bc4793 This closes #1471 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a20bc479 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a20bc479 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a20bc479 Branch: refs/heads/master Commit: a20bc479347b2a10307ada45dc7220ee00671fac Parents: 565e99f 38f0b11 Author: Thomas Groh Authored: Wed Nov 30 16:26:33 2016 -0800 Committer: Thomas Groh Committed: Wed Nov 30 16:26:33 2016 -0800 -- .../core/PushbackSideInputDoFnRunner.java | 20 .../core/PushbackSideInputDoFnRunnerTest.java | 18 +++--- 2 files changed, 27 insertions(+), 11 deletions(-) --
[1/2] incubator-beam git commit: Revert "Improvements to ReduceFnRunner prefetching"
Repository: incubator-beam Updated Branches: refs/heads/master 4b682039d -> c8f2cdb22 Revert "Improvements to ReduceFnRunner prefetching" This reverts commit 4282c67c5fa4dea2fe6c8695e0ea23f383c6457b, which contained some incompatibilities outside of runners-core. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aaa3b91e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aaa3b91e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aaa3b91e Branch: refs/heads/master Commit: aaa3b91e1e7b39dd585314a6017235cdd127e923 Parents: 4b68203 Author: Kenneth Knowles Authored: Wed Nov 30 15:21:53 2016 -0800 Committer: Kenneth Knowles Committed: Wed Nov 30 15:21:53 2016 -0800 -- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 11 +- .../beam/runners/core/PaneInfoTracker.java | 4 - .../runners/core/ReduceFnContextFactory.java| 9 +- .../beam/runners/core/ReduceFnRunner.java | 488 +++ .../apache/beam/runners/core/WatermarkHold.java | 5 - .../triggers/TriggerStateMachineRunner.java | 14 +- .../beam/runners/core/ReduceFnTester.java | 4 +- .../GroupAlsoByWindowEvaluatorFactory.java | 5 +- .../apache/beam/sdk/transforms/DoFnTester.java | 6 +- .../sdk/util/state/InMemoryTimerInternals.java | 22 +- .../beam/sdk/util/state/TimerCallback.java | 9 +- .../util/state/InMemoryTimerInternalsTest.java | 54 +- 12 files changed, 224 insertions(+), 407 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index 294f21d..8b10813 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; @@ -72,9 +73,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn< @Override public void processElement(ProcessContext c) throws Exception { -KeyedWorkItem keyedWorkItem = c.element(); +KeyedWorkItem element = c.element(); -K key = keyedWorkItem.key(); +K key = c.element().key(); TimerInternals timerInternals = c.windowingInternals().timerInternals(); StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); @@ -92,8 +93,10 @@ public class GroupAlsoByWindowViaWindowSetDoFn< reduceFn, c.getPipelineOptions()); -reduceFnRunner.processElements(keyedWorkItem.elementsIterable()); -reduceFnRunner.onTimers(keyedWorkItem.timersIterable()); +reduceFnRunner.processElements(element.elementsIterable()); +for (TimerData timer : element.timersIterable()) { + reduceFnRunner.onTimer(timer); +} reduceFnRunner.persist(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java index 69a4cfd..8140243 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java @@ -54,10 +54,6 @@ public class PaneInfoTracker { 
state.access(PANE_INFO_TAG).clear(); } - public void prefetchPaneInfo(ReduceFn.Context context) { -context.state().access(PaneInfoTracker.PANE_INFO_TAG).readLater(); - } - /** * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane * info includes the timing for the pane, who's calculation is quite subtle. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
[2/2] incubator-beam git commit: This closes #1474
This closes #1474 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c8f2cdb2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c8f2cdb2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c8f2cdb2 Branch: refs/heads/master Commit: c8f2cdb223cf39d76e733e4dc3fffb5487d41fa4 Parents: 4b68203 aaa3b91 Author: Thomas Groh Authored: Wed Nov 30 15:30:30 2016 -0800 Committer: Thomas Groh Committed: Wed Nov 30 15:30:30 2016 -0800 -- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 11 +- .../beam/runners/core/PaneInfoTracker.java | 4 - .../runners/core/ReduceFnContextFactory.java| 9 +- .../beam/runners/core/ReduceFnRunner.java | 488 +++ .../apache/beam/runners/core/WatermarkHold.java | 5 - .../triggers/TriggerStateMachineRunner.java | 14 +- .../beam/runners/core/ReduceFnTester.java | 4 +- .../GroupAlsoByWindowEvaluatorFactory.java | 5 +- .../apache/beam/sdk/transforms/DoFnTester.java | 6 +- .../sdk/util/state/InMemoryTimerInternals.java | 22 +- .../beam/sdk/util/state/TimerCallback.java | 9 +- .../util/state/InMemoryTimerInternalsTest.java | 54 +- 12 files changed, 224 insertions(+), 407 deletions(-) --
[GitHub] incubator-beam pull request #1470: [BEAM-1063] Shutdown DynamicSplit Executo...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1470 [BEAM-1063] Shutdown DynamicSplit Executor in Cleanup Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This ensures that the threads will be shut off when the pipeline shuts down, enabling a JVM with no more work to do to shut down as well. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam direct_runner_finishes_and_hangs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1470.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1470 commit 2780181991afd62093ca79330f95ff7c759c2f6b Author: Thomas Groh Date: 2016-11-30T22:30:14Z Shutdown DynamicSplit Executor in Cleanup This ensures that the threads will be shut off when the pipeline shuts down, enabling a JVM with no more work to do to shut down as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #1469: [BEAM-646] Move TransformHierarchy Mainte...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1469 [BEAM-646] Move TransformHierarchy Maintenance into it Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This reduces the complexity of Pipeline.applyInternal by keeping the responsibilities to passing a node into the Transform Hierarchy, enforcing name uniqueness, and causing the runner to expand the PTransform. This logic is moved to the appropriate application sites. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam transform_hierarchy_maintenance_internally Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1469.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1469 commit ae6ac672e582148b846c3910cc7247700b3403cb Author: Thomas Groh Date: 2016-11-29T22:29:47Z Move TransformHierarchy Maintenance into it This reduces the complexity of Pipeline.applyInternal by keeping the responsibilities to passing a node into the Transform Hierarchy, enforcing name uniqueness, and causing the runner to expand the PTransform. This logic is moved to the appropriate application sites. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] incubator-beam git commit: This closes #1382
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 5ce75a2ea -> 70c1de9b9 This closes #1382 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/70c1de9b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/70c1de9b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/70c1de9b Branch: refs/heads/python-sdk Commit: 70c1de9b95e9c20e5efb277d9ad50ae6348418e0 Parents: 5ce75a2 81e7a0f Author: Thomas Groh Authored: Tue Nov 29 15:43:04 2016 -0800 Committer: Thomas Groh Committed: Tue Nov 29 15:43:04 2016 -0800 -- sdks/python/apache_beam/dataflow_test.py | 66 +++ sdks/python/apache_beam/test_pipeline.py | 76 +++ 2 files changed, 120 insertions(+), 22 deletions(-) --
[2/2] incubator-beam git commit: Support ValidatesRunner Attribute in Python
Support ValidatesRunner Attribute in Python This is roughly equivalent to "RunnableOnService" in the Java SDK. See BEAM-655 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81e7a0f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81e7a0f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81e7a0f6 Branch: refs/heads/python-sdk Commit: 81e7a0f653864212a5c9d3d0802608f92bb34501 Parents: 5ce75a2 Author: Mark Liu Authored: Thu Nov 17 14:45:42 2016 -0800 Committer: Thomas Groh Committed: Tue Nov 29 15:43:04 2016 -0800 -- sdks/python/apache_beam/dataflow_test.py | 66 +++ sdks/python/apache_beam/test_pipeline.py | 76 +++ 2 files changed, 120 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81e7a0f6/sdks/python/apache_beam/dataflow_test.py -- diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py index f96e8af..ba3553a 100644 --- a/sdks/python/apache_beam/dataflow_test.py +++ b/sdks/python/apache_beam/dataflow_test.py @@ -24,13 +24,13 @@ import re import unittest import apache_beam as beam -from apache_beam.pipeline import Pipeline from apache_beam.pvalue import AsDict from apache_beam.pvalue import AsIter as AllOf from apache_beam.pvalue import AsList from apache_beam.pvalue import AsSingleton from apache_beam.pvalue import EmptySideInput from apache_beam.pvalue import SideOutputValue +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms import Create from apache_beam.transforms import DoFn from apache_beam.transforms import FlatMap @@ -42,6 +42,7 @@ from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to from apache_beam.transforms.window import IntervalWindow from apache_beam.transforms.window import WindowFn +from nose.plugins.attrib import attr class DataflowTest(unittest.TestCase): @@ -58,8 +59,9 
@@ class DataflowTest(unittest.TestCase): | 'GroupCounts' >> GroupByKey() | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones + @attr('ValidatesRunner') def test_word_count(self): -pipeline = Pipeline('DirectPipelineRunner') +pipeline = TestPipeline() lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA) result = ( (lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x))) @@ -67,8 +69,9 @@ class DataflowTest(unittest.TestCase): assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT)) pipeline.run() + @attr('ValidatesRunner') def test_map(self): -pipeline = Pipeline('DirectPipelineRunner') +pipeline = TestPipeline() lines = pipeline | 'input' >> Create(['a', 'b', 'c']) result = (lines | 'upper' >> Map(str.upper) @@ -76,8 +79,9 @@ class DataflowTest(unittest.TestCase): assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C'])) pipeline.run() + @attr('ValidatesRunner') def test_par_do_with_side_input_as_arg(self): -pipeline = Pipeline('DirectPipelineRunner') +pipeline = TestPipeline() words_list = ['aa', 'bb', 'cc'] words = pipeline | 'SomeWords' >> Create(words_list) prefix = pipeline | 'SomeString' >> Create(['xyz']) # side in @@ -89,8 +93,9 @@ class DataflowTest(unittest.TestCase): assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list])) pipeline.run() + @attr('ValidatesRunner') def test_par_do_with_side_input_as_keyword_arg(self): -pipeline = Pipeline('DirectPipelineRunner') +pipeline = TestPipeline() words_list = ['aa', 'bb', 'cc'] words = pipeline | 'SomeWords' >> Create(words_list) prefix = 'zyx' @@ -102,6 +107,7 @@ class DataflowTest(unittest.TestCase): assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list])) pipeline.run() + @attr('ValidatesRunner') def test_par_do_with_do_fn_object(self): class SomeDoFn(DoFn): """A custom DoFn for a FlatMap transform.""" @@ -109,7 +115,7 @@ class DataflowTest(unittest.TestCase): def process(self, context, prefix, suffix): return ['%s-%s-%s' % (prefix, context.element, suffix)] 
-pipeline = Pipeline('DirectPipelineRunner') +pipeline = TestPipeline() words_list = ['aa', 'bb', 'cc'] words = pipeline | 'SomeWords' >> Create(words_list) prefix = 'zyx' @@ -119,6 +125,7 @@ class DataflowTest(unittest.TestCase): assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list])) pipeline.run() + @attr('ValidatesRunner') def test_par_do_with_multiple_outputs_and_using_yield(self): class SomeDoFn(DoFn): """A c
[2/2] incubator-beam git commit: This closes #1457
This closes #1457 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1f7013d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1f7013d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1f7013d Branch: refs/heads/master Commit: b1f7013d88cea1290f92898b36d6a761546f8e60 Parents: 8d127be 1184bfa Author: Thomas Groh Authored: Tue Nov 29 15:37:58 2016 -0800 Committer: Thomas Groh Committed: Tue Nov 29 15:37:58 2016 -0800 -- .../src/main/java/org/apache/beam/sdk/Pipeline.java | 15 --- 1 file changed, 15 deletions(-) --
[1/2] incubator-beam git commit: Remove TransformApplicationsForTesting
Repository: incubator-beam Updated Branches: refs/heads/master 8d127beb8 -> b1f7013d8 Remove TransformApplicationsForTesting This field is mutated but never queried. Remove Pipeline#addValueInternal This method is never called and not suitable for use. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1184bfa7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1184bfa7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1184bfa7 Branch: refs/heads/master Commit: 1184bfa7a3ee5d58d65c9ba9200e91f71856ce4a Parents: 8d127be Author: Thomas Groh Authored: Tue Nov 29 14:30:09 2016 -0800 Committer: Thomas Groh Committed: Tue Nov 29 15:37:57 2016 -0800 -- .../src/main/java/org/apache/beam/sdk/Pipeline.java | 15 --- 1 file changed, 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1184bfa7/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index e188b35..9edf496 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -329,8 +327,6 @@ public class Pipeline { private Collection values = new ArrayList<>(); private Set usedFullNames = new HashSet<>(); private CoderRegistry coderRegistry; - private Multimap, AppliedPTransform> transformApplicationsForTesting = - HashMultimap.create(); /** * @deprecated replaced by {@link #Pipeline(PipelineRunner, PipelineOptions)} @@ -399,7 +395,6 @@ public class Pipeline { AppliedPTransform applied = AppliedPTransform.of( child.getFullName(), 
input, output, transform); - transformApplicationsForTesting.put(transform, applied); // recordAsOutput is a NOOP if already called; output.recordAsOutput(applied); verifyOutputState(output, child); @@ -513,14 +508,4 @@ public class Pipeline { private String buildName(String namePrefix, String name) { return namePrefix.isEmpty() ? name : namePrefix + "/" + name; } - - /** - * Adds the given {@link PValue} to this {@link Pipeline}. - * - * For internal use only. - */ - public void addValueInternal(PValue value) { -this.values.add(value); -LOG.debug("Adding {} to {}", value, this); - } }
[GitHub] incubator-beam pull request #1457: Remove TransformApplicationsForTesting
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1457 Remove TransformApplicationsForTesting Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This field is mutated but never queried. Remove Pipeline#addValueInternal This method is never called and not suitable for use. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam remove_some_internals Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1457.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1457 commit 9755baf949031a9398049072aa2817f5e82bf8ff Author: Thomas Groh Date: 2016-11-29T22:30:09Z Remove TransformApplicationsForTesting This field is mutated but never queried. Remove Pipeline#addValueInternal This method is never called and not suitable for use. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: This closes #1447
This closes #1447 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cdb7ba16 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cdb7ba16 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cdb7ba16 Branch: refs/heads/master Commit: cdb7ba165fb29d7e870ff959fd7bd00416c863bb Parents: 4bcef03 0ae1812 Author: Thomas Groh Authored: Mon Nov 28 17:51:04 2016 -0800 Committer: Thomas Groh Committed: Mon Nov 28 17:51:04 2016 -0800 -- .../main/java/org/apache/beam/runners/direct/DirectRunner.java | 5 + 1 file changed, 1 insertion(+), 4 deletions(-) --
[1/2] incubator-beam git commit: Remove the Experimental Annotation from the DirectRunner
Repository: incubator-beam Updated Branches: refs/heads/master 4bcef03d9 -> cdb7ba165 Remove the Experimental Annotation from the DirectRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0ae18124 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0ae18124 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0ae18124 Branch: refs/heads/master Commit: 0ae181248823b08803a8f8ac0d39a35a190c4acd Parents: 4bcef03 Author: Thomas Groh Authored: Mon Nov 28 15:52:03 2016 -0800 Committer: Thomas Groh Committed: Mon Nov 28 15:52:51 2016 -0800 -- .../main/java/org/apache/beam/runners/direct/DirectRunner.java | 5 + 1 file changed, 1 insertion(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ae18124/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index cb31947..f71e109 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -41,7 +41,6 @@ import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.metrics.MetricResults; @@ -71,9 +70,7 @@ import org.joda.time.Instant; * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded * {@link PCollection PCollections}. 
*/ -@Experimental -public class DirectRunner -extends PipelineRunner { +public class DirectRunner extends PipelineRunner { /** * The default set of transform overrides to use in the {@link DirectRunner}. *
[GitHub] incubator-beam pull request #1447: Remove the Experimental Annotation from t...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1447 Remove the Experimental Annotation from the DirectRunner Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- R: @kennknowles You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam not_experimental_direct_runner Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1447.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1447 commit 0ae181248823b08803a8f8ac0d39a35a190c4acd Author: Thomas Groh Date: 2016-11-28T23:52:03Z Remove the Experimental Annotation from the DirectRunner --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: Add direct runner dependency to starter archetype
Add direct runner dependency to starter archetype Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/14502a31 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/14502a31 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/14502a31 Branch: refs/heads/master Commit: 14502a3122bd1c1652196881797ffb60d480164e Parents: 33c6870 Author: Scott Wegner Authored: Tue Nov 22 08:56:17 2016 -0800 Committer: Thomas Groh Committed: Mon Nov 28 13:17:46 2016 -0800 -- .../examples/src/main/resources/archetype-resources/pom.xml | 2 +- .../starter/src/main/resources/archetype-resources/pom.xml | 8 .../src/test/resources/projects/basic/reference/pom.xml | 8 3 files changed, 17 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14502a31/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml -- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml index b18c57c..031ee88 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml @@ -93,7 +93,7 @@ 0.4.0-incubating-SNAPSHOT - + org.apache.beam beam-runners-direct-java http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14502a31/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml -- diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml index 738acde..4fae02c 100644 --- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml +++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml @@ -72,6 +72,14 @@ 
0.4.0-incubating-SNAPSHOT + + + org.apache.beam + beam-runners-direct-java + 0.4.0-incubating-SNAPSHOT + runtime + + org.slf4j http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14502a31/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml -- diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml index 6950ed5..4656e63 100644 --- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml +++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml @@ -72,6 +72,14 @@ 0.4.0-incubating-SNAPSHOT + + + org.apache.beam + beam-runners-direct-java + 0.4.0-incubating-SNAPSHOT + runtime + + org.slf4j
[1/2] incubator-beam git commit: This closes #1414
Repository: incubator-beam Updated Branches: refs/heads/master 33c687069 -> d5aeee9c1 This closes #1414 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d5aeee9c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d5aeee9c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d5aeee9c Branch: refs/heads/master Commit: d5aeee9c17a75307ff539e40b3ddc176790ec559 Parents: 33c6870 14502a3 Author: Thomas Groh Authored: Mon Nov 28 13:17:46 2016 -0800 Committer: Thomas Groh Committed: Mon Nov 28 13:17:46 2016 -0800 -- .../examples/src/main/resources/archetype-resources/pom.xml | 2 +- .../starter/src/main/resources/archetype-resources/pom.xml | 8 .../src/test/resources/projects/basic/reference/pom.xml | 8 3 files changed, 17 insertions(+), 1 deletion(-) --
[GitHub] incubator-beam pull request #1442: [BEAM-646] Add Replacement Methods to Tra...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1442 [BEAM-646] Add Replacement Methods to TransformHierarchy, PValue Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [x] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [x] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- These are used as the underlying mechanism for Pipeline Surgery. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam transform_hierarchy_surgery_methods Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1442.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1442 commit 09b46520a6a7baa6bd59e69f7bec86a601b5 Author: Thomas Groh Date: 2016-11-23T02:19:03Z Add Replacement Methods to TransformHierarchy, PValue These are used as the underlying mechanism for Pipeline Surgery. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: Add input type to TransformResult
Add input type to TransformResult This would likely have caught some hard-to-diagnose type safety errors during the development of StatefulParDoEvaluatorFactory, so adding it should hopefully catch similar bugs in the future. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7502adda Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7502adda Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7502adda Branch: refs/heads/master Commit: 7502adda3262bce9d6d4fe4499bde8d8b5273029 Parents: 9fbd2d2 Author: Kenneth Knowles Authored: Tue Nov 22 16:01:45 2016 -0800 Committer: Thomas Groh Committed: Mon Nov 28 10:06:31 2016 -0800 -- .../direct/AbstractModelEnforcement.java| 2 +- .../direct/BoundedReadEvaluatorFactory.java | 2 +- .../beam/runners/direct/CommittedResult.java| 2 +- .../beam/runners/direct/CompletionCallback.java | 2 +- ...ecycleManagerRemovingTransformEvaluator.java | 2 +- .../runners/direct/EmptyTransformEvaluator.java | 4 +- .../beam/runners/direct/EvaluationContext.java | 2 +- .../direct/ExecutorServiceParallelExecutor.java | 2 +- .../runners/direct/FlattenEvaluatorFactory.java | 10 ++--- .../GroupAlsoByWindowEvaluatorFactory.java | 5 ++- .../direct/GroupByKeyOnlyEvaluatorFactory.java | 2 +- .../direct/ImmutabilityEnforcementFactory.java | 2 +- .../beam/runners/direct/ModelEnforcement.java | 2 +- .../beam/runners/direct/ParDoEvaluator.java | 2 +- .../direct/PassthroughTransformEvaluator.java | 4 +- .../runners/direct/StepTransformResult.java | 38 + .../direct/TestStreamEvaluatorFactory.java | 2 +- .../beam/runners/direct/TransformEvaluator.java | 2 +- .../beam/runners/direct/TransformExecutor.java | 4 +- .../beam/runners/direct/TransformResult.java| 16 +-- .../direct/UnboundedReadEvaluatorFactory.java | 3 +- .../runners/direct/ViewEvaluatorFactory.java| 2 +- .../runners/direct/WindowEvaluatorFactory.java | 6 ++- 
.../direct/BoundedReadEvaluatorFactoryTest.java | 10 ++--- ...leManagerRemovingTransformEvaluatorTest.java | 4 +- .../runners/direct/EvaluationContextTest.java | 20 - .../direct/FlattenEvaluatorFactoryTest.java | 6 +-- .../ImmutabilityEnforcementFactoryTest.java | 6 +-- .../beam/runners/direct/ParDoEvaluatorTest.java | 2 +- .../runners/direct/StepTransformResultTest.java | 25 ++- .../direct/TestStreamEvaluatorFactoryTest.java | 10 ++--- .../runners/direct/TransformExecutorTest.java | 45 ++-- .../UnboundedReadEvaluatorFactoryTest.java | 20 ++--- .../direct/WindowEvaluatorFactoryTest.java | 12 +++--- 34 files changed, 152 insertions(+), 126 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java index 81f0f5f..f09164b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java @@ -33,6 +33,6 @@ abstract class AbstractModelEnforcement implements ModelEnforcement { @Override public void afterFinish( CommittedBundle input, - TransformResult result, + TransformResult result, Iterable> outputs) {} } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 66c55cd..65b622f 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -161,7 +161,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { } @Override -public TransformResult finishBundle() { +public TransformResult> finishBundle() { return resultBuilder.build(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResu
[1/2] incubator-beam git commit: This closes #1424
Repository: incubator-beam Updated Branches: refs/heads/master 9fbd2d24e -> 5e9a80cf6 This closes #1424 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5e9a80cf Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5e9a80cf Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5e9a80cf Branch: refs/heads/master Commit: 5e9a80cf61c4666edb5febd9a0fface2d2c26261 Parents: 9fbd2d2 7502add Author: Thomas Groh Authored: Mon Nov 28 10:06:31 2016 -0800 Committer: Thomas Groh Committed: Mon Nov 28 10:06:31 2016 -0800 -- .../direct/AbstractModelEnforcement.java| 2 +- .../direct/BoundedReadEvaluatorFactory.java | 2 +- .../beam/runners/direct/CommittedResult.java| 2 +- .../beam/runners/direct/CompletionCallback.java | 2 +- ...ecycleManagerRemovingTransformEvaluator.java | 2 +- .../runners/direct/EmptyTransformEvaluator.java | 4 +- .../beam/runners/direct/EvaluationContext.java | 2 +- .../direct/ExecutorServiceParallelExecutor.java | 2 +- .../runners/direct/FlattenEvaluatorFactory.java | 10 ++--- .../GroupAlsoByWindowEvaluatorFactory.java | 5 ++- .../direct/GroupByKeyOnlyEvaluatorFactory.java | 2 +- .../direct/ImmutabilityEnforcementFactory.java | 2 +- .../beam/runners/direct/ModelEnforcement.java | 2 +- .../beam/runners/direct/ParDoEvaluator.java | 2 +- .../direct/PassthroughTransformEvaluator.java | 4 +- .../runners/direct/StepTransformResult.java | 38 + .../direct/TestStreamEvaluatorFactory.java | 2 +- .../beam/runners/direct/TransformEvaluator.java | 2 +- .../beam/runners/direct/TransformExecutor.java | 4 +- .../beam/runners/direct/TransformResult.java| 16 +-- .../direct/UnboundedReadEvaluatorFactory.java | 3 +- .../runners/direct/ViewEvaluatorFactory.java| 2 +- .../runners/direct/WindowEvaluatorFactory.java | 6 ++- .../direct/BoundedReadEvaluatorFactoryTest.java | 10 ++--- ...leManagerRemovingTransformEvaluatorTest.java | 4 +- 
.../runners/direct/EvaluationContextTest.java | 20 - .../direct/FlattenEvaluatorFactoryTest.java | 6 +-- .../ImmutabilityEnforcementFactoryTest.java | 6 +-- .../beam/runners/direct/ParDoEvaluatorTest.java | 2 +- .../runners/direct/StepTransformResultTest.java | 25 ++- .../direct/TestStreamEvaluatorFactoryTest.java | 10 ++--- .../runners/direct/TransformExecutorTest.java | 45 ++-- .../UnboundedReadEvaluatorFactoryTest.java | 20 ++--- .../direct/WindowEvaluatorFactoryTest.java | 12 +++--- 34 files changed, 152 insertions(+), 126 deletions(-) --
[GitHub] incubator-beam pull request #1425: Add TransformHierarchyTest
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1425 Add TransformHierarchyTest Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This tests basic features of TransformHierarchy You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam transform_hierarchy_test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1425.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1425 commit 7d72601b9be356c35e5dd25b86b0857ad4ce554a Author: Thomas Groh Date: 2016-11-23T00:14:29Z Add TransformHierarchyTest This tests basic features of TransformHierarchy --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #1422: Output Keyed Bundles in GroupAlsoByWindow...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1422 Output Keyed Bundles in GroupAlsoByWindowEvaluator Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This allows reuse of keys for downstream serialization. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam no_gbk_keyedness Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1422.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1422 commit 22fbb1728c73d4595fad9bd6e1a13c83100048ca Author: Thomas Groh Date: 2016-11-22T22:51:39Z Output Keyed Bundles in GroupAlsoByWindowEvaluator This allows reuse of keys for downstream serialization. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: Simplify the API for managing MetricsEnvironment
Simplify the API for managing MetricsEnvironment 1. setCurrentContainer returns the previous MetricsEnvironment 2. setCurrentContainer(null) resets the thread local 3. scopedCurrentContainer sets the container and returns a Closeable to reset the previous container. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e6870a6d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e6870a6d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e6870a6d Branch: refs/heads/master Commit: e6870a6dc10e4ad52a911c316137a9f7731a9194 Parents: 6ec45f7 Author: bchambers Authored: Tue Nov 22 11:37:23 2016 -0800 Committer: Thomas Groh Committed: Tue Nov 22 12:27:41 2016 -0800 -- .../beam/runners/direct/TransformExecutor.java | 5 +- .../beam/sdk/metrics/MetricsEnvironment.java| 60 +++- .../sdk/metrics/MetricsEnvironmentTest.java | 8 +-- .../apache/beam/sdk/metrics/MetricsTest.java| 6 +- 4 files changed, 56 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index 1704955..fb31cc9 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; +import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.Callable; @@ -89,8 +90,7 @@ class TransformExecutor implements Runnable { @Override public void run() { MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName()); 
-MetricsEnvironment.setMetricsContainer(metricsContainer); -try { +try (Closeable metricsScope = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) { Collection> enforcements = new ArrayList<>(); for (ModelEnforcementFactory enforcementFactory : modelEnforcements) { ModelEnforcement enforcement = enforcementFactory.forBundle(inputBundle, transform); @@ -117,7 +117,6 @@ class TransformExecutor implements Runnable { // Report the physical metrics from the end of this step. context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative()); - MetricsEnvironment.unsetMetricsContainer(); transformEvaluationState.complete(this); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java index ef2660a8..7c06cbf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.metrics; +import java.io.Closeable; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -29,11 +31,13 @@ import org.slf4j.LoggerFactory; * returned objects to create and modify metrics. * * The runner should create {@link MetricsContainer} for each context in which metrics are - * reported (by step and name) and call {@link #setMetricsContainer} before invoking any code that - * may update metrics within that step. + * reported (by step and name) and call {@link #setCurrentContainer} before invoking any code that + * may update metrics within that step. It should call {@link #setCurrentContainer} again to restore + * the previous container. 
* - * The runner should call {@link #unsetMetricsContainer} (or {@link #setMetricsContainer} back to - * the previous value) when exiting code that set the metrics container. + * Alternatively, the runner can use {@link #scopedMetricsContainer(MetricsContainer)} to set the + * container for the current thread and get a {@link Closeable} that will restore the previous + * container when closed. */ public class MetricsEnvironment { @@ -45,15 +49,20 @@ public class MetricsEnvironment { private static final ThreadLocal CONTAINER_FOR_THREAD = new ThreadLocal(); - /** Set the
[1/2] incubator-beam git commit: This closes #1417
Repository: incubator-beam Updated Branches: refs/heads/master 6ec45f7e7 -> b41789e9c This closes #1417 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b41789e9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b41789e9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b41789e9 Branch: refs/heads/master Commit: b41789e9c5ac8691243c796968b00a65cc11dd39 Parents: 6ec45f7 e6870a6 Author: Thomas Groh Authored: Tue Nov 22 12:27:41 2016 -0800 Committer: Thomas Groh Committed: Tue Nov 22 12:27:41 2016 -0800 -- .../beam/runners/direct/TransformExecutor.java | 5 +- .../beam/sdk/metrics/MetricsEnvironment.java| 60 +++- .../sdk/metrics/MetricsEnvironmentTest.java | 8 +-- .../apache/beam/sdk/metrics/MetricsTest.java| 6 +- 4 files changed, 56 insertions(+), 23 deletions(-) --
[2/2] incubator-beam git commit: Support @ValidatesRunner(RunnableOnService) in Python [1/2]
Support @ValidatesRunner(RunnableOnService) in Python [1/2] Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cc706608 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cc706608 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cc706608 Branch: refs/heads/python-sdk Commit: cc706608b281c3beeebd2487084946c06bc83f30 Parents: 9060f83 Author: Mark Liu Authored: Thu Nov 17 09:53:01 2016 -0800 Committer: Thomas Groh Committed: Tue Nov 22 10:11:22 2016 -0800 -- sdks/python/setup.py | 4 sdks/python/test_config.py | 44 + 2 files changed, 48 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc706608/sdks/python/setup.py -- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 1299bbf..525f59c 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -137,4 +137,8 @@ setuptools.setup( ], license='Apache License, Version 2.0', keywords=PACKAGE_KEYWORDS, +entry_points={ +'nose.plugins.0.10': [ +'beam_test_plugin = test_config:BeamTestPlugin' +]} ) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc706608/sdks/python/test_config.py -- diff --git a/sdks/python/test_config.py b/sdks/python/test_config.py new file mode 100644 index 000..bde2795 --- /dev/null +++ b/sdks/python/test_config.py @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Test configurations for nose + +This module contains nose plugin hooks that configures Beam tests which +includes ValidatesRunner test and E2E integration test. + +""" + +from nose.plugins import Plugin + + +class BeamTestPlugin(Plugin): + """A nose plugin for Beam testing that registers command line options + + This plugin is registered through setuptools in entry_points. + """ + + def options(self, parser, env): +"""Add '--test-pipeline-options' to command line option to avoid +unrecognized option error thrown by nose. + +The value of this option will be processed by TestPipeline and used to +build customized pipeline for ValidatesRunner tests. +""" +parser.add_option('--test-pipeline-options', + action='store', + type=str, + help='providing pipeline options to run tests on runner')
[1/2] incubator-beam git commit: This closes #1376
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 9060f8395 -> 28bfd9090 This closes #1376 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/28bfd909 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/28bfd909 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/28bfd909 Branch: refs/heads/python-sdk Commit: 28bfd9090b0ce33c3da0bab0220fcc1ef8a72b4b Parents: 9060f83 cc70660 Author: Thomas Groh Authored: Tue Nov 22 10:11:22 2016 -0800 Committer: Thomas Groh Committed: Tue Nov 22 10:11:22 2016 -0800 -- sdks/python/setup.py | 4 sdks/python/test_config.py | 44 + 2 files changed, 48 insertions(+) --
[1/2] incubator-beam git commit: Update StarterPipeline
Repository: incubator-beam Updated Branches: refs/heads/master e53d6d458 -> c2dc38639 Update StarterPipeline Convert StarterPipeline ParDo to MapElements. Use the new DoFn for non-outputting transforms. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c80554b8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c80554b8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c80554b8 Branch: refs/heads/master Commit: c80554b83426a585c762143e0ad533a73c2c3f0f Parents: e53d6d4 Author: Scott Wegner Authored: Mon Nov 21 16:33:07 2016 -0800 Committer: Thomas Groh Committed: Tue Nov 22 10:09:12 2016 -0800 -- .../src/main/java/StarterPipeline.java| 18 ++ .../src/main/java/it/pkg/StarterPipeline.java | 18 ++ 2 files changed, 20 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c80554b8/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java -- diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java index 0b21aa6..d6afdec 100644 --- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java +++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java @@ -20,13 +20,15 @@ package ${package}; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; /** - * A starter example for writing Google Cloud Dataflow programs. + * A starter example for writing Beam programs. * * The example takes two strings, converts them to their upper-case * representation and logs them. @@ -39,7 +41,7 @@ import org.slf4j.LoggerFactory; * Platform, you should specify the following command-line options: * --project= * --stagingLocation= - * --runner=BlockingDataflowRunner + * --runner=DataflowRunner */ public class StarterPipeline { private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class); @@ -49,14 +51,14 @@ public class StarterPipeline { PipelineOptionsFactory.fromArgs(args).withValidation().create()); p.apply(Create.of("Hello", "World")) -.apply(ParDo.of(new OldDoFn() { +.apply(MapElements.via(new SimpleFunction() { @Override - public void processElement(ProcessContext c) { -c.output(c.element().toUpperCase()); + public String apply(String input) { +return input.toUpperCase(); } })) -.apply(ParDo.of(new OldDoFn() { - @Override +.apply(ParDo.of(new DoFn() { + @ProcessElement public void processElement(ProcessContext c) { LOG.info(c.element()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c80554b8/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java -- diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java index b332442..4ae92e8 100644 --- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java +++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java @@ -20,13 +20,15 @@ package it.pkg; import org.apache.beam.sdk.Pipeline; import 
org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A starter example for writing Google Cloud Dataflow programs. + * A starter example for writing Beam programs. * * The e
[2/2] incubator-beam git commit: This closes #1406
This closes #1406 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c2dc3863 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c2dc3863 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c2dc3863 Branch: refs/heads/master Commit: c2dc38639ddd37c393d3bd9f341c2ec032a1096c Parents: e53d6d4 c80554b Author: Thomas Groh Authored: Tue Nov 22 10:10:02 2016 -0800 Committer: Thomas Groh Committed: Tue Nov 22 10:10:02 2016 -0800 -- .../src/main/java/StarterPipeline.java| 18 ++ .../src/main/java/it/pkg/StarterPipeline.java | 18 ++ 2 files changed, 20 insertions(+), 16 deletions(-) --
[1/2] incubator-beam git commit: Block earlier in BoundedReadEvaluatorFactoryTest
Repository: incubator-beam Updated Branches: refs/heads/master 1543ea952 -> 212fec4eb Block earlier in BoundedReadEvaluatorFactoryTest This ensures that the reader doesn't claim the split point, which in turn ensures the dynamic split request will not be refused by the OffsetBasedSource. If the split is refused, ...ProducesDynamicSplits flakes, as if the reader is faster than the split thread it can run past the point at which the splitter thread will attempt to split the source, which causes the reader to read all of the elements. Sleep within TestReader#advanceImpl if the reader is being dynamically split, to ensure that the dynamic split fully completes before continuing a call to advance. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a8d32e5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a8d32e5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a8d32e5 Branch: refs/heads/master Commit: 4a8d32e5d3726b851329d507a8d0392cc03f6e85 Parents: 1543ea9 Author: Thomas Groh Authored: Thu Nov 17 10:56:49 2016 -0800 Committer: Thomas Groh Committed: Thu Nov 17 14:37:47 2016 -0800 -- .../direct/BoundedReadEvaluatorFactoryTest.java | 26 +++- 1 file changed, 14 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a8d32e5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java -- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index 9d8503a..e956c34 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -49,6 +49,7 @@ import 
org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.OffsetBasedSource; import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.SourceTestUtils; @@ -142,9 +143,7 @@ public class BoundedReadEvaluatorFactoryTest { TestPipeline.create().apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems))); AppliedPTransform transform = read.getProducingTransformInternal(); Collection> unreadInputs = -new BoundedReadEvaluatorFactory.InputProvider(context) -.getInitialInputs(transform, -1); +new BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 1); Collection> outputs = new ArrayList<>(); int numIterations = 0; @@ -155,8 +154,7 @@ public class BoundedReadEvaluatorFactoryTest { Collection> newUnreadInputs = new ArrayList<>(); for (CommittedBundle shardBundle : unreadInputs) { -TransformEvaluator evaluator = -factory.forApplication(transform, null); +TransformEvaluator evaluator = factory.forApplication(transform, null); for (WindowedValue shard : shardBundle.getElements()) { evaluator.processElement((WindowedValue) shard); } @@ -178,8 +176,6 @@ public class BoundedReadEvaluatorFactoryTest { unreadInputs = newUnreadInputs; } -// We produced at least one split before we read 1000 elements, as we will attempt to split as -// quickly as possible. 
assertThat(numIterations, greaterThan(1)); WindowedValue[] expectedValues = new WindowedValue[numElements]; for (long i = 0L; i < numElements; i++) { @@ -343,7 +339,7 @@ public class BoundedReadEvaluatorFactoryTest { private static boolean readerClosed; private final Coder coder; private final T[] elems; -private final int awaitSplitIndex; +private final int firstSplitIndex; private transient CountDownLatch subrangesCompleted; @@ -351,11 +347,11 @@ public class BoundedReadEvaluatorFactoryTest { this(coder, elems.length, elems); } -public TestSource(Coder coder, int awaitSplitIndex, T... elems) { +public TestSource(Coder coder, int firstSplitIndex, T... elems) { super(0L, elems.length, 1L); this.elems = elems; this.coder = coder; - this.awaitSplitIndex = awaitSplitIndex; + this.firstSplitIndex = firstSplitIndex; readerClosed = false; subrangesCompleted = new CountDownLatch(2); @@ -380,7 +376,7 @@ public class BoundedReadEvaluato
[2/2] incubator-beam git commit: This closes #1377
This closes #1377 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/212fec4e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/212fec4e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/212fec4e Branch: refs/heads/master Commit: 212fec4eba053fe603238a13ca6e0d9cb6aef697 Parents: 1543ea9 4a8d32e Author: Thomas Groh Authored: Thu Nov 17 14:37:48 2016 -0800 Committer: Thomas Groh Committed: Thu Nov 17 14:37:48 2016 -0800 -- .../direct/BoundedReadEvaluatorFactoryTest.java | 26 +++- 1 file changed, 14 insertions(+), 12 deletions(-) --
[GitHub] incubator-beam pull request #1377: [BEAM-999] Block earlier in BoundedReadEv...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1377 [BEAM-999] Block earlier in BoundedReadEvaluatorFactoryTest Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-<Jira issue #>] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This ensures that the reader doesn't claim the split point, which in turn ensures the dynamic split request will not be refused by the OffsetBasedSource. If the split is refused, ...ProducesDynamicSplits flakes, as if the reader is faster than the split thread it can run past the point at which the splitter thread will attempt to split the source, which causes the reader to read all of the elements. Spin within TestReader#advanceImpl if the reader is being dynamically split, to ensure that the dynamic split fully completes before continuing a call to advance. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam dynamic_splitting Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1377.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1377 commit bb793e3f578eadbe1d1d66e42c656a6a3f81f18f Author: Thomas Groh Date: 2016-11-17T18:56:49Z Block earlier in BoundedReadEvaluatorFactoryTest This ensures that the reader doesn't claim the split point, which in turn ensures the dynamic split request will not be refused by the OffsetBasedSource. 
If the split is refused, ...ProducesDynamicSplits flakes, as if the reader is faster than the split thread it can run past the point at which the splitter thread will attempt to split the source, which causes the reader to read all of the elements. Sleep within TestReader#advanceImpl if the reader is being dynamically split, to ensure that the dynamic split fully completes before continuing a call to advance. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: This closes #1373
This closes #1373 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/abd9fb3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/abd9fb3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/abd9fb3d Branch: refs/heads/master Commit: abd9fb3d7ced120c4c307df601673e3a548cfa87 Parents: 15e93c5 67ce531 Author: Thomas Groh Authored: Wed Nov 16 18:21:58 2016 -0800 Committer: Thomas Groh Committed: Wed Nov 16 18:21:58 2016 -0800 -- .../direct/BoundedReadEvaluatorFactory.java | 28 - .../beam/runners/direct/DirectRunner.java | 7 +++-- .../beam/runners/direct/EmptyInputProvider.java | 22 +++--- .../direct/ExecutorServiceParallelExecutor.java | 12 .../beam/runners/direct/RootInputProvider.java | 16 ++ .../runners/direct/RootProviderRegistry.java| 19 ++-- .../direct/TestStreamEvaluatorFactory.java | 23 +++--- .../direct/TransformEvaluatorRegistry.java | 5 +-- .../direct/UnboundedReadEvaluatorFactory.java | 32 +++- 9 files changed, 85 insertions(+), 79 deletions(-) --
[1/2] incubator-beam git commit: Properly apply Transform Overrides in the Direct Runner
Repository: incubator-beam Updated Branches: refs/heads/master 15e93c58e -> abd9fb3d7 Properly apply Transform Overrides in the Direct Runner Previously the direct runner would use the transform override to .apply(), but would keep the original transform in the pipeline, e.g. it would use the original transform to look up an evaluator. The current commit makes it use the node generated by applying the override as a nested node within the graph (including, potentially replacing it further recursively). Additionally, makes InputProvider type-safe. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/67ce5313 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/67ce5313 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/67ce5313 Branch: refs/heads/master Commit: 67ce53139b51617708cdf037d93c5195608accc5 Parents: 15e93c5 Author: Eugene Kirpichov Authored: Wed Nov 16 15:40:08 2016 -0800 Committer: Thomas Groh Committed: Wed Nov 16 18:20:36 2016 -0800 -- .../direct/BoundedReadEvaluatorFactory.java | 28 - .../beam/runners/direct/DirectRunner.java | 7 +++-- .../beam/runners/direct/EmptyInputProvider.java | 22 +++--- .../direct/ExecutorServiceParallelExecutor.java | 12 .../beam/runners/direct/RootInputProvider.java | 16 ++ .../runners/direct/RootProviderRegistry.java| 19 ++-- .../direct/TestStreamEvaluatorFactory.java | 23 +++--- .../direct/TransformEvaluatorRegistry.java | 5 +-- .../direct/UnboundedReadEvaluatorFactory.java | 32 +++- 9 files changed, 85 insertions(+), 79 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/67ce5313/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 
8becb91..66c55cd 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -35,6 +35,7 @@ import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.StepTransformResult.Builder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -174,7 +175,8 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { abstract BoundedSource getSource(); } - static class InputProvider implements RootInputProvider { + static class InputProvider + implements RootInputProvider, PBegin, Read.Bounded> { private final EvaluationContext evaluationContext; InputProvider(EvaluationContext evaluationContext) { @@ -182,27 +184,21 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { } @Override -public Collection> getInitialInputs( -AppliedPTransform transform, int targetParallelism) throws Exception { - return createInitialSplits((AppliedPTransform) transform, targetParallelism); -} - -private -Collection>> createInitialSplits( -AppliedPTransform> transform, int targetParallelism) -throws Exception { - BoundedSource source = transform.getTransform().getSource(); +public Collection>> getInitialInputs( +AppliedPTransform, Read.Bounded> transform, int targetParallelism) +throws Exception { + BoundedSource source = transform.getTransform().getSource(); PipelineOptions options = evaluationContext.getPipelineOptions(); long estimatedBytes = source.getEstimatedSizeBytes(options); long bytesPerBundle = estimatedBytes / targetParallelism; - List> bundles = + List> bundles = 
source.splitIntoBundles(bytesPerBundle, options); - ImmutableList.Builder>> shards = + ImmutableList.Builder>> shards = ImmutableList.builder(); - for (BoundedSource bundle : bundles) { -CommittedBundle> inputShard = + for (BoundedSource bundle : bundles) { +CommittedBundle> inputShard = evaluationContext -.>createRootBundle() +.>createRootBundle() .add(WindowedValue.valueInGlobalWindow(BoundedSou
[4/4] incubator-beam git commit: Reduce incidence of Namespace StringKey comparisons
Reduce incidence of Namespace StringKey comparisons If the Namespace of a TimerData reports itself as being equal to the other namespace, immediately return 0 rather than generating the string keys and comparing them. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3e6a4f4a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3e6a4f4a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3e6a4f4a Branch: refs/heads/master Commit: 3e6a4f4a49344871430d2711e934b0493c17499f Parents: c695ef4 Author: Thomas Groh Authored: Tue Nov 8 14:18:58 2016 -0800 Committer: Thomas Groh Committed: Wed Nov 16 13:22:41 2016 -0800 -- .../java/org/apache/beam/sdk/util/TimerInternals.java | 13 + 1 file changed, 9 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e6a4f4a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java index 743f3f7..5d4a72d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java @@ -230,12 +230,17 @@ public interface TimerInternals { * arbitrary. 
*/ @Override -public int compareTo(TimerData o) { +public int compareTo(TimerData that) { + if (this.equals(that)) { +return 0; + } ComparisonChain chain = - ComparisonChain.start().compare(timestamp, o.getTimestamp()).compare(domain, o.domain); - if (chain.result() == 0) { + ComparisonChain.start() + .compare(this.timestamp, that.getTimestamp()) + .compare(this.domain, that.domain); + if (chain.result() == 0 && !this.namespace.equals(that.namespace)) { // Obtaining the stringKey may be expensive; only do so if required -chain = chain.compare(namespace.stringKey(), o.namespace.stringKey()); +chain = chain.compare(namespace.stringKey(), that.namespace.stringKey()); } return chain.result(); }
[1/4] incubator-beam git commit: This closes #1319
Repository: incubator-beam Updated Branches: refs/heads/master c695ef48b -> 15e93c58e This closes #1319 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15e93c58 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15e93c58 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15e93c58 Branch: refs/heads/master Commit: 15e93c58e933ef913c89a9654ceb567299a2fe5b Parents: c695ef4 bf4c504 Author: Thomas Groh Authored: Wed Nov 16 13:22:41 2016 -0800 Committer: Thomas Groh Committed: Wed Nov 16 13:22:41 2016 -0800 -- .../beam/runners/direct/DirectRunner.java | 3 +- .../beam/runners/direct/TransformExecutor.java | 23 --- .../runners/direct/TransformExecutorTest.java | 43 .../apache/beam/sdk/util/TimerInternals.java| 13 -- 4 files changed, 11 insertions(+), 71 deletions(-) --
[2/4] incubator-beam git commit: Remove unused Thread variable in TransformExecutor
Remove unused Thread variable in TransformExecutor Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5ba4d181 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5ba4d181 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5ba4d181 Branch: refs/heads/master Commit: 5ba4d181d4625d43078bb0b071635d563d925277 Parents: 3e6a4f4 Author: Thomas Groh Authored: Tue Nov 8 14:16:23 2016 -0800 Committer: Thomas Groh Committed: Wed Nov 16 13:22:41 2016 -0800 -- .../beam/runners/direct/TransformExecutor.java | 23 --- .../runners/direct/TransformExecutorTest.java | 43 2 files changed, 66 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ba4d181/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index c4002b5..1704955 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -17,13 +17,9 @@ */ package org.apache.beam.runners.direct; -import static com.google.common.base.Preconditions.checkState; - import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.metrics.MetricsContainer; @@ -70,8 +66,6 @@ class TransformExecutor implements Runnable { private final TransformExecutorService transformEvaluationState; private final EvaluationContext context; - private final AtomicReference thread; - private TransformExecutor( 
EvaluationContext context, TransformEvaluatorFactory factory, @@ -90,20 +84,12 @@ class TransformExecutor implements Runnable { this.transformEvaluationState = transformEvaluationState; this.context = context; -this.thread = new AtomicReference<>(); } @Override public void run() { MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName()); MetricsEnvironment.setMetricsContainer(metricsContainer); -checkState( -thread.compareAndSet(null, Thread.currentThread()), -"Tried to execute %s for %s on thread %s, but is already executing on thread %s", -TransformExecutor.class.getSimpleName(), -transform.getFullName(), -Thread.currentThread(), -thread.get()); try { Collection> enforcements = new ArrayList<>(); for (ModelEnforcementFactory enforcementFactory : modelEnforcements) { @@ -186,13 +172,4 @@ class TransformExecutor implements Runnable { } return result; } - - /** - * If this {@link TransformExecutor} is currently executing, return the thread it is executing in. - * Otherwise, return null. 
- */ - @Nullable - public Thread getThread() { -return thread.get(); - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ba4d181/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java -- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index 32f874d..0b7b882 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isA; -import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; @@ -277,48 +276,6 @@ public class TransformExecutorTest { } @Test - public void duringCallGetThreadIsNonNull() throws Exception { -final TransformResult result = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); -final CountDownLatch testLatch = new CountDownLatch(1); -final CountDownLatch evaluatorLatch = new CountDownLatch(1); -TransformEvaluator evaluator =