Repository: spark
Updated Branches:
  refs/heads/branch-2.2 708f68c8a -> 0bd918f67
[SPARK-12837][SPARK-20666][CORE][FOLLOWUP] getting name should not fail if accumulator is garbage collected

## What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/17596, we no longer send the internal accumulator name to the executor side, and always look up the accumulator name in `AccumulatorContext`. This causes a regression if the accumulator has already been garbage collected; this PR fixes it by still sending the accumulator name for `SQLMetrics`.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenc...@databricks.com>

Closes #17931 from cloud-fan/bug.

(cherry picked from commit e1aaab1e277b1b07c26acea75ade78e39bdac209)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0bd918f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0bd918f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0bd918f6

Branch: refs/heads/branch-2.2
Commit: 0bd918f67630f83cdc2922a2f48bd28b023ef821
Parents: 708f68c
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Mon May 15 09:22:06 2017 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon May 15 09:22:24 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/AccumulatorV2.scala | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0bd918f6/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index a65ec75..1a9a692 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -84,10 +84,11 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
    * Returns the name of this accumulator, can only be called after registration.
    */
   final def name: Option[String] = {
+    assertMetadataNotNull()
+
     if (atDriverSide) {
-      AccumulatorContext.get(id).flatMap(_.metadata.name)
+      metadata.name.orElse(AccumulatorContext.get(id).flatMap(_.metadata.name))
     } else {
-      assertMetadataNotNull()
       metadata.name
     }
   }
@@ -165,13 +166,15 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
     }
     val copyAcc = copyAndReset()
     assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
-    val isInternalAcc =
-      (name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)) ||
-      getClass.getSimpleName == "SQLMetric"
+    val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
     if (isInternalAcc) {
       // Do not serialize the name of internal accumulator and send it to executor.
       copyAcc.metadata = metadata.copy(name = None)
     } else {
+      // For non-internal accumulators, we still need to send the name because users may need to
+      // access the accumulator name at executor side, or they may keep the accumulators sent from
+      // executors and access the name when the registered accumulator is already garbage
+      // collected(e.g. SQLMetrics).
      copyAcc.metadata = metadata
    }
    copyAcc

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
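The failure mode the patch addresses can be sketched in isolation. Spark's real `AccumulatorContext` keeps accumulators behind weak references, so a registry lookup can come back empty after garbage collection; the fix prefers the name carried in the accumulator's own metadata and only falls back to the registry. The sketch below is a hypothetical simplification (the classes `Metadata`, `Acc`, and the `registry` map are illustrative stand-ins, not Spark's real types):

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

// Hypothetical stand-in for AccumulatorMetadata: just an id and an optional name.
final case class Metadata(id: Long, name: Option[String])

// Hypothetical stand-in for an AccumulatorV2 with two name-resolution strategies.
final class Acc(val metadata: Metadata) {

  // Pre-fix behavior: resolve the name only through the weak-reference registry.
  // If the registered copy was garbage collected, the name is lost.
  def nameViaRegistry(reg: mutable.Map[Long, WeakReference[Acc]]): Option[String] =
    reg.get(metadata.id).flatMap(r => Option(r.get)).flatMap(_.metadata.name)

  // Post-fix behavior: prefer the locally carried name, fall back to the registry
  // (mirrors `metadata.name.orElse(AccumulatorContext.get(id)...)` in the patch).
  def nameWithFallback(reg: mutable.Map[Long, WeakReference[Acc]]): Option[String] =
    metadata.name.orElse(nameViaRegistry(reg))
}

object Demo {
  def main(args: Array[String]): Unit = {
    val registry = mutable.Map.empty[Long, WeakReference[Acc]]
    val acc = new Acc(Metadata(1L, Some("numOutputRows")))

    // Simulate the registered copy having already been garbage collected:
    // the weak reference's referent is gone, so get() returns null.
    registry(1L) = new WeakReference[Acc](null)

    assert(acc.nameViaRegistry(registry).isEmpty)      // the regression: name lost
    assert(acc.nameWithFallback(registry).isDefined)   // the fix: local name survives
    println("ok")
  }
}
```

This is why the patch also stops stripping the name from non-internal accumulators before serialization: the `orElse` fallback only helps if the serialized copy still carries its name.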