Repository: incubator-beam Updated Branches: refs/heads/master 204678323 -> f346c877a
Added support for reporting aggregator values to Spark sinks Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/226dea2f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/226dea2f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/226dea2f Branch: refs/heads/master Commit: 226dea2f04de2c000733f1182bdd3d18d516d4e4 Parents: 2046783 Author: staslev <stasle...@gmail.com> Authored: Fri Aug 26 10:26:38 2016 +0300 Committer: Sela <ans...@paypal.com> Committed: Fri Aug 26 13:01:48 2016 +0300 ---------------------------------------------------------------------- runners/spark/pom.xml | 6 + .../runners/spark/SparkPipelineOptions.java | 7 +- .../spark/aggregators/NamedAggregators.java | 38 ++++- .../aggregators/metrics/AggregatorMetric.java | 44 +++++ .../metrics/AggregatorMetricSource.java | 49 ++++++ .../metrics/WithNamedAggregatorsSupport.java | 169 +++++++++++++++++++ .../spark/aggregators/metrics/package-info.java | 22 +++ .../spark/aggregators/metrics/sink/CsvSink.java | 39 +++++ .../aggregators/metrics/sink/GraphiteSink.java | 39 +++++ .../aggregators/metrics/sink/package-info.java | 23 +++ .../apache/beam/runners/spark/io/ConsoleIO.java | 2 +- .../beam/runners/spark/io/hadoop/HadoopIO.java | 4 +- .../spark/translation/SparkRuntimeContext.java | 29 +++- .../runners/spark/util/BroadcastHelper.java | 4 +- .../runners/spark/InMemoryMetricsSinkRule.java | 32 ++++ .../beam/runners/spark/SimpleWordCountTest.java | 12 ++ .../metrics/sink/InMemoryMetrics.java | 79 +++++++++ .../spark/src/test/resources/metrics.properties | 29 ++++ 18 files changed, 611 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml 
b/runners/spark/pom.xml index b924cb8..b928b44 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -37,6 +37,7 @@ <spark.version>1.6.2</spark.version> <hadoop.version>2.2.0</hadoop.version> <kafka.version>0.8.2.1</kafka.version> + <dropwizard.metrics.version>3.1.2</dropwizard.metrics.version> </properties> <profiles> @@ -231,6 +232,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${dropwizard.metrics.version}</version> + </dependency> <!-- test dependencies --> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index 080ff19..be4f7f0 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -38,10 +38,15 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions, + "execution is stopped") @Default.Long(-1) Long getTimeout(); - void setTimeout(Long batchInterval); + void setTimeout(Long timeoutMillis); @Description("Batch interval for Spark streaming in milliseconds.") @Default.Long(1000) Long getBatchIntervalMillis(); void setBatchIntervalMillis(Long batchInterval); + + @Description("Enable/disable sending aggregator values to Spark's metric sinks") + @Default.Boolean(true) + Boolean getEnableSparkSinks(); + void setEnableSparkSinks(Boolean enableSparkSinks); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java 
---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java index e2cd963..4e96466 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java @@ -18,13 +18,18 @@ package org.apache.beam.runners.spark.aggregators; +import com.google.common.base.Function; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.Map; import java.util.TreeMap; + import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; @@ -70,6 +75,22 @@ public class NamedAggregators implements Serializable { } /** + * @return a map of all the aggregator names and their <b>rendered </b>values + */ + public Map<String, ?> renderAll() { + return + ImmutableMap.copyOf( + Maps.transformValues(mNamedAggregators, + new Function<State<?, ?, ?>, Object>() { + + @Override + public Object apply(State<?, ?, ?> state) { + return state.render(); + } + })); + } + + /** * Merges another NamedAggregators instance with this instance. * * @param other The other instance of named aggregators ot merge. 
@@ -116,6 +137,7 @@ public class NamedAggregators implements Serializable { * @param <OutputT> Output data type */ public interface State<InputT, InterT, OutputT> extends Serializable { + /** * @param element new element to update state */ @@ -133,16 +155,16 @@ public class NamedAggregators implements Serializable { /** * => combineFunction in data flow. */ - public static class CombineFunctionState<InputT, InterT, OutpuT> - implements State<InputT, InterT, OutpuT> { + public static class CombineFunctionState<InputT, InterT, OutputT> + implements State<InputT, InterT, OutputT> { - private Combine.CombineFn<InputT, InterT, OutpuT> combineFn; + private Combine.CombineFn<InputT, InterT, OutputT> combineFn; private Coder<InputT> inCoder; private SparkRuntimeContext ctxt; private transient InterT state; public CombineFunctionState( - Combine.CombineFn<InputT, InterT, OutpuT> combineFn, + Combine.CombineFn<InputT, InterT, OutputT> combineFn, Coder<InputT> inCoder, SparkRuntimeContext ctxt) { this.combineFn = combineFn; @@ -157,7 +179,7 @@ public class NamedAggregators implements Serializable { } @Override - public State<InputT, InterT, OutpuT> merge(State<InputT, InterT, OutpuT> other) { + public State<InputT, InterT, OutputT> merge(State<InputT, InterT, OutputT> other) { this.state = combineFn.mergeAccumulators(ImmutableList.of(current(), other.current())); return this; } @@ -168,12 +190,12 @@ public class NamedAggregators implements Serializable { } @Override - public OutpuT render() { + public OutputT render() { return combineFn.extractOutput(state); } @Override - public Combine.CombineFn<InputT, InterT, OutpuT> getCombineFn() { + public Combine.CombineFn<InputT, InterT, OutputT> getCombineFn() { return combineFn; } @@ -192,7 +214,7 @@ public class NamedAggregators implements Serializable { @SuppressWarnings("unchecked") private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { ctxt = (SparkRuntimeContext) ois.readObject(); - 
combineFn = (Combine.CombineFn<InputT, InterT, OutpuT>) ois.readObject(); + combineFn = (Combine.CombineFn<InputT, InterT, OutputT>) ois.readObject(); inCoder = (Coder<InputT>) ois.readObject(); try { state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java new file mode 100644 index 0000000..c07a069 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.aggregators.metrics; + +import com.codahale.metrics.Metric; + +import org.apache.beam.runners.spark.aggregators.NamedAggregators; + +/** + * An adapter between the {@link NamedAggregators} and codahale's {@link Metric} + * interface. 
+ */ +public class AggregatorMetric implements Metric { + + private final NamedAggregators namedAggregators; + + private AggregatorMetric(final NamedAggregators namedAggregators) { + this.namedAggregators = namedAggregators; + } + + public static AggregatorMetric of(final NamedAggregators namedAggregators) { + return new AggregatorMetric(namedAggregators); + } + + NamedAggregators getNamedAggregators() { + return namedAggregators; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java new file mode 100644 index 0000000..0658e04 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark.aggregators.metrics; + +import com.codahale.metrics.MetricRegistry; + +import org.apache.beam.runners.spark.aggregators.NamedAggregators; +import org.apache.spark.metrics.source.Source; + +/** + * A Spark {@link Source} that is tailored to expose an {@link AggregatorMetric}, + * wrapping an underlying {@link NamedAggregators} instance. + */ +public class AggregatorMetricSource implements Source { + + private static final String SOURCE_NAME = "NamedAggregators"; + + private final MetricRegistry metricRegistry = new MetricRegistry(); + + public AggregatorMetricSource(final NamedAggregators aggregators) { + metricRegistry.register(SOURCE_NAME, AggregatorMetric.of(aggregators)); + } + + @Override + public String sourceName() { + return SOURCE_NAME; + } + + @Override + public MetricRegistry metricRegistry() { + return metricRegistry; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java new file mode 100644 index 0000000..88e2211 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.aggregators.metrics; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; + +import java.util.Map; +import java.util.SortedMap; + +import org.apache.beam.runners.spark.aggregators.NamedAggregators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link MetricRegistry} decorator-like* that supports {@link AggregatorMetric} by exposing + * the underlying * {@link org.apache.beam.runners.spark.aggregators.NamedAggregators}' + * aggregators as {@link Gauge}s. + * <p> + * *{@link MetricRegistry} is not an interface, so this is not a by-the-book decorator. + * That said, it delegates all metric related getters to the "decorated" instance. 
+ * </p> + */ +public class WithNamedAggregatorsSupport extends MetricRegistry { + + private static final Logger LOG = LoggerFactory.getLogger(WithNamedAggregatorsSupport.class); + + private MetricRegistry internalMetricRegistry; + + private WithNamedAggregatorsSupport(final MetricRegistry internalMetricRegistry) { + this.internalMetricRegistry = internalMetricRegistry; + } + + public static WithNamedAggregatorsSupport forRegistry(final MetricRegistry metricRegistry) { + return new WithNamedAggregatorsSupport(metricRegistry); + } + + @Override + public SortedMap<String, Timer> getTimers(final MetricFilter filter) { + return internalMetricRegistry.getTimers(filter); + } + + @Override + public SortedMap<String, Meter> getMeters(final MetricFilter filter) { + return internalMetricRegistry.getMeters(filter); + } + + @Override + public SortedMap<String, Histogram> getHistograms(final MetricFilter filter) { + return internalMetricRegistry.getHistograms(filter); + } + + @Override + public SortedMap<String, Counter> getCounters(final MetricFilter filter) { + return internalMetricRegistry.getCounters(filter); + } + + @Override + public SortedMap<String, Gauge> getGauges(final MetricFilter filter) { + return + new ImmutableSortedMap.Builder<String, Gauge>( + Ordering.from(String.CASE_INSENSITIVE_ORDER)) + .putAll(internalMetricRegistry.getGauges(filter)) + .putAll(extractGauges(internalMetricRegistry, filter)) + .build(); + } + + private Map<String, Gauge> extractGauges(final MetricRegistry metricRegistry, + final MetricFilter filter) { + + // find the AggregatorMetric metrics from within all currently registered metrics + final Optional<Map<String, Gauge>> gauges = + FluentIterable + .from(metricRegistry.getMetrics().entrySet()) + .firstMatch(isAggregatorMetric()) + .transform(toGauges()); + + return + gauges.isPresent() + ? 
Maps.filterEntries(gauges.get(), matches(filter)) + : ImmutableMap.<String, Gauge>of(); + } + + private Function<Map.Entry<String, Metric>, Map<String, Gauge>> toGauges() { + return new Function<Map.Entry<String, Metric>, Map<String, Gauge>>() { + @Override + public Map<String, Gauge> apply(final Map.Entry<String, Metric> entry) { + final NamedAggregators agg = ((AggregatorMetric) entry.getValue()).getNamedAggregators(); + final Map<String, Gauge> gaugeMap = Maps.transformEntries(agg.renderAll(), toGauge()); + return Maps.filterValues(gaugeMap, Predicates.notNull()); + } + }; + } + + private Maps.EntryTransformer<String, Object, Gauge> toGauge() { + return new Maps.EntryTransformer<String, Object, Gauge>() { + + @Override + public Gauge transformEntry(final String name, final Object rawValue) { + return new Gauge<Double>() { + + @Override + public Double getValue() { + // at the moment the metric's type is assumed to be + // compatible with Double. While far from perfect, it seems reasonable at + // this point in time + try { + return Double.parseDouble(rawValue.toString()); + } catch (final Exception e) { + LOG.warn("Failed reporting metric with name [{}], of type [{}], since it could not be" + + " converted to double", name, rawValue.getClass().getSimpleName(), e); + return null; + } + } + }; + } + }; + } + + private Predicate<Map.Entry<String, Gauge>> matches(final MetricFilter filter) { + return new Predicate<Map.Entry<String, Gauge>>() { + @Override + public boolean apply(final Map.Entry<String, Gauge> entry) { + return filter.matches(entry.getKey(), entry.getValue()); + } + }; + } + + private Predicate<Map.Entry<String, Metric>> isAggregatorMetric() { + return new Predicate<Map.Entry<String, Metric>>() { + @Override + public boolean apply(final Map.Entry<String, Metric> metricEntry) { + return (metricEntry.getValue() instanceof AggregatorMetric); + } + }; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/package-info.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/package-info.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/package-info.java new file mode 100644 index 0000000..f19f635 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Defines classes for integrating with Spark's metrics mechanism (Sinks, Sources, etc.). 
+ */ +package org.apache.beam.runners.spark.aggregators.metrics; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java new file mode 100644 index 0000000..af1601a --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.aggregators.metrics.sink; + +import com.codahale.metrics.MetricRegistry; + +import java.util.Properties; + +import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetric; +import org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport; +import org.apache.spark.metrics.sink.Sink; + +/** + * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics + * to a CSV file. 
+ */ +public class CsvSink extends org.apache.spark.metrics.sink.CsvSink { + public CsvSink(final Properties properties, + final MetricRegistry metricRegistry, + final org.apache.spark.SecurityManager securityMgr) { + super(properties, WithNamedAggregatorsSupport.forRegistry(metricRegistry), securityMgr); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java new file mode 100644 index 0000000..7a45ef7 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark.aggregators.metrics.sink; + +import com.codahale.metrics.MetricRegistry; + +import java.util.Properties; + +import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetric; +import org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport; +import org.apache.spark.metrics.sink.Sink; + +/** + * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics + * to Graphite. + */ +public class GraphiteSink extends org.apache.spark.metrics.sink.GraphiteSink { + public GraphiteSink(final Properties properties, + final MetricRegistry metricRegistry, + final org.apache.spark.SecurityManager securityMgr) { + super(properties, WithNamedAggregatorsSupport.forRegistry(metricRegistry), securityMgr); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java new file mode 100644 index 0000000..2e6dd0d --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Spark sinks that support + * the {@link org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetric}. + */ +package org.apache.beam.runners.spark.aggregators.metrics.sink; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java index eefea77..b1c567c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java @@ -47,7 +47,7 @@ public final class ConsoleIO { /** * {@link PTransform} writing {@link PCollection} on the console. 
- * @param <T> + * @param <T> the type of the elements in the {@link PCollection} */ public static class Unbound<T> extends PTransform<PCollection<T>, PDone> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java index 7b10610..70bec78 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java @@ -55,8 +55,8 @@ public final class HadoopIO { /** * A {@link PTransform} reading bounded collection of data from HDFS. - * @param <K> - * @param <V> + * @param <K> the type of the keys + * @param <V> the type of the values */ public static class Bound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index 2634c65..4e4cd1a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -20,14 +20,19 @@ package org.apache.beam.runners.spark.translation; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; + import com.google.common.collect.ImmutableList; + import 
java.io.IOException; import java.io.Serializable; import java.util.Collection; import java.util.HashMap; import java.util.Map; + +import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.aggregators.AggAccumParam; import org.apache.beam.runners.spark.aggregators.NamedAggregators; +import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetricSource; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; @@ -41,7 +46,9 @@ import org.apache.beam.sdk.transforms.Min; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.spark.Accumulator; +import org.apache.spark.SparkEnv$; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.metrics.MetricsSystem; /** @@ -63,8 +70,9 @@ public class SparkRuntimeContext implements Serializable { private transient CoderRegistry coderRegistry; SparkRuntimeContext(JavaSparkContext jsc, Pipeline pipeline) { - this.accum = jsc.accumulator(new NamedAggregators(), new AggAccumParam()); - this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions()); + final SparkPipelineOptions opts = pipeline.getOptions().as(SparkPipelineOptions.class); + accum = registerMetrics(jsc, opts); + serializedPipelineOptions = serializePipelineOptions(opts); } private static String serializePipelineOptions(PipelineOptions pipelineOptions) { @@ -83,6 +91,23 @@ public class SparkRuntimeContext implements Serializable { } } + private Accumulator<NamedAggregators> registerMetrics(final JavaSparkContext jsc, + final SparkPipelineOptions opts) { + final NamedAggregators initialValue = new NamedAggregators(); + final Accumulator<NamedAggregators> accum = jsc.accumulator(initialValue, new AggAccumParam()); + + if (opts.getEnableSparkSinks()) { + final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem(); + final 
AggregatorMetricSource aggregatorMetricSource = + new AggregatorMetricSource(initialValue); + // in case the context was not cleared + metricsSystem.removeSource(aggregatorMetricSource); + metricsSystem.registerSource(aggregatorMetricSource); + } + + return accum; + } + /** * Retrieves corresponding value of an aggregator. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java index 5f0c795..5c13b80 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java @@ -58,7 +58,7 @@ public abstract class BroadcastHelper<T> implements Serializable { * A {@link BroadcastHelper} that relies on the underlying * Spark serialization (Kryo) to broadcast values. This is appropriate when * broadcasting very large values, since no copy of the object is made. - * @param <T> + * @param <T> the type of the value stored in the broadcast variable */ static class DirectBroadcastHelper<T> extends BroadcastHelper<T> { private Broadcast<T> bcast; @@ -86,7 +86,7 @@ public abstract class BroadcastHelper<T> implements Serializable { * A {@link BroadcastHelper} that uses a * {@link Coder} to encode values as byte arrays * before broadcasting. 
- * @param <T> + * @param <T> the type of the value stored in the broadcast variable */ static class CodedBroadcastHelper<T> extends BroadcastHelper<T> { private Broadcast<byte[]> bcast; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java new file mode 100644 index 0000000..506dbbd --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics; +import org.junit.rules.ExternalResource; + +/** + * A rule that clears all metrics held by {@link InMemoryMetrics} before each test runs.
+ */ +class InMemoryMetricsSinkRule extends ExternalResource { + @Override + protected void before() throws Throwable { + InMemoryMetrics.clearAll(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index f644765..8b7762f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -19,6 +19,8 @@ package org.apache.beam.runners.spark; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableSet; @@ -27,6 +29,7 @@ import java.io.File; import java.util.Arrays; import java.util.List; import java.util.Set; +import org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics; import org.apache.beam.runners.spark.examples.WordCount; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -39,12 +42,17 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.commons.io.FileUtils; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExternalResource; import org.junit.rules.TemporaryFolder; /** * Simple word count test. 
*/ public class SimpleWordCountTest { + + @Rule + public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule(); + private static final String[] WORDS_ARRAY = { "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"}; @@ -54,6 +62,8 @@ public class SimpleWordCountTest { @Test public void testInMem() throws Exception { + assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue())); + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkRunner.class); Pipeline p = Pipeline.create(options); @@ -66,6 +76,8 @@ public class SimpleWordCountTest { EvaluationResult res = (EvaluationResult) p.run(); res.close(); + + assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d)); } @Rule http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java new file mode 100644 index 0000000..35e6717 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.aggregators.metrics.sink; + +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; + +import java.util.Properties; + +import org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport; +import org.apache.spark.metrics.sink.Sink; + +/** + * An in-memory {@link Sink} implementation for tests. + */ +public class InMemoryMetrics implements Sink { + + private static WithNamedAggregatorsSupport extendedMetricsRegistry; + private static MetricRegistry internalMetricRegistry; + + public InMemoryMetrics(final Properties properties, + final MetricRegistry metricRegistry, + final org.apache.spark.SecurityManager securityMgr) { + extendedMetricsRegistry = WithNamedAggregatorsSupport.forRegistry(metricRegistry); + internalMetricRegistry = metricRegistry; + } + + @SuppressWarnings("unchecked") + public static <T> T valueOf(final String name) { + final T retVal; + + if (extendedMetricsRegistry != null + && extendedMetricsRegistry.getGauges().containsKey(name)) { + retVal = (T) extendedMetricsRegistry.getGauges().get(name).getValue(); + } else { + retVal = null; + } + + return retVal; + } + + public static void clearAll() { + if (internalMetricRegistry != null) { + internalMetricRegistry.removeMatching(MetricFilter.ALL); + } + } + + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override + public void report() { + + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/test/resources/metrics.properties ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/resources/metrics.properties b/runners/spark/src/test/resources/metrics.properties new file mode 100644 index 0000000..4aa01d2 --- /dev/null +++ b/runners/spark/src/test/resources/metrics.properties @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +*.sink.memory.class=org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics + +#*.sink.csv.class=org.apache.beam.runners.spark.aggregators.metrics.sink.CsvSink +#*.sink.csv.directory=/tmp/spark-metrics +#*.sink.csv.period=1 +#*.sink.csv.unit=SECONDS + +#*.sink.graphite.class=org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink +#*.sink.graphite.host=YOUR_HOST +#*.sink.graphite.port=2003 +#*.sink.graphite.prefix=spark +#*.sink.graphite.period=1 +#*.sink.graphite.unit=SECONDS