This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 49d8ca87fb0 [SPARK-41680][UI] Protobuf serializer for CachedQuantile
49d8ca87fb0 is described below

commit 49d8ca87fb03b0651cee80791b8c7de96286d433
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Thu Dec 22 14:18:54 2022 -0800

    [SPARK-41680][UI] Protobuf serializer for CachedQuantile
    
    ### What changes were proposed in this pull request?
    Add Protobuf serializer for CachedQuantile
    
    ### Why are the changes needed?
    Support fast and compact serialization/deserialization of CachedQuantile entries stored in RocksDB.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    New unit test `CachedQuantile` in KVStoreProtobufSerializerSuite.
    
    Closes #39175 from gengliangwang/CachedQuantile.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto | 35 ++++++++
 .../org.apache.spark.status.protobuf.ProtobufSerDe |  1 +
 .../status/protobuf/CachedQuantileSerializer.scala | 99 ++++++++++++++++++++++
 .../protobuf/KVStoreProtobufSerializerSuite.scala  | 70 +++++++++++++++
 4 files changed, 205 insertions(+)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index e04899f3d9f..5949ad63c84 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -257,6 +257,41 @@ message ResourceProfileWrapper {
   ResourceProfileInfo rpInfo = 1;
 }
 
+message CachedQuantile {  // protobuf form of org.apache.spark.status.CachedQuantile
+  int64 stage_id = 1;  // stageId is an Int in Scala: widened on encode, narrowed on decode
+  int32 stage_attempt_id = 2;
+  string quantile = 3;  // NOTE(review): plain proto3 string; the Java setter rejects null
+  int64 task_count = 4;
+  double duration = 5;  // fields 5-32 are metric values copied one-to-one by the serializer
+  double executor_deserialize_time = 6;
+  double executor_deserialize_cpu_time = 7;
+  double executor_run_time = 8;
+  double executor_cpu_time = 9;
+  double result_size = 10;
+  double jvm_gc_time = 11;
+  double result_serialization_time = 12;
+  double getting_result_time = 13;
+  double scheduler_delay = 14;
+  double peak_execution_memory = 15;
+  double memory_bytes_spilled = 16;
+  double disk_bytes_spilled = 17;
+  double bytes_read = 18;
+  double records_read = 19;
+  double bytes_written = 20;
+  double records_written = 21;
+  double shuffle_read_bytes = 22;
+  double shuffle_records_read = 23;
+  double shuffle_remote_blocks_fetched = 24;
+  double shuffle_local_blocks_fetched = 25;
+  double shuffle_fetch_wait_time = 26;
+  double shuffle_remote_bytes_read = 27;
+  double shuffle_remote_bytes_read_to_disk = 28;
+  double shuffle_total_blocks_fetched = 29;
+  double shuffle_write_bytes = 30;
+  double shuffle_write_records = 31;
+  double shuffle_write_time = 32;
+}
+
 message SpeculationStageSummary {
   int32 num_tasks = 1;
   int32 num_active_tasks = 2;
diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 92f15211613..97c186206bf 100644
--- a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -18,6 +18,7 @@
 org.apache.spark.status.protobuf.RDDStorageInfoWrapperSerializer
 org.apache.spark.status.protobuf.ApplicationInfoWrapperSerializer
 org.apache.spark.status.protobuf.ApplicationEnvironmentInfoWrapperSerializer
+org.apache.spark.status.protobuf.CachedQuantileSerializer
 org.apache.spark.status.protobuf.ExecutorStageSummaryWrapperSerializer
 org.apache.spark.status.protobuf.StreamBlockDataSerializer
 org.apache.spark.status.protobuf.TaskDataWrapperSerializer
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/CachedQuantileSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/CachedQuantileSerializer.scala
new file mode 100644
index 00000000000..9aaa5ee2a1a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/CachedQuantileSerializer.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import org.apache.spark.status.CachedQuantile
+
+class CachedQuantileSerializer extends ProtobufSerDe {  // protobuf SerDe for CachedQuantile
+  override val supportClass: Class[_] = classOf[CachedQuantile]
+  /** Writes every [[CachedQuantile]] field into a StoreTypes.CachedQuantile message. */
+  override def serialize(input: Any): Array[Byte] = {
+    val entry = input.asInstanceOf[CachedQuantile]
+    StoreTypes.CachedQuantile.newBuilder()
+      .setStageId(entry.stageId.toLong) // proto field is int64
+      .setStageAttemptId(entry.stageAttemptId)
+      .setQuantile(entry.quantile)
+      .setTaskCount(entry.taskCount)
+      .setDuration(entry.duration)
+      .setExecutorDeserializeTime(entry.executorDeserializeTime)
+      .setExecutorDeserializeCpuTime(entry.executorDeserializeCpuTime)
+      .setExecutorRunTime(entry.executorRunTime)
+      .setExecutorCpuTime(entry.executorCpuTime)
+      .setResultSize(entry.resultSize)
+      .setJvmGcTime(entry.jvmGcTime)
+      .setResultSerializationTime(entry.resultSerializationTime)
+      .setGettingResultTime(entry.gettingResultTime)
+      .setSchedulerDelay(entry.schedulerDelay)
+      .setPeakExecutionMemory(entry.peakExecutionMemory)
+      .setMemoryBytesSpilled(entry.memoryBytesSpilled)
+      .setDiskBytesSpilled(entry.diskBytesSpilled)
+      .setBytesRead(entry.bytesRead)
+      .setRecordsRead(entry.recordsRead)
+      .setBytesWritten(entry.bytesWritten)
+      .setRecordsWritten(entry.recordsWritten)
+      .setShuffleReadBytes(entry.shuffleReadBytes)
+      .setShuffleRecordsRead(entry.shuffleRecordsRead)
+      .setShuffleRemoteBlocksFetched(entry.shuffleRemoteBlocksFetched)
+      .setShuffleLocalBlocksFetched(entry.shuffleLocalBlocksFetched)
+      .setShuffleFetchWaitTime(entry.shuffleFetchWaitTime)
+      .setShuffleRemoteBytesRead(entry.shuffleRemoteBytesRead)
+      .setShuffleRemoteBytesReadToDisk(entry.shuffleRemoteBytesReadToDisk)
+      .setShuffleTotalBlocksFetched(entry.shuffleTotalBlocksFetched)
+      .setShuffleWriteBytes(entry.shuffleWriteBytes)
+      .setShuffleWriteRecords(entry.shuffleWriteRecords)
+      .setShuffleWriteTime(entry.shuffleWriteTime)
+      .build().toByteArray
+  }
+  /** Rebuilds a [[CachedQuantile]] from bytes produced by `serialize`. */
+  override def deserialize(bytes: Array[Byte]): CachedQuantile = {
+    val msg = StoreTypes.CachedQuantile.parseFrom(bytes)
+    new CachedQuantile(
+      stageId = msg.getStageId.toInt, // narrowed back from the int64 proto field
+      stageAttemptId = msg.getStageAttemptId,
+      quantile = msg.getQuantile,
+      taskCount = msg.getTaskCount,
+      duration = msg.getDuration,
+      executorDeserializeTime = msg.getExecutorDeserializeTime,
+      executorDeserializeCpuTime = msg.getExecutorDeserializeCpuTime,
+      executorRunTime = msg.getExecutorRunTime,
+      executorCpuTime = msg.getExecutorCpuTime,
+      resultSize = msg.getResultSize,
+      jvmGcTime = msg.getJvmGcTime,
+      resultSerializationTime = msg.getResultSerializationTime,
+      gettingResultTime = msg.getGettingResultTime,
+      schedulerDelay = msg.getSchedulerDelay,
+      peakExecutionMemory = msg.getPeakExecutionMemory,
+      memoryBytesSpilled = msg.getMemoryBytesSpilled,
+      diskBytesSpilled = msg.getDiskBytesSpilled,
+      bytesRead = msg.getBytesRead,
+      recordsRead = msg.getRecordsRead,
+      bytesWritten = msg.getBytesWritten,
+      recordsWritten = msg.getRecordsWritten,
+      shuffleReadBytes = msg.getShuffleReadBytes,
+      shuffleRecordsRead = msg.getShuffleRecordsRead,
+      shuffleRemoteBlocksFetched = msg.getShuffleRemoteBlocksFetched,
+      shuffleLocalBlocksFetched = msg.getShuffleLocalBlocksFetched,
+      shuffleFetchWaitTime = msg.getShuffleFetchWaitTime,
+      shuffleRemoteBytesRead = msg.getShuffleRemoteBytesRead,
+      shuffleRemoteBytesReadToDisk = msg.getShuffleRemoteBytesReadToDisk,
+      shuffleTotalBlocksFetched = msg.getShuffleTotalBlocksFetched,
+      shuffleWriteBytes = msg.getShuffleWriteBytes,
+      shuffleWriteRecords = msg.getShuffleWriteRecords,
+      shuffleWriteTime = msg.getShuffleWriteTime)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index 713c84f54d2..14f615587b1 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -525,6 +525,76 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
     }
   }
 
+  test("CachedQuantile") {  // every field must survive a protobuf round trip unchanged
+    val input = new CachedQuantile(
+      stageId = 1,
+      stageAttemptId = 2,
+      quantile = "a",
+      taskCount = 3L,
+      duration = 4.1,
+      executorDeserializeTime = 5.1,
+      executorDeserializeCpuTime = 6.1,
+      executorRunTime = 7.1,
+      executorCpuTime = 8.1,
+      resultSize = 9.1,
+      jvmGcTime = 10.1,
+      resultSerializationTime = 11.1,
+      gettingResultTime = 12.1,
+      schedulerDelay = 13.1,
+      peakExecutionMemory = 14.1,
+      memoryBytesSpilled = 15.1,
+      diskBytesSpilled = 16.1,
+      bytesRead = 17.1,
+      recordsRead = 18.1,
+      bytesWritten = 19.1,
+      recordsWritten = 20.1,
+      shuffleReadBytes = 21.1,
+      shuffleRecordsRead = 22.1,
+      shuffleRemoteBlocksFetched = 23.1,
+      shuffleLocalBlocksFetched = 24.1,
+      shuffleFetchWaitTime = 25.1,
+      shuffleRemoteBytesRead = 26.1,
+      shuffleRemoteBytesReadToDisk = 27.1,
+      shuffleTotalBlocksFetched = 28.1,
+      shuffleWriteBytes = 29.1,
+      shuffleWriteRecords = 30.1,
+      shuffleWriteTime = 31.1)
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[CachedQuantile])
+    assert(result.stageId == input.stageId)
+    assert(result.stageAttemptId == input.stageAttemptId)
+    assert(result.quantile == input.quantile)
+    assert(result.taskCount == input.taskCount)
+    assert(result.duration == input.duration)
+    assert(result.executorDeserializeTime == input.executorDeserializeTime)
+    assert(result.executorDeserializeCpuTime == input.executorDeserializeCpuTime)
+    assert(result.executorRunTime == input.executorRunTime)
+    assert(result.executorCpuTime == input.executorCpuTime)
+    assert(result.resultSize == input.resultSize)
+    assert(result.jvmGcTime == input.jvmGcTime)
+    assert(result.resultSerializationTime == input.resultSerializationTime)
+    assert(result.gettingResultTime == input.gettingResultTime)
+    assert(result.schedulerDelay == input.schedulerDelay)
+    assert(result.peakExecutionMemory == input.peakExecutionMemory)
+    assert(result.memoryBytesSpilled == input.memoryBytesSpilled)
+    assert(result.diskBytesSpilled == input.diskBytesSpilled)
+    assert(result.bytesRead == input.bytesRead)
+    assert(result.recordsRead == input.recordsRead)
+    assert(result.bytesWritten == input.bytesWritten)
+    assert(result.recordsWritten == input.recordsWritten)
+    assert(result.shuffleReadBytes == input.shuffleReadBytes)
+    assert(result.shuffleRecordsRead == input.shuffleRecordsRead)
+    assert(result.shuffleRemoteBlocksFetched == input.shuffleRemoteBlocksFetched)
+    assert(result.shuffleLocalBlocksFetched == input.shuffleLocalBlocksFetched)
+    assert(result.shuffleFetchWaitTime == input.shuffleFetchWaitTime)
+    assert(result.shuffleRemoteBytesRead == input.shuffleRemoteBytesRead)
+    assert(result.shuffleRemoteBytesReadToDisk == input.shuffleRemoteBytesReadToDisk)
+    assert(result.shuffleTotalBlocksFetched == input.shuffleTotalBlocksFetched)
+    assert(result.shuffleWriteBytes == input.shuffleWriteBytes)
+    assert(result.shuffleWriteRecords == input.shuffleWriteRecords)
+    assert(result.shuffleWriteTime == input.shuffleWriteTime)
+  }
+
   test("Speculation Stage Summary") {
     val input = new SpeculationStageSummaryWrapper(
       stageId = 1,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to