This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 878ac2fe488 [SPARK-41759][CORE] Use `weakIntern` on string values when creating new objects during deserialization
878ac2fe488 is described below

commit 878ac2fe4880472f03e6d9907a68fde30479b281
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Thu Jan 19 13:53:34 2023 -0800

    [SPARK-41759][CORE] Use `weakIntern` on string values when creating new objects during deserialization
    
    ### What changes were proposed in this pull request?
    This PR aims to use `weakIntern` on string values when creating new objects during deserialization.
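
    For context, a minimal sketch of what `org.apache.spark.util.Utils.weakIntern` is understood to do: it canonicalizes strings through a Guava weak interner (the object and member names below are illustrative, not copied from the Spark source):
    ```scala
    import com.google.common.collect.Interners

    object UtilsSketch {
      // Weak interner: canonical strings are garbage-collected once nothing
      // outside the pool references them, so the pool itself cannot leak.
      private val weakStringInterner = Interners.newWeakInterner[String]()

      // Returns a canonical instance of `s`; equal inputs yield the same
      // object reference, deduplicating repeated values on the heap.
      def weakIntern(s: String): String = weakStringInterner.intern(s)
    }
    ```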
    
    ### Why are the changes needed?
    Following the guidance in https://github.com/apache/spark/pull/39270.
    <img width="587" alt="image" src="https://user-images.githubusercontent.com/15246973/209924939-779db183-f0a6-4f3b-aebf-cf00b6c95cf8.png">
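
    As a self-contained illustration of the effect (hypothetical values, not taken from this PR): repeated low-cardinality strings such as executor IDs, host:port pairs, and storage levels collapse to a single shared instance after interning, while weakly held entries remain garbage-collectable:
    ```scala
    import com.google.common.collect.Interners

    object InternDemo extends App {
      private val interner = Interners.newWeakInterner[String]()

      // `new String` forces two distinct heap objects with equal contents,
      // mimicking strings freshly parsed from two protobuf messages.
      val a = interner.intern(new String("executor-1"))
      val b = interner.intern(new String("executor-1"))

      println(a eq b) // true: both references point at one canonical object
    }
    ```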
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Pass GA.
    
    Closes #39275 from panbingkun/SPARK-41759.
    
    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../spark/status/protobuf/AccumulableInfoSerializer.scala    |  3 ++-
 .../protobuf/ExecutorStageSummaryWrapperSerializer.scala     |  3 ++-
 .../status/protobuf/ExecutorSummaryWrapperSerializer.scala   |  9 +++++----
 .../apache/spark/status/protobuf/PoolDataSerializer.scala    |  3 +--
 .../spark/status/protobuf/StageDataWrapperSerializer.scala   | 12 +++++-------
 .../spark/status/protobuf/StreamBlockDataSerializer.scala    |  7 ++++---
 6 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
index 8d5046923e9..18f937cecdb 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.status.api.v1.AccumulableInfo
 import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.util.Utils.weakIntern
 
 private[protobuf] object AccumulableInfoSerializer {
 
@@ -40,7 +41,7 @@ private[protobuf] object AccumulableInfoSerializer {
     updates.forEach { update =>
       accumulatorUpdates.append(new AccumulableInfo(
         id = update.getId,
-        name = update.getName,
+        name = weakIntern(update.getName),
         update = getOptional(update.hasUpdate, update.getUpdate),
         value = update.getValue))
     }
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
index 4d9d045ed5e..2ffba9db755 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.status.protobuf
 
 import org.apache.spark.status.ExecutorStageSummaryWrapper
+import org.apache.spark.util.Utils.weakIntern
 
 class ExecutorStageSummaryWrapperSerializer
   extends ProtobufSerDe[ExecutorStageSummaryWrapper] {
@@ -38,7 +39,7 @@ class ExecutorStageSummaryWrapperSerializer
     new ExecutorStageSummaryWrapper(
       stageId = binary.getStageId.toInt,
       stageAttemptId = binary.getStageAttemptId,
-      executorId = binary.getExecutorId,
+      executorId = weakIntern(binary.getExecutorId),
       info = info)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
index b008c98e562..7092a78b00a 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
@@ -25,6 +25,7 @@ import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.status.ExecutorSummaryWrapper
 import org.apache.spark.status.api.v1.{ExecutorSummary, MemoryMetrics}
 import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.util.Utils.weakIntern
 
 class ExecutorSummaryWrapperSerializer extends ProtobufSerDe[ExecutorSummaryWrapper] {
 
@@ -109,8 +110,8 @@ class ExecutorSummaryWrapperSerializer extends ProtobufSerDe[ExecutorSummaryWrap
       getOptional(binary.hasMemoryMetrics,
         () => deserializeMemoryMetrics(binary.getMemoryMetrics))
     new ExecutorSummary(
-      id = binary.getId,
-      hostPort = binary.getHostPort,
+      id = weakIntern(binary.getId),
+      hostPort = weakIntern(binary.getHostPort),
       isActive = binary.getIsActive,
       rddBlocks = binary.getRddBlocks,
       memoryUsed = binary.getMemoryUsed,
@@ -171,7 +172,7 @@ class ExecutorSummaryWrapperSerializer extends ProtobufSerDe[ExecutorSummaryWrap
   private def deserializeResourceInformation(binary: StoreTypes.ResourceInformation):
     ResourceInformation = {
     new ResourceInformation(
-      name = binary.getName,
-      addresses = binary.getAddressesList.asScala.toArray)
+      name = weakIntern(binary.getName),
+      addresses = binary.getAddressesList.asScala.map(weakIntern).toArray)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/PoolDataSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/PoolDataSerializer.scala
index a9c356ef13c..d1129d90339 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/PoolDataSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/PoolDataSerializer.scala
@@ -20,7 +20,6 @@ package org.apache.spark.status.protobuf
 import scala.collection.JavaConverters._
 
 import org.apache.spark.status.PoolData
-import org.apache.spark.util.Utils.weakIntern
 
 class PoolDataSerializer extends ProtobufSerDe[PoolData] {
 
@@ -34,7 +33,7 @@ class PoolDataSerializer extends ProtobufSerDe[PoolData] {
   override def deserialize(bytes: Array[Byte]): PoolData = {
     val poolData = StoreTypes.PoolData.parseFrom(bytes)
     new PoolData(
-      name = weakIntern(poolData.getName),
+      name = poolData.getName,
       stageIds = poolData.getStageIdsList.asScala.map(_.toInt).toSet
     )
   }
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
index eda4422405e..dc72c3ed467 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
@@ -393,10 +393,8 @@ class StageDataWrapperSerializer extends ProtobufSerDe[StageDataWrapper] {
       getOptional(binary.hasFirstTaskLaunchedTime, () => new Date(binary.getFirstTaskLaunchedTime))
     val completionTime =
       getOptional(binary.hasCompletionTime, () => new Date(binary.getCompletionTime))
-    val failureReason =
-      getOptional(binary.hasFailureReason, () => weakIntern(binary.getFailureReason))
-    val description =
-      getOptional(binary.hasDescription, () => weakIntern(binary.getDescription))
+    val failureReason = getOptional(binary.hasFailureReason, binary.getFailureReason)
+    val description = getOptional(binary.hasDescription, binary.getDescription)
     val accumulatorUpdates = AccumulableInfoSerializer.deserialize(binary.getAccumulatorUpdatesList)
     val tasks = if (MapUtils.isNotEmpty(binary.getTasksMap)) {
       Some(binary.getTasksMap.asScala.map(
@@ -467,9 +465,9 @@ class StageDataWrapperSerializer extends ProtobufSerDe[StageDataWrapper] {
       shuffleWriteBytes = binary.getShuffleWriteBytes,
       shuffleWriteTime = binary.getShuffleWriteTime,
       shuffleWriteRecords = binary.getShuffleWriteRecords,
-      name = weakIntern(binary.getName),
+      name = binary.getName,
       description = description,
-      details = weakIntern(binary.getDetails),
+      details = binary.getDetails,
       schedulingPool = weakIntern(binary.getSchedulingPool),
       rddIds = binary.getRddIdsList.asScala.map(_.toInt),
       accumulatorUpdates = accumulatorUpdates,
@@ -644,7 +642,7 @@ class StageDataWrapperSerializer extends ProtobufSerDe[StageDataWrapper] {
       taskLocality = weakIntern(binary.getTaskLocality),
       speculative = binary.getSpeculative,
       accumulatorUpdates = accumulatorUpdates,
-      errorMessage = getOptional(binary.hasErrorMessage, () => weakIntern(binary.getErrorMessage)),
+      errorMessage = getOptional(binary.hasErrorMessage, binary.getErrorMessage),
       taskMetrics = taskMetrics,
       executorLogs = binary.getExecutorLogsMap.asScala.toMap,
       schedulerDelay = binary.getSchedulerDelay,
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala
index f450bbbfd0c..5dac03bb337 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.status.protobuf
 
 import org.apache.spark.status.StreamBlockData
+import org.apache.spark.util.Utils.weakIntern
 
 class StreamBlockDataSerializer extends ProtobufSerDe[StreamBlockData] {
 
@@ -39,9 +40,9 @@ class StreamBlockDataSerializer extends ProtobufSerDe[StreamBlockData] {
     val binary = StoreTypes.StreamBlockData.parseFrom(bytes)
     new StreamBlockData(
       name = binary.getName,
-      executorId = binary.getExecutorId,
-      hostPort = binary.getHostPort,
-      storageLevel = binary.getStorageLevel,
+      executorId = weakIntern(binary.getExecutorId),
+      hostPort = weakIntern(binary.getHostPort),
+      storageLevel = weakIntern(binary.getStorageLevel),
       useMemory = binary.getUseMemory,
       useDisk = binary.getUseDisk,
       deserialized = binary.getDeserialized,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
