Remove duplicate classes from the Spark runner by marking the SDK classes
Serializable instead.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/22865780
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/22865780
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/22865780

Branch: refs/heads/master
Commit: 228657808f76e38f0b16767020d6d7e149d5dcdf
Parents: 31624fe
Author: Aviem Zur <aviem...@gmail.com>
Authored: Thu Jan 19 13:58:14 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Feb 15 11:10:48 2017 +0200

----------------------------------------------------------------------
 .../runners/spark/metrics/MetricAggregator.java | 113 ----------
 .../spark/metrics/MetricsAccumulatorParam.java  |   4 +-
 .../spark/metrics/SparkMetricResults.java       |  28 ++-
 .../spark/metrics/SparkMetricsContainer.java    | 205 +++----------------
 .../beam/sdk/metrics/DistributionData.java      |   3 +-
 .../org/apache/beam/sdk/metrics/MetricKey.java  |   3 +-
 .../apache/beam/sdk/metrics/MetricUpdates.java  |   3 +-
 7 files changed, 46 insertions(+), 313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/22865780/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricAggregator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricAggregator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricAggregator.java
deleted file mode 100644
index 79e49ce..0000000
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricAggregator.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.metrics;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.metrics.DistributionData;
-import org.apache.beam.sdk.metrics.MetricKey;
-
-
-/**
- * Metric values wrapper which adds aggregation methods.
- * @param <ValueT> Metric value type.
- */
-abstract class MetricAggregator<ValueT> implements Serializable {
-  private final MetricKey key;
-  protected ValueT value;
-
-  private MetricAggregator(MetricKey key, ValueT value) {
-    this.key = key;
-    this.value = value;
-  }
-
-  public MetricKey getKey() {
-    return key;
-  }
-
-  public ValueT getValue() {
-    return value;
-  }
-
-  @SuppressWarnings("unused")
-  abstract MetricAggregator<ValueT> updated(ValueT update);
-
-  static class CounterAggregator extends MetricAggregator<Long> {
-    CounterAggregator(MetricKey key, Long value) {
-      super(key, value);
-    }
-
-    @Override
-    CounterAggregator updated(Long counterUpdate) {
-      value = value + counterUpdate;
-      return this;
-    }
-  }
-
-  static class DistributionAggregator extends 
MetricAggregator<DistributionData> {
-    DistributionAggregator(MetricKey key, DistributionData value) {
-      super(key, value);
-    }
-
-    @Override
-    DistributionAggregator updated(DistributionData distributionUpdate) {
-      this.value = new 
SparkDistributionData(this.value.combine(distributionUpdate));
-      return this;
-    }
-  }
-
-  static class SparkDistributionData extends DistributionData implements 
Serializable {
-    private final long sum;
-    private final long count;
-    private final long min;
-    private final long max;
-
-    SparkDistributionData(DistributionData original) {
-      this.sum = original.sum();
-      this.count = original.count();
-      this.min = original.min();
-      this.max = original.max();
-    }
-
-    @Override
-    public long sum() {
-      return sum;
-    }
-
-    @Override
-    public long count() {
-      return count;
-    }
-
-    @Override
-    public long min() {
-      return min;
-    }
-
-    @Override
-    public long max() {
-      return max;
-    }
-  }
-
-  static <T> MetricAggregator<T> updated(MetricAggregator<T> metricAggregator, 
Object updateValue) {
-    //noinspection unchecked
-    return metricAggregator.updated((T) updateValue);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/22865780/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
index cd54097..9948c81 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
@@ -27,12 +27,12 @@ import org.apache.spark.AccumulatorParam;
 class MetricsAccumulatorParam implements 
AccumulatorParam<SparkMetricsContainer> {
   @Override
   public SparkMetricsContainer addAccumulator(SparkMetricsContainer c1, 
SparkMetricsContainer c2) {
-    return c1.merge(c2);
+    return c1.update(c2);
   }
 
   @Override
   public SparkMetricsContainer addInPlace(SparkMetricsContainer c1, 
SparkMetricsContainer c2) {
-    return c1.merge(c2);
+    return c1.update(c2);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/22865780/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
index 64b92b7..330b060 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
@@ -23,8 +23,7 @@ import com.google.common.base.Objects;
 import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import java.util.Set;
-import 
org.apache.beam.runners.spark.metrics.MetricAggregator.CounterAggregator;
-import 
org.apache.beam.runners.spark.metrics.MetricAggregator.DistributionAggregator;
+import org.apache.beam.sdk.metrics.DistributionData;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
@@ -32,6 +31,7 @@ import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.sdk.metrics.MetricsFilter;
 
 
@@ -72,10 +72,10 @@ public class SparkMetricResults extends MetricResults {
               .toList();
     }
 
-    private Predicate<MetricAggregator<?>> matchesFilter(final MetricsFilter 
filter) {
-      return new Predicate<MetricAggregator<?>>() {
+    private Predicate<MetricUpdate<?>> matchesFilter(final MetricsFilter 
filter) {
+      return new Predicate<MetricUpdate<?>>() {
         @Override
-        public boolean apply(MetricAggregator<?> metricResult) {
+        public boolean apply(MetricUpdate<?> metricResult) {
           return matches(filter, metricResult.getKey());
         }
       };
@@ -116,32 +116,30 @@ public class SparkMetricResults extends MetricResults {
     }
   }
 
-  private static final Function<DistributionAggregator, 
MetricResult<DistributionResult>>
+  private static final Function<MetricUpdate<DistributionData>, 
MetricResult<DistributionResult>>
       TO_DISTRIBUTION_RESULT =
-      new Function<DistributionAggregator, MetricResult<DistributionResult>>() 
{
+      new Function<MetricUpdate<DistributionData>, 
MetricResult<DistributionResult>>() {
         @Override
-        public MetricResult<DistributionResult>
-        apply(DistributionAggregator metricResult) {
+        public MetricResult<DistributionResult> 
apply(MetricUpdate<DistributionData> metricResult) {
           if (metricResult != null) {
             MetricKey key = metricResult.getKey();
             return new SparkMetricResult<>(key.metricName(), key.stepName(),
-                metricResult.getValue().extractResult());
+                metricResult.getUpdate().extractResult());
           } else {
             return null;
           }
         }
       };
 
-  private static final Function<CounterAggregator, MetricResult<Long>>
+  private static final Function<MetricUpdate<Long>, MetricResult<Long>>
       TO_COUNTER_RESULT =
-      new Function<CounterAggregator, MetricResult<Long>>() {
+      new Function<MetricUpdate<Long>, MetricResult<Long>>() {
         @Override
-        public MetricResult<Long>
-        apply(CounterAggregator metricResult) {
+        public MetricResult<Long> apply(MetricUpdate<Long> metricResult) {
           if (metricResult != null) {
             MetricKey key = metricResult.getKey();
             return new SparkMetricResult<>(key.metricName(), key.stepName(),
-                metricResult.getValue());
+                metricResult.getUpdate());
           } else {
             return null;
           }

http://git-wip-us.apache.org/repos/asf/beam/blob/22865780/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
index 234cb81..9d5bb47 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
@@ -18,13 +18,9 @@
 
 package org.apache.beam.runners.spark.metrics;
 
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
@@ -32,12 +28,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import 
org.apache.beam.runners.spark.metrics.MetricAggregator.CounterAggregator;
-import 
org.apache.beam.runners.spark.metrics.MetricAggregator.DistributionAggregator;
-import 
org.apache.beam.runners.spark.metrics.MetricAggregator.SparkDistributionData;
 import org.apache.beam.sdk.metrics.DistributionData;
 import org.apache.beam.sdk.metrics.MetricKey;
-import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -53,7 +45,8 @@ public class SparkMetricsContainer implements Serializable {
 
   private transient volatile LoadingCache<String, MetricsContainer> 
metricsContainers;
 
-  private final Map<MetricKey, MetricAggregator<?>> metrics = new HashMap<>();
+  private final Map<MetricKey, MetricUpdate<Long>> counters = new HashMap<>();
+  private final Map<MetricKey, MetricUpdate<DistributionData>> distributions = 
new HashMap<>();
 
   public MetricsContainer getContainer(String stepName) {
     if (metricsContainers == null) {
@@ -72,69 +65,24 @@ public class SparkMetricsContainer implements Serializable {
     }
   }
 
-  static Collection<CounterAggregator> getCounters() {
-    return
-        FluentIterable
-            .from(getInstance().metrics.values())
-            .filter(IS_COUNTER)
-            .transform(TO_COUNTER)
-            .toList();
+  static Collection<MetricUpdate<Long>> getCounters() {
+    return getInstance().counters.values();
   }
 
-  private static final Predicate<MetricAggregator<?>> IS_COUNTER =
-      new Predicate<MetricAggregator<?>>() {
-        @Override
-        public boolean apply(MetricAggregator<?> input) {
-          return (input instanceof CounterAggregator);
-        }
-      };
-
-  private static final Function<MetricAggregator<?>, CounterAggregator> 
TO_COUNTER =
-      new Function<MetricAggregator<?>,
-          CounterAggregator>() {
-        @Override
-        public CounterAggregator apply(MetricAggregator<?> metricAggregator) {
-          return (CounterAggregator) metricAggregator;
-        }
-      };
-
-  static Collection<DistributionAggregator> getDistributions() {
-    return
-        FluentIterable
-            .from(getInstance().metrics.values())
-            .filter(IS_DISTRIBUTION)
-            .transform(TO_DISTRIBUTION)
-            .toList();
+  static Collection<MetricUpdate<DistributionData>> getDistributions() {
+    return getInstance().distributions.values();
   }
 
-  private static final Predicate<MetricAggregator<?>> IS_DISTRIBUTION =
-      new Predicate<MetricAggregator<?>>() {
-        @Override
-        public boolean apply(MetricAggregator<?> input) {
-          return (input instanceof DistributionAggregator);
-        }
-      };
-
-  private static final Function<MetricAggregator<?>, DistributionAggregator> 
TO_DISTRIBUTION =
-      new Function<MetricAggregator<?>, DistributionAggregator>() {
-        @Override
-        public DistributionAggregator apply(MetricAggregator<?> 
metricAggregator) {
-          return (DistributionAggregator) metricAggregator;
-        }
-      };
-
-  SparkMetricsContainer merge(SparkMetricsContainer other) {
-    return this.updated(other.getAggregators());
+  SparkMetricsContainer update(SparkMetricsContainer other) {
+    this.updateCounters(other.counters.values());
+    this.updateDistributions(other.distributions.values());
+    return this;
   }
 
   private static SparkMetricsContainer getInstance() {
     return MetricsAccumulator.getInstance().value();
   }
 
-  private Collection<MetricAggregator<?>> getAggregators() {
-    return metrics.values();
-  }
-
   private void writeObject(ObjectOutputStream out) throws IOException {
     // Since MetricsContainer instances are not serializable, materialize a 
serializable map of
     // MetricsAggregators relating to the same metrics. This is done here, 
when Spark serializes
@@ -148,44 +96,28 @@ public class SparkMetricsContainer implements Serializable 
{
     if (metricsContainers != null) {
       for (MetricsContainer container : metricsContainers.asMap().values()) {
         MetricUpdates cumulative = container.getCumulative();
-        updated(Iterables.transform(cumulative.counterUpdates(), 
TO_COUNTER_AGGREGATOR));
-        updated(Iterables.transform(cumulative.distributionUpdates(), 
TO_DISTRIBUTION_AGGREGATOR));
+        this.updateCounters(cumulative.counterUpdates());
+        this.updateDistributions(cumulative.distributionUpdates());
       }
     }
   }
 
-  private static final Function<MetricUpdate<Long>, MetricAggregator<?>>
-      TO_COUNTER_AGGREGATOR = new Function<MetricUpdate<Long>, 
MetricAggregator<?>>() {
-    @SuppressWarnings("ConstantConditions")
-    @Override
-    public CounterAggregator
-    apply(MetricUpdate<Long> update) {
-      return update != null ? new CounterAggregator(new 
SparkMetricKey(update.getKey()),
-          update.getUpdate()) : null;
+  private void updateCounters(Iterable<MetricUpdate<Long>> updates) {
+    for (MetricUpdate<Long> update : updates) {
+      MetricKey key = update.getKey();
+      MetricUpdate<Long> current = counters.get(key);
+      counters.put(key, current != null
+          ? MetricUpdate.create(key, current.getUpdate() + update.getUpdate()) 
: update);
     }
-  };
-
-  private static final Function<MetricUpdate<DistributionData>, 
MetricAggregator<?>>
-      TO_DISTRIBUTION_AGGREGATOR =
-      new Function<MetricUpdate<DistributionData>, MetricAggregator<?>>() {
-        @SuppressWarnings("ConstantConditions")
-        @Override
-        public DistributionAggregator
-        apply(MetricUpdate<DistributionData> update) {
-          return update != null ? new DistributionAggregator(new 
SparkMetricKey(update.getKey()),
-              new SparkDistributionData(update.getUpdate())) : null;
-        }
-      };
+  }
 
-  private SparkMetricsContainer updated(Iterable<MetricAggregator<?>> updates) 
{
-    for (MetricAggregator<?> update : updates) {
+  private void updateDistributions(Iterable<MetricUpdate<DistributionData>> 
updates) {
+    for (MetricUpdate<DistributionData> update : updates) {
       MetricKey key = update.getKey();
-      MetricAggregator<?> current = metrics.get(key);
-      Object updateValue = update.getValue();
-      metrics.put(new SparkMetricKey(key),
-          current != null ? MetricAggregator.updated(current, updateValue) : 
update);
+      MetricUpdate<DistributionData> current = distributions.get(key);
+      distributions.put(key, current != null
+          ? MetricUpdate.create(key, 
current.getUpdate().combine(update.getUpdate())) : update);
     }
-    return this;
   }
 
   private static class MetricsContainerCacheLoader extends CacheLoader<String, 
MetricsContainer> {
@@ -196,97 +128,10 @@ public class SparkMetricsContainer implements 
Serializable {
     }
   }
 
-  private static class SparkMetricKey extends MetricKey implements 
Serializable {
-    private final String stepName;
-    private final MetricName metricName;
-
-    SparkMetricKey(MetricKey original) {
-      this.stepName = original.stepName();
-      MetricName metricName = original.metricName();
-      this.metricName = new SparkMetricName(metricName.namespace(), 
metricName.name());
-    }
-
-    @Override
-    public String stepName() {
-      return stepName;
-    }
-
-    @Override
-    public MetricName metricName() {
-      return metricName;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == this) {
-        return true;
-      }
-      if (o instanceof MetricKey) {
-        MetricKey that = (MetricKey) o;
-        return (this.stepName.equals(that.stepName()))
-            && (this.metricName.equals(that.metricName()));
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      int h = 1;
-      h *= 1000003;
-      h ^= stepName.hashCode();
-      h *= 1000003;
-      h ^= metricName.hashCode();
-      return h;
-    }
-  }
-
-  private static class SparkMetricName extends MetricName implements 
Serializable {
-    private final String namespace;
-    private final String name;
-
-    SparkMetricName(String namespace, String name) {
-      this.namespace = namespace;
-      this.name = name;
-    }
-
-    @Override
-    public String namespace() {
-      return namespace;
-    }
-
-    @Override
-    public String name() {
-      return name;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == this) {
-        return true;
-      }
-      if (o instanceof MetricName) {
-        MetricName that = (MetricName) o;
-        return (this.namespace.equals(that.namespace()))
-            && (this.name.equals(that.name()));
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      int h = 1;
-      h *= 1000003;
-      h ^= namespace.hashCode();
-      h *= 1000003;
-      h ^= name.hashCode();
-      return h;
-    }
-  }
-
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    for (Map.Entry<String, ?> metric :  new 
SparkBeamMetric().renderAll().entrySet()) {
+    for (Map.Entry<String, ?> metric : new 
SparkBeamMetric().renderAll().entrySet()) {
       sb.append(metric.getKey()).append(": 
").append(metric.getValue()).append(" ");
     }
     return sb.toString();

http://git-wip-us.apache.org/repos/asf/beam/blob/22865780/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
index 59c7fbd..8068e1b 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.metrics;
 
 import com.google.auto.value.AutoValue;
+import java.io.Serializable;
 
 /**
  * Data describing the the distribution. This should retain enough detail that 
it can be combined
@@ -28,7 +29,7 @@ import com.google.auto.value.AutoValue;
  * the approximate value of those quantiles.
  */
 @AutoValue
-public abstract class DistributionData {
+public abstract class DistributionData implements Serializable {
 
   public abstract long sum();
   public abstract long count();

http://git-wip-us.apache.org/repos/asf/beam/blob/22865780/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
index bfa4df5..8706853 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.metrics;
 
 import com.google.auto.value.AutoValue;
+import java.io.Serializable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 
@@ -26,7 +27,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
  */
 @Experimental(Kind.METRICS)
 @AutoValue
-public abstract class MetricKey {
+public abstract class MetricKey implements Serializable {
 
   /** The step name that is associated with this metric. */
   public abstract String stepName();

http://git-wip-us.apache.org/repos/asf/beam/blob/22865780/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
index e84dc66..56466d8 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.metrics;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.Iterables;
+import java.io.Serializable;
 import java.util.Collections;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -39,7 +40,7 @@ public abstract class MetricUpdates {
    * @param <T> The type of value representing the update.
    */
   @AutoValue
-  public abstract static class MetricUpdate<T> {
+  public abstract static class MetricUpdate<T> implements Serializable {
 
     /** The key being updated. */
     public abstract MetricKey getKey();

Reply via email to