Remove Aggregators from PipelineResult and its subclasses
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f26db8c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f26db8c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f26db8c Branch: refs/heads/master Commit: 6f26db8c479c9543003fd3bb8406117e25c4fed0 Parents: 1fe11d1 Author: Pablo <pabl...@google.com> Authored: Tue Mar 7 13:06:15 2017 -0800 Committer: bchambers <bchamb...@google.com> Committed: Tue Apr 25 12:45:33 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/apex/ApexRunnerResult.java | 9 ----- .../beam/runners/direct/DirectRunner.java | 34 ------------------ .../beam/runners/direct/DirectRunnerTest.java | 36 -------------------- .../flink/FlinkDetachedRunnerResult.java | 11 ------ .../beam/runners/flink/FlinkRunnerResult.java | 21 ------------ .../runners/dataflow/DataflowPipelineJob.java | 27 --------------- .../beam/runners/spark/SparkPipelineResult.java | 7 ---- .../beam/runners/spark/examples/WordCount.java | 2 +- .../org/apache/beam/sdk/PipelineResult.java | 12 ------- 9 files changed, 1 insertion(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java index 8548194..41fdb75 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java @@ -23,12 +23,9 @@ import java.io.IOException; import org.apache.apex.api.Launcher.AppHandle; import org.apache.apex.api.Launcher.ShutdownMode; -import 
org.apache.beam.sdk.AggregatorRetrievalException; -import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.transforms.Aggregator; import org.joda.time.Duration; /** @@ -50,12 +47,6 @@ public class ApexRunnerResult implements PipelineResult { } @Override - public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) - throws AggregatorRetrievalException { - return null; - } - - @Override public State cancel() throws IOException { apexApp.shutdown(ShutdownMode.KILL); state = State.CANCELLED; http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 77ec68f..db2d252 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.direct; -import com.google.common.base.MoreObjects; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -45,7 +44,6 @@ import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -375,38 +373,6 @@ public class DirectRunner extends 
PipelineRunner<DirectPipelineResult> { } @Override - public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) - throws AggregatorRetrievalException { - AggregatorContainer aggregators = evaluationContext.getAggregatorContainer(); - Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator); - final Map<String, T> stepValues = new HashMap<>(); - if (steps != null) { - for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) { - if (steps.contains(transform.getTransform())) { - T aggregate = aggregators - .getAggregate(evaluationContext.getStepName(transform), aggregator.getName()); - if (aggregate != null) { - stepValues.put(transform.getFullName(), aggregate); - } - } - } - } - return new AggregatorValues<T>() { - @Override - public Map<String, T> getValuesAtSteps() { - return stepValues; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("stepValues", stepValues) - .toString(); - } - }; - } - - @Override public MetricResults metrics() { return evaluationContext.getMetrics(); } http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index c55f84a..51ae12a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -40,7 +40,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; -import org.apache.beam.sdk.AggregatorRetrievalException; import 
org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; @@ -48,7 +47,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.BoundedSource; @@ -60,7 +58,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -69,13 +66,9 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.IllegalMutationException; -import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -530,35 +523,6 @@ public class DirectRunnerTest implements Serializable { p.run(); } - @Test - public void testAggregatorNotPresentInGraph() throws AggregatorRetrievalException { - Pipeline p = getPipeline(); - IdentityDoFn identityDoFn = new IdentityDoFn(); - p.apply(Create.of(KV.of("key", "element1"), KV.of("key", "element2"), KV.of("key", 
"element3"))) - .apply(ParDo.of(identityDoFn)); - PipelineResult pipelineResult = p.run(); - pipelineResult.getAggregatorValues(identityDoFn.getCounter()).getValues(); - } - - private static class IdentityDoFn extends DoFn<KV<String, String>, String> { - private final Aggregator<Long, Long> counter = createAggregator("counter", Sum.ofLongs()); - private static final String STATE_ID = "state"; - @StateId(STATE_ID) - private static final StateSpec<Object, ValueState<String>> stateSpec = - StateSpecs.value(StringUtf8Coder.of()); - - @ProcessElement - public void processElement(ProcessContext context, @StateId(STATE_ID) ValueState<String> state){ - state.write("state content"); - counter.addValue(1L); - context.output(context.element().getValue()); - } - - public Aggregator<Long, Long> getCounter() { - return counter; - } - } - private static class LongNoDecodeCoder extends CustomCoder<Long> { @Override public void encode( http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java index bf4395f..b4d4b08 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java @@ -19,11 +19,8 @@ package org.apache.beam.runners.flink; import java.io.IOException; -import org.apache.beam.sdk.AggregatorRetrievalException; -import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.transforms.Aggregator; import org.joda.time.Duration; @@ -41,14 +38,6 @@ public class FlinkDetachedRunnerResult implements 
PipelineResult { } @Override - public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) - throws AggregatorRetrievalException { - throw new AggregatorRetrievalException( - "Accumulators can't be retrieved for detached Job executions.", - new UnsupportedOperationException()); - } - - @Override public MetricResults metrics() { throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics."); } http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index 0f2462d..dfc1d8e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -21,11 +21,8 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; import org.apache.beam.runners.flink.metrics.FlinkMetricResults; -import org.apache.beam.sdk.AggregatorRetrievalException; -import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.transforms.Aggregator; import org.joda.time.Duration; /** @@ -52,24 +49,6 @@ public class FlinkRunnerResult implements PipelineResult { } @Override - public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) - throws AggregatorRetrievalException { - // TODO provide a list of all accumulator step values - Object value = aggregators.get(aggregator.getName()); - if (value != null) { - return new AggregatorValues<T>() { - @Override - public Map<String, T> getValuesAtSteps() { - return (Map<String, T>) aggregators; - } - }; - } else { - 
throw new AggregatorRetrievalException("Accumulator results not found.", - new RuntimeException("Accumulator does not exist.")); - } - } - - @Override public String toString() { return "FlinkRunnerResult{" + "aggregators=" + aggregators http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 7cb0f0e..0399ada 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -29,7 +29,6 @@ import com.google.api.services.dataflow.model.JobMessage; import com.google.api.services.dataflow.model.JobMetrics; import com.google.api.services.dataflow.model.MetricUpdate; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; import java.io.IOException; import java.net.SocketTimeoutException; import java.util.List; @@ -41,8 +40,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil; -import org.apache.beam.sdk.AggregatorRetrievalException; -import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; @@ -488,30 +485,6 @@ public class DataflowPipelineJob implements PipelineResult { } @Override - public <OutputT> AggregatorValues<OutputT> 
getAggregatorValues(Aggregator<?, OutputT> aggregator) - throws AggregatorRetrievalException { - try { - final Map<String, OutputT> stepValues = fromMetricUpdates(aggregator); - return new AggregatorValues<OutputT>() { - @Override - public Map<String, OutputT> getValuesAtSteps() { - return stepValues; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("stepValues", stepValues) - .toString(); - } - }; - } catch (IOException e) { - throw new AggregatorRetrievalException( - "IOException when retrieving Aggregator values for Aggregator " + aggregator, e); - } - } - - @Override public MetricResults metrics() { return dataflowMetrics; } http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java index d2c5c8e..1110a55 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java @@ -27,11 +27,9 @@ import java.util.concurrent.TimeoutException; import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.metrics.SparkMetricResults; import org.apache.beam.runners.spark.translation.SparkContextFactory; -import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.util.UserCodeException; import org.apache.spark.SparkException; import org.apache.spark.api.java.JavaSparkContext; @@ -84,11 +82,6 @@ public abstract class SparkPipelineResult implements 
PipelineResult { } @Override - public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) { - return SparkAggregators.valueOf(aggregator); - } - - @Override public PipelineResult.State getState() { return state; } http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java index 32caa9a..de5ae48 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java @@ -21,10 +21,10 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java index 35f11eb..7e78e6e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java @@ -21,7 +21,6 @@ import java.io.IOException; import 
org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.transforms.Aggregator; import org.joda.time.Duration; /** @@ -64,17 +63,6 @@ public interface PipelineResult { */ State waitUntilFinish(); - /** - * Retrieves the current value of the provided {@link Aggregator}. - * - * @param aggregator the {@link Aggregator} to retrieve values for. - * @return the current values of the {@link Aggregator}, - * which may be empty if there are no values yet. - * @throws AggregatorRetrievalException if the {@link Aggregator} values could not be retrieved. - */ - <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) - throws AggregatorRetrievalException; - // TODO: method to retrieve error messages. /** Named constants for common values for the job state. */