This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 49d8ca87fb0 [SPARK-41680][UI] Protobuf serializer for CachedQuantile 49d8ca87fb0 is described below commit 49d8ca87fb03b0651cee80791b8c7de96286d433 Author: Gengliang Wang <gengli...@apache.org> AuthorDate: Thu Dec 22 14:18:54 2022 -0800 [SPARK-41680][UI] Protobuf serializer for CachedQuantile ### What changes were proposed in this pull request? Add Protobuf serializer for CachedQuantile ### Why are the changes needed? Support fast and compact serialization/deserialization for CachedQuantile over RocksDB. ### Does this PR introduce any user-facing change? No ### How was this patch tested? New UT Closes #39175 from gengliangwang/CachedQuantile. Authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../apache/spark/status/protobuf/store_types.proto | 35 ++++++++ .../org.apache.spark.status.protobuf.ProtobufSerDe | 1 + .../status/protobuf/CachedQuantileSerializer.scala | 99 ++++++++++++++++++++++ .../protobuf/KVStoreProtobufSerializerSuite.scala | 70 +++++++++++++++ 4 files changed, 205 insertions(+) diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index e04899f3d9f..5949ad63c84 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -257,6 +257,41 @@ message ResourceProfileWrapper { ResourceProfileInfo rpInfo = 1; } +message CachedQuantile { + int64 stage_id = 1; + int32 stage_attempt_id = 2; + string quantile = 3; + int64 task_count = 4; + double duration = 5; + double executor_deserialize_time = 6; + double executor_deserialize_cpu_time = 7; + double executor_run_time = 8; + double executor_cpu_time = 9; + double result_size = 10; + double jvm_gc_time = 11; + double result_serialization_time = 12; + double getting_result_time = 13; + double scheduler_delay = 14; + double peak_execution_memory = 15; + double memory_bytes_spilled = 16; + double disk_bytes_spilled = 17; + double bytes_read = 18; + double records_read = 19; + double bytes_written = 20; + double records_written = 21; + double shuffle_read_bytes = 22; + double shuffle_records_read = 23; + double shuffle_remote_blocks_fetched = 24; + double shuffle_local_blocks_fetched = 25; + double shuffle_fetch_wait_time = 26; + double shuffle_remote_bytes_read = 27; + double shuffle_remote_bytes_read_to_disk = 28; + double shuffle_total_blocks_fetched = 29; + double shuffle_write_bytes = 30; + double shuffle_write_records = 31; + double shuffle_write_time = 32; +} + message SpeculationStageSummary { int32 num_tasks = 1; int32 num_active_tasks = 2; diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe index 92f15211613..97c186206bf 100644 --- a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe +++ b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe @@ -18,6 +18,7 @@ org.apache.spark.status.protobuf.RDDStorageInfoWrapperSerializer org.apache.spark.status.protobuf.ApplicationInfoWrapperSerializer org.apache.spark.status.protobuf.ApplicationEnvironmentInfoWrapperSerializer +org.apache.spark.status.protobuf.CachedQuantileSerializer org.apache.spark.status.protobuf.ExecutorStageSummaryWrapperSerializer org.apache.spark.status.protobuf.StreamBlockDataSerializer org.apache.spark.status.protobuf.TaskDataWrapperSerializer diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/CachedQuantileSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/CachedQuantileSerializer.scala new file mode 100644 index 00000000000..9aaa5ee2a1a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/protobuf/CachedQuantileSerializer.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.protobuf + +import org.apache.spark.status.CachedQuantile + +class CachedQuantileSerializer extends ProtobufSerDe { + override val supportClass: Class[_] = classOf[CachedQuantile] + + override def serialize(input: Any): Array[Byte] = { + val data = input.asInstanceOf[CachedQuantile] + val builder = StoreTypes.CachedQuantile.newBuilder() + .setStageId(data.stageId.toLong) + .setStageAttemptId(data.stageAttemptId) + .setQuantile(data.quantile) + .setTaskCount(data.taskCount) + .setDuration(data.duration) + .setExecutorDeserializeTime(data.executorDeserializeTime) + .setExecutorDeserializeCpuTime(data.executorDeserializeCpuTime) + .setExecutorRunTime(data.executorRunTime) + .setExecutorCpuTime(data.executorCpuTime) + .setResultSize(data.resultSize) + .setJvmGcTime(data.jvmGcTime) + .setResultSerializationTime(data.resultSerializationTime) + .setGettingResultTime(data.gettingResultTime) + .setSchedulerDelay(data.schedulerDelay) + .setPeakExecutionMemory(data.peakExecutionMemory) + .setMemoryBytesSpilled(data.memoryBytesSpilled) + .setDiskBytesSpilled(data.diskBytesSpilled) + .setBytesRead(data.bytesRead) + .setRecordsRead(data.recordsRead) + .setBytesWritten(data.bytesWritten) + .setRecordsWritten(data.recordsWritten) + .setShuffleReadBytes(data.shuffleReadBytes) + .setShuffleRecordsRead(data.shuffleRecordsRead) + .setShuffleRemoteBlocksFetched(data.shuffleRemoteBlocksFetched) + .setShuffleLocalBlocksFetched(data.shuffleLocalBlocksFetched) + .setShuffleFetchWaitTime(data.shuffleFetchWaitTime) + .setShuffleRemoteBytesRead(data.shuffleRemoteBytesRead) + .setShuffleRemoteBytesReadToDisk(data.shuffleRemoteBytesReadToDisk) + .setShuffleTotalBlocksFetched(data.shuffleTotalBlocksFetched) + .setShuffleWriteBytes(data.shuffleWriteBytes) + .setShuffleWriteRecords(data.shuffleWriteRecords) + .setShuffleWriteTime(data.shuffleWriteTime) + builder.build().toByteArray + } + + override def deserialize(bytes: Array[Byte]): CachedQuantile = { + val binary = StoreTypes.CachedQuantile.parseFrom(bytes) + new CachedQuantile( + stageId = binary.getStageId.toInt, + stageAttemptId = binary.getStageAttemptId, + quantile = binary.getQuantile, + taskCount = binary.getTaskCount, + duration = binary.getDuration, + executorDeserializeTime = binary.getExecutorDeserializeTime, + executorDeserializeCpuTime = binary.getExecutorDeserializeCpuTime, + executorRunTime = binary.getExecutorRunTime, + executorCpuTime = binary.getExecutorCpuTime, + resultSize = binary.getResultSize, + jvmGcTime = binary.getJvmGcTime, + resultSerializationTime = binary.getResultSerializationTime, + gettingResultTime = binary.getGettingResultTime, + schedulerDelay = binary.getSchedulerDelay, + peakExecutionMemory = binary.getPeakExecutionMemory, + memoryBytesSpilled = binary.getMemoryBytesSpilled, + diskBytesSpilled = binary.getDiskBytesSpilled, + bytesRead = binary.getBytesRead, + recordsRead = binary.getRecordsRead, + bytesWritten = binary.getBytesWritten, + recordsWritten = binary.getRecordsWritten, + shuffleReadBytes = binary.getShuffleReadBytes, + shuffleRecordsRead = binary.getShuffleRecordsRead, + shuffleRemoteBlocksFetched = binary.getShuffleRemoteBlocksFetched, + shuffleLocalBlocksFetched = binary.getShuffleLocalBlocksFetched, + shuffleFetchWaitTime = binary.getShuffleFetchWaitTime, + shuffleRemoteBytesRead = binary.getShuffleRemoteBytesRead, + shuffleRemoteBytesReadToDisk = binary.getShuffleRemoteBytesReadToDisk, + shuffleTotalBlocksFetched = binary.getShuffleTotalBlocksFetched, + shuffleWriteBytes = binary.getShuffleWriteBytes, + shuffleWriteRecords = binary.getShuffleWriteRecords, + shuffleWriteTime = binary.getShuffleWriteTime) + } +} diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala index 713c84f54d2..14f615587b1 100644 --- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala @@ -525,6 +525,76 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { } } + test("CachedQuantile") { + val input = new CachedQuantile( + stageId = 1, + stageAttemptId = 2, + quantile = "a", + taskCount = 3L, + duration = 4L, + executorDeserializeTime = 5.1, + executorDeserializeCpuTime = 6.1, + executorRunTime = 7.1, + executorCpuTime = 8.1, + resultSize = 9.1, + jvmGcTime = 10.1, + resultSerializationTime = 11.1, + gettingResultTime = 12.1, + schedulerDelay = 13.1, + peakExecutionMemory = 14.1, + memoryBytesSpilled = 15.1, + diskBytesSpilled = 16.1, + bytesRead = 17.1, + recordsRead = 18.1, + bytesWritten = 19.1, + recordsWritten = 20.1, + shuffleReadBytes = 21.1, + shuffleRecordsRead = 22.1, + shuffleRemoteBlocksFetched = 23.1, + shuffleLocalBlocksFetched = 24.1, + shuffleFetchWaitTime = 25.1, + shuffleRemoteBytesRead = 26.1, + shuffleRemoteBytesReadToDisk = 27.1, + shuffleTotalBlocksFetched = 28.1, + shuffleWriteBytes = 29.1, + shuffleWriteRecords = 30.1, + shuffleWriteTime = 31.1) + val bytes = serializer.serialize(input) + val result = serializer.deserialize(bytes, classOf[CachedQuantile]) + assert(result.stageId == input.stageId) + assert(result.stageAttemptId == input.stageAttemptId) + assert(result.quantile == input.quantile) + assert(result.taskCount == input.taskCount) + assert(result.duration == input.duration) + assert(result.executorDeserializeTime == input.executorDeserializeTime) + assert(result.executorDeserializeCpuTime == input.executorDeserializeCpuTime) + assert(result.executorRunTime == input.executorRunTime) + assert(result.executorCpuTime == input.executorCpuTime) + assert(result.resultSize == input.resultSize) + assert(result.jvmGcTime == input.jvmGcTime) + assert(result.resultSerializationTime == input.resultSerializationTime) + assert(result.gettingResultTime == input.gettingResultTime) + assert(result.schedulerDelay == input.schedulerDelay) + assert(result.peakExecutionMemory == input.peakExecutionMemory) + assert(result.memoryBytesSpilled == input.memoryBytesSpilled) + assert(result.diskBytesSpilled == input.diskBytesSpilled) + assert(result.bytesRead == input.bytesRead) + assert(result.recordsRead == input.recordsRead) + assert(result.bytesWritten == input.bytesWritten) + assert(result.recordsWritten == input.recordsWritten) + assert(result.shuffleReadBytes == input.shuffleReadBytes) + assert(result.shuffleRecordsRead == input.shuffleRecordsRead) + assert(result.shuffleRemoteBlocksFetched == input.shuffleRemoteBlocksFetched) + assert(result.shuffleLocalBlocksFetched == input.shuffleLocalBlocksFetched) + assert(result.shuffleFetchWaitTime == input.shuffleFetchWaitTime) + assert(result.shuffleRemoteBytesRead == input.shuffleRemoteBytesRead) + assert(result.shuffleRemoteBytesReadToDisk == input.shuffleRemoteBytesReadToDisk) + assert(result.shuffleTotalBlocksFetched == input.shuffleTotalBlocksFetched) + assert(result.shuffleWriteBytes == input.shuffleWriteBytes) + assert(result.shuffleWriteRecords == input.shuffleWriteRecords) + assert(result.shuffleWriteTime == input.shuffleWriteTime) + } + test("Speculation Stage Summary") { val input = new SpeculationStageSummaryWrapper( stageId = 1, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org