This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 878ac2fe488 [SPARK-41759][CORE] Use `weakIntern` on string values in create new objects during deserialization 878ac2fe488 is described below commit 878ac2fe4880472f03e6d9907a68fde30479b281 Author: panbingkun <pbk1...@gmail.com> AuthorDate: Thu Jan 19 13:53:34 2023 -0800 [SPARK-41759][CORE] Use `weakIntern` on string values in create new objects during deserialization ### What changes were proposed in this pull request? This PR aims to use weakIntern on string values when creating new objects during deserialization. ### Why are the changes needed? Following the guide: https://github.com/apache/spark/pull/39270. <img width="587" alt="image" src="https://user-images.githubusercontent.com/15246973/209924939-779db183-f0a6-4f3b-aebf-cf00b6c95cf8.png"> ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. Closes #39275 from panbingkun/SPARK-41759. Authored-by: panbingkun <pbk1...@gmail.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../spark/status/protobuf/AccumulableInfoSerializer.scala | 3 ++- .../protobuf/ExecutorStageSummaryWrapperSerializer.scala | 3 ++- .../status/protobuf/ExecutorSummaryWrapperSerializer.scala | 9 +++++---- .../apache/spark/status/protobuf/PoolDataSerializer.scala | 3 +-- .../spark/status/protobuf/StageDataWrapperSerializer.scala | 12 +++++------- .../spark/status/protobuf/StreamBlockDataSerializer.scala | 7 ++++--- 6 files changed, 19 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala index 8d5046923e9..18f937cecdb 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer 
import org.apache.spark.status.api.v1.AccumulableInfo import org.apache.spark.status.protobuf.Utils.getOptional +import org.apache.spark.util.Utils.weakIntern private[protobuf] object AccumulableInfoSerializer { @@ -40,7 +41,7 @@ private[protobuf] object AccumulableInfoSerializer { updates.forEach { update => accumulatorUpdates.append(new AccumulableInfo( id = update.getId, - name = update.getName, + name = weakIntern(update.getName), update = getOptional(update.hasUpdate, update.getUpdate), value = update.getValue)) } diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala index 4d9d045ed5e..2ffba9db755 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala @@ -18,6 +18,7 @@ package org.apache.spark.status.protobuf import org.apache.spark.status.ExecutorStageSummaryWrapper +import org.apache.spark.util.Utils.weakIntern class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe[ExecutorStageSummaryWrapper] { @@ -38,7 +39,7 @@ class ExecutorStageSummaryWrapperSerializer new ExecutorStageSummaryWrapper( stageId = binary.getStageId.toInt, stageAttemptId = binary.getStageAttemptId, - executorId = binary.getExecutorId, + executorId = weakIntern(binary.getExecutorId), info = info) } } diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala index b008c98e562..7092a78b00a 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala @@ -25,6 +25,7 @@ import 
org.apache.spark.resource.ResourceInformation import org.apache.spark.status.ExecutorSummaryWrapper import org.apache.spark.status.api.v1.{ExecutorSummary, MemoryMetrics} import org.apache.spark.status.protobuf.Utils.getOptional +import org.apache.spark.util.Utils.weakIntern class ExecutorSummaryWrapperSerializer extends ProtobufSerDe[ExecutorSummaryWrapper] { @@ -109,8 +110,8 @@ class ExecutorSummaryWrapperSerializer extends ProtobufSerDe[ExecutorSummaryWrap getOptional(binary.hasMemoryMetrics, () => deserializeMemoryMetrics(binary.getMemoryMetrics)) new ExecutorSummary( - id = binary.getId, - hostPort = binary.getHostPort, + id = weakIntern(binary.getId), + hostPort = weakIntern(binary.getHostPort), isActive = binary.getIsActive, rddBlocks = binary.getRddBlocks, memoryUsed = binary.getMemoryUsed, @@ -171,7 +172,7 @@ class ExecutorSummaryWrapperSerializer extends ProtobufSerDe[ExecutorSummaryWrap private def deserializeResourceInformation(binary: StoreTypes.ResourceInformation): ResourceInformation = { new ResourceInformation( - name = binary.getName, - addresses = binary.getAddressesList.asScala.toArray) + name = weakIntern(binary.getName), + addresses = binary.getAddressesList.asScala.map(weakIntern).toArray) } } diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/PoolDataSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/PoolDataSerializer.scala index a9c356ef13c..d1129d90339 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/PoolDataSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/PoolDataSerializer.scala @@ -20,7 +20,6 @@ package org.apache.spark.status.protobuf import scala.collection.JavaConverters._ import org.apache.spark.status.PoolData -import org.apache.spark.util.Utils.weakIntern class PoolDataSerializer extends ProtobufSerDe[PoolData] { @@ -34,7 +33,7 @@ class PoolDataSerializer extends ProtobufSerDe[PoolData] { override def deserialize(bytes: Array[Byte]): PoolData = { val 
poolData = StoreTypes.PoolData.parseFrom(bytes) new PoolData( - name = weakIntern(poolData.getName), + name = poolData.getName, stageIds = poolData.getStageIdsList.asScala.map(_.toInt).toSet ) } diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala index eda4422405e..dc72c3ed467 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala @@ -393,10 +393,8 @@ class StageDataWrapperSerializer extends ProtobufSerDe[StageDataWrapper] { getOptional(binary.hasFirstTaskLaunchedTime, () => new Date(binary.getFirstTaskLaunchedTime)) val completionTime = getOptional(binary.hasCompletionTime, () => new Date(binary.getCompletionTime)) - val failureReason = - getOptional(binary.hasFailureReason, () => weakIntern(binary.getFailureReason)) - val description = - getOptional(binary.hasDescription, () => weakIntern(binary.getDescription)) + val failureReason = getOptional(binary.hasFailureReason, binary.getFailureReason) + val description = getOptional(binary.hasDescription, binary.getDescription) val accumulatorUpdates = AccumulableInfoSerializer.deserialize(binary.getAccumulatorUpdatesList) val tasks = if (MapUtils.isNotEmpty(binary.getTasksMap)) { Some(binary.getTasksMap.asScala.map( @@ -467,9 +465,9 @@ class StageDataWrapperSerializer extends ProtobufSerDe[StageDataWrapper] { shuffleWriteBytes = binary.getShuffleWriteBytes, shuffleWriteTime = binary.getShuffleWriteTime, shuffleWriteRecords = binary.getShuffleWriteRecords, - name = weakIntern(binary.getName), + name = binary.getName, description = description, - details = weakIntern(binary.getDetails), + details = binary.getDetails, schedulingPool = weakIntern(binary.getSchedulingPool), rddIds = binary.getRddIdsList.asScala.map(_.toInt), accumulatorUpdates = 
accumulatorUpdates, @@ -644,7 +642,7 @@ class StageDataWrapperSerializer extends ProtobufSerDe[StageDataWrapper] { taskLocality = weakIntern(binary.getTaskLocality), speculative = binary.getSpeculative, accumulatorUpdates = accumulatorUpdates, - errorMessage = getOptional(binary.hasErrorMessage, () => weakIntern(binary.getErrorMessage)), + errorMessage = getOptional(binary.hasErrorMessage, binary.getErrorMessage), taskMetrics = taskMetrics, executorLogs = binary.getExecutorLogsMap.asScala.toMap, schedulerDelay = binary.getSchedulerDelay, diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala index f450bbbfd0c..5dac03bb337 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala @@ -18,6 +18,7 @@ package org.apache.spark.status.protobuf import org.apache.spark.status.StreamBlockData +import org.apache.spark.util.Utils.weakIntern class StreamBlockDataSerializer extends ProtobufSerDe[StreamBlockData] { @@ -39,9 +40,9 @@ class StreamBlockDataSerializer extends ProtobufSerDe[StreamBlockData] { val binary = StoreTypes.StreamBlockData.parseFrom(bytes) new StreamBlockData( name = binary.getName, - executorId = binary.getExecutorId, - hostPort = binary.getHostPort, - storageLevel = binary.getStorageLevel, + executorId = weakIntern(binary.getExecutorId), + hostPort = weakIntern(binary.getHostPort), + storageLevel = weakIntern(binary.getStorageLevel), useMemory = binary.getUseMemory, useDisk = binary.getUseDisk, deserialized = binary.getDeserialized, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org