This is an automated email from the ASF dual-hosted git repository. viirya pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 41e1919 [SPARK-32996][WEB-UI][3.0] Handle empty ExecutorMetrics in ExecutorMetricsJsonSerializer 41e1919 is described below commit 41e1919d24f573d5bef14df72729bdaccd35a82e Author: Shruti Gumma <shruti_gu...@apple.com> AuthorDate: Thu Oct 1 19:28:06 2020 -0700 [SPARK-32996][WEB-UI][3.0] Handle empty ExecutorMetrics in ExecutorMetricsJsonSerializer ### What changes were proposed in this pull request? This is a backport PR for branch-3.0. This change was raised to the `master` branch in `https://github.com/apache/spark/pull/29872` When `peakMemoryMetrics` in `ExecutorSummary` is `Option.empty`, then the `ExecutorMetricsJsonSerializer#serialize` method does not execute the `jsonGenerator.writeObject` method. This causes the json to be generated with the `peakMemoryMetrics` key added to the serialized string, but no corresponding value. This causes an error to be thrown when it is the next key `attributes`'s turn to be added to the json: `com.fasterxml.jackson.core.JsonGenerationException: Can not write a field name, expecting a value ` ### Why are the changes needed? At the start of the Spark job, if `peakMemoryMetrics` is `Option.empty`, then it causes a `com.fasterxml.jackson.core.JsonGenerationException` to be thrown when we navigate to the Executors tab in the Spark UI. 
Complete stacktrace: > com.fasterxml.jackson.core.JsonGenerationException: Can not write a field name, expecting a value > at com.fasterxml.jackson.core.JsonGenerator._reportError(JsonGenerator.java:2080) > at com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.writeFieldName(WriterBasedJsonGenerator.java:161) > at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:725) > at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:721) > at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166) > at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:145) > at com.fasterxml.jackson.module.scala.ser.IterableSerializer.serializeContents(IterableSerializerModule.scala:26) > at com.fasterxml.jackson.module.scala.ser.IterableSerializer.serializeContents$(IterableSerializerModule.scala:25) > at com.fasterxml.jackson.module.scala.ser.UnresolvedIterableSerializer.serializeContents(IterableSerializerModule.scala:54) > at com.fasterxml.jackson.module.scala.ser.UnresolvedIterableSerializer.serializeContents(IterableSerializerModule.scala:54) > at com.fasterxml.jackson.databind.ser.std.AsArraySerializerBase.serialize(AsArraySerializerBase.java:250) > at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480) > at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319) > at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:4094) > at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:3404) > at org.apache.spark.ui.exec.ExecutorsPage.allExecutorsDataScript$1(ExecutorsTab.scala:64) > at org.apache.spark.ui.exec.ExecutorsPage.render(ExecutorsTab.scala:76) > at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:89) > at 
org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:80) > at javax.servlet.http.HttpServlet.service(HttpServlet.java:687) > at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) > at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:873) > at org.sparkproject.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1623) > at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95) > at org.sparkproject.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610) > at org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540) > at org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255) > at org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345) > at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203) > at org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480) > at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201) > at org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247) > at org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144) > at org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:753) > at org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220) > at org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132) > at org.sparkproject.jetty.server.Server.handle(Server.java:505) > at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:370) > at org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:267) > at org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305) > at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:103) > at 
org.sparkproject.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117) > at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333) > at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310) > at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168) > at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126) > at org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366) > at org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:698) > at org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:804) > at java.base/java.lang.Thread.run(Thread.java:834) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test Closes #29914 from shrutig/SPARK-32996-3.0. Authored-by: Shruti Gumma <shruti_gu...@apple.com> Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com> --- .../scala/org/apache/spark/status/api/v1/api.scala | 16 +++++-- .../spark/status/api/v1/ExecutorSummarySuite.scala | 51 ++++++++++++++++++++++ 2 files changed, 63 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 5ec9b36..37db64a 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -127,6 +127,10 @@ private[spark] class ExecutorMetricsJsonDeserializer new TypeReference[Option[Map[String, java.lang.Long]]] {}) metricsMap.map(metrics => new ExecutorMetrics(metrics)) } + + override def getNullValue(ctxt: DeserializationContext): Option[ExecutorMetrics] = { + None + } } /** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */ private[spark] class ExecutorMetricsJsonSerializer @@ 
-135,11 +139,15 @@ private[spark] class ExecutorMetricsJsonSerializer metrics: Option[ExecutorMetrics], jsonGenerator: JsonGenerator, serializerProvider: SerializerProvider): Unit = { - metrics.foreach { m: ExecutorMetrics => - val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) => - metric -> m.getMetricValue(metric) + if (metrics.isEmpty) { + jsonGenerator.writeNull() + } else { + metrics.foreach { m: ExecutorMetrics => + val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) => + metric -> m.getMetricValue(metric) + } + jsonGenerator.writeObject(metricsMap) } - jsonGenerator.writeObject(metricsMap) } } diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala b/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala new file mode 100644 index 0000000..2723af7 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.status.api.v1 + +import java.util.Date + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.spark.SparkFunSuite + +class ExecutorSummarySuite extends SparkFunSuite { + + test("Check ExecutorSummary serialize and deserialize with empty peakMemoryMetrics") { + val mapper = new ObjectMapper().registerModule(DefaultScalaModule) + val executorSummary = new ExecutorSummary("id", "host:port", true, 1, + 10, 10, 1, 1, 1, + 0, 0, 1, 100, + 1, 100, 100, + 10, false, 20, new Date(1600984336352L), + Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map()) + val expectedJson = "{\"id\":\"id\",\"hostPort\":\"host:port\",\"isActive\":true," + + "\"rddBlocks\":1,\"memoryUsed\":10,\"diskUsed\":10,\"totalCores\":1,\"maxTasks\":1," + + "\"activeTasks\":1,\"failedTasks\":0,\"completedTasks\":0,\"totalTasks\":1," + + "\"totalDuration\":100,\"totalGCTime\":1,\"totalInputBytes\":100," + + "\"totalShuffleRead\":100,\"totalShuffleWrite\":10,\"isBlacklisted\":false," + + "\"maxMemory\":20,\"addTime\":1600984336352,\"removeTime\":null,\"removeReason\":null," + + "\"executorLogs\":{},\"memoryMetrics\":null,\"blacklistedInStages\":[]," + + "\"peakMemoryMetrics\":null,\"attributes\":{},\"resources\":{}}" + val json = mapper.writeValueAsString(executorSummary) + assert(expectedJson.equals(json)) + val deserializeExecutorSummary = mapper.readValue(json, new TypeReference[ExecutorSummary] {}) + assert(deserializeExecutorSummary.peakMemoryMetrics == None) + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org