This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 41e1919  [SPARK-32996][WEB-UI][3.0] Handle empty ExecutorMetrics in ExecutorMetricsJsonSerializer
41e1919 is described below

commit 41e1919d24f573d5bef14df72729bdaccd35a82e
Author: Shruti Gumma <shruti_gu...@apple.com>
AuthorDate: Thu Oct 1 19:28:06 2020 -0700

    [SPARK-32996][WEB-UI][3.0] Handle empty ExecutorMetrics in ExecutorMetricsJsonSerializer
    
    ### What changes were proposed in this pull request?
    This is a backport PR for branch-3.0 of the change merged to `master` in https://github.com/apache/spark/pull/29872.
    
    When `peakMemoryMetrics` in `ExecutorSummary` is `Option.empty`, the `ExecutorMetricsJsonSerializer#serialize` method never calls `jsonGenerator.writeObject`. The generated JSON then contains the `peakMemoryMetrics` key with no corresponding value, which raises an error as soon as the next key, `attributes`, is written:
    `com.fasterxml.jackson.core.JsonGenerationException: Can not write a field name, expecting a value`
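
    As an illustration of the failure mode, here is a minimal, self-contained sketch (not Spark's actual classes: `SkippingOptionSerializer` and `Summary` are hypothetical stand-ins for `ExecutorMetricsJsonSerializer` and `ExecutorSummary`). A custom serializer that writes nothing for `None` leaves the generator holding a field name with no value, so writing the next field fails:

    ```scala
    import com.fasterxml.jackson.core.JsonGenerator
    import com.fasterxml.jackson.databind.{JsonSerializer, ObjectMapper, SerializerProvider}
    import com.fasterxml.jackson.databind.annotation.JsonSerialize
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // Mirrors the buggy behavior: nothing is written when the Option is empty,
    // even though Jackson has already emitted the field name.
    class SkippingOptionSerializer extends JsonSerializer[Option[Map[String, Long]]] {
      override def serialize(
          value: Option[Map[String, Long]],
          gen: JsonGenerator,
          provider: SerializerProvider): Unit = {
        value.foreach(gen.writeObject(_)) // no writeNull() branch for None
      }
    }

    class Summary {
      @JsonSerialize(using = classOf[SkippingOptionSerializer])
      def getPeakMemoryMetrics: Option[Map[String, Long]] = None
      def getAttributes: Map[String, String] = Map.empty
    }

    object Repro {
      def main(args: Array[String]): Unit = {
        val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
        // Throws JsonGenerationException: Can not write a field name, expecting a value
        mapper.writeValueAsString(new Summary)
      }
    }
    ```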
    
    ### Why are the changes needed?
    At the start of a Spark job, if `peakMemoryMetrics` is `Option.empty`, a `com.fasterxml.jackson.core.JsonGenerationException` is thrown when navigating to the Executors tab in the Spark UI.
    Complete stacktrace:
    
    > com.fasterxml.jackson.core.JsonGenerationException: Can not write a field name, expecting a value
    >   at com.fasterxml.jackson.core.JsonGenerator._reportError(JsonGenerator.java:2080)
    >   at com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.writeFieldName(WriterBasedJsonGenerator.java:161)
    >   at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:725)
    >   at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:721)
    >   at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166)
    >   at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:145)
    >   at com.fasterxml.jackson.module.scala.ser.IterableSerializer.serializeContents(IterableSerializerModule.scala:26)
    >   at com.fasterxml.jackson.module.scala.ser.IterableSerializer.serializeContents$(IterableSerializerModule.scala:25)
    >   at com.fasterxml.jackson.module.scala.ser.UnresolvedIterableSerializer.serializeContents(IterableSerializerModule.scala:54)
    >   at com.fasterxml.jackson.module.scala.ser.UnresolvedIterableSerializer.serializeContents(IterableSerializerModule.scala:54)
    >   at com.fasterxml.jackson.databind.ser.std.AsArraySerializerBase.serialize(AsArraySerializerBase.java:250)
    >   at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
    >   at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
    >   at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:4094)
    >   at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:3404)
    >   at org.apache.spark.ui.exec.ExecutorsPage.allExecutorsDataScript$1(ExecutorsTab.scala:64)
    >   at org.apache.spark.ui.exec.ExecutorsPage.render(ExecutorsTab.scala:76)
    >   at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:89)
    >   at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:80)
    >   at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
    >   at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    >   at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:873)
    >   at org.sparkproject.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1623)
    >   at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95)
    >   at org.sparkproject.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
    >   at org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540)
    >   at org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
    >   at org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
    >   at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
    >   at org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
    >   at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
    >   at org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
    >   at org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
    >   at org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:753)
    >   at org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
    >   at org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    >   at org.sparkproject.jetty.server.Server.handle(Server.java:505)
    >   at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:370)
    >   at org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
    >   at org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
    >   at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:103)
    >   at org.sparkproject.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
    >   at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
    >   at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
    >   at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
    >   at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
    >   at org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
    >   at org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:698)
    >   at org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:804)
    >   at java.base/java.lang.Thread.run(Thread.java:834)
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test
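
    Condensed, the added `ExecutorSummarySuite` check round-trips an `ExecutorSummary` whose `peakMemoryMetrics` is `Option.empty` (the sketch below assumes `executorSummary` is built that way, as in the suite in the diff):

    ```scala
    import com.fasterxml.jackson.core.`type`.TypeReference
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    // With the fix, the empty Option is serialized as an explicit JSON null ...
    val json = mapper.writeValueAsString(executorSummary)
    assert(json.contains("\"peakMemoryMetrics\":null"))
    // ... and ExecutorMetricsJsonDeserializer#getNullValue maps that null back to None.
    val roundTripped = mapper.readValue(json, new TypeReference[ExecutorSummary] {})
    assert(roundTripped.peakMemoryMetrics.isEmpty)
    ```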
    
    Closes #29914 from shrutig/SPARK-32996-3.0.
    
    Authored-by: Shruti Gumma <shruti_gu...@apple.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../scala/org/apache/spark/status/api/v1/api.scala | 16 +++++--
 .../spark/status/api/v1/ExecutorSummarySuite.scala | 51 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 5ec9b36..37db64a 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -127,6 +127,10 @@ private[spark] class ExecutorMetricsJsonDeserializer
       new TypeReference[Option[Map[String, java.lang.Long]]] {})
     metricsMap.map(metrics => new ExecutorMetrics(metrics))
   }
+
+  override def getNullValue(ctxt: DeserializationContext): Option[ExecutorMetrics] = {
+    None
+  }
 }
 /** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */
 private[spark] class ExecutorMetricsJsonSerializer
@@ -135,11 +139,15 @@ private[spark] class ExecutorMetricsJsonSerializer
       metrics: Option[ExecutorMetrics],
       jsonGenerator: JsonGenerator,
       serializerProvider: SerializerProvider): Unit = {
-    metrics.foreach { m: ExecutorMetrics =>
-      val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) =>
-        metric -> m.getMetricValue(metric)
+    if (metrics.isEmpty) {
+      jsonGenerator.writeNull()
+    } else {
+      metrics.foreach { m: ExecutorMetrics =>
+        val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) =>
+          metric -> m.getMetricValue(metric)
+        }
+        jsonGenerator.writeObject(metricsMap)
       }
-      jsonGenerator.writeObject(metricsMap)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala b/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala
new file mode 100644
index 0000000..2723af7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.util.Date
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import org.apache.spark.SparkFunSuite
+
+class ExecutorSummarySuite extends SparkFunSuite {
+
+  test("Check ExecutorSummary serialize and deserialize with empty peakMemoryMetrics") {
+    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+    val executorSummary = new ExecutorSummary("id", "host:port", true, 1,
+      10, 10, 1, 1, 1,
+      0, 0, 1, 100,
+      1, 100, 100,
+      10, false, 20, new Date(1600984336352L),
+      Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map())
+    val expectedJson = "{\"id\":\"id\",\"hostPort\":\"host:port\",\"isActive\":true," +
+      "\"rddBlocks\":1,\"memoryUsed\":10,\"diskUsed\":10,\"totalCores\":1,\"maxTasks\":1," +
+      "\"activeTasks\":1,\"failedTasks\":0,\"completedTasks\":0,\"totalTasks\":1," +
+      "\"totalDuration\":100,\"totalGCTime\":1,\"totalInputBytes\":100," +
+      "\"totalShuffleRead\":100,\"totalShuffleWrite\":10,\"isBlacklisted\":false," +
+      "\"maxMemory\":20,\"addTime\":1600984336352,\"removeTime\":null,\"removeReason\":null," +
+      "\"executorLogs\":{},\"memoryMetrics\":null,\"blacklistedInStages\":[]," +
+      "\"peakMemoryMetrics\":null,\"attributes\":{},\"resources\":{}}"
+    val json = mapper.writeValueAsString(executorSummary)
+    assert(expectedJson.equals(json))
+    val deserializeExecutorSummary = mapper.readValue(json, new TypeReference[ExecutorSummary] {})
+    assert(deserializeExecutorSummary.peakMemoryMetrics == None)
+  }
+
+}

