Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20013#discussion_r160336237
  
    --- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
    @@ -69,51 +71,271 @@ private[spark] class JobDataWrapper(
         val skippedStages: Set[Int]) {
     
       @JsonIgnore @KVIndex
    -  private[this] val id: Int = info.jobId
    +  private def id: Int = info.jobId
     
     }
     
     private[spark] class StageDataWrapper(
         val info: StageData,
    -    val jobIds: Set[Int]) {
    +    val jobIds: Set[Int],
    +    @JsonDeserialize(contentAs = classOf[JLong])
    +    val locality: Map[String, Long]) {
     
       @JsonIgnore @KVIndex
    -  def id: Array[Int] = Array(info.stageId, info.attemptId)
    +  private[this] val id: Array[Int] = Array(info.stageId, info.attemptId)
     
       @JsonIgnore @KVIndex("stageId")
    -  def stageId: Int = info.stageId
    +  private def stageId: Int = info.stageId
     
    +  @JsonIgnore @KVIndex("active")
    +  private def active: Boolean = info.status == StageStatus.ACTIVE
    +
    +}
    +
    +/**
    + * Tasks have a lot of indices that are used in a few different places. 
This object keeps logical
    + * names for these indices, mapped to short strings to save space when 
using a disk store.
    + */
    +private[spark] object TaskIndexNames {
    +  final val ACCUMULATORS = "acc"
    +  final val ATTEMPT = "att"
    +  final val DESER_CPU_TIME = "dct"
    +  final val DESER_TIME = "des"
    +  final val DISK_SPILL = "dbs"
    +  final val DURATION = "dur"
    +  final val ERROR = "err"
    +  final val EXECUTOR = "exe"
    +  final val EXEC_CPU_TIME = "ect"
    +  final val EXEC_RUN_TIME = "ert"
    +  final val GC_TIME = "gc"
    +  final val GETTING_RESULT_TIME = "grt"
    +  final val INPUT_RECORDS = "ir"
    +  final val INPUT_SIZE = "is"
    +  final val LAUNCH_TIME = "lt"
    +  final val LOCALITY = "loc"
    +  final val MEM_SPILL = "mbs"
    +  final val OUTPUT_RECORDS = "or"
    +  final val OUTPUT_SIZE = "os"
    +  final val PEAK_MEM = "pem"
    +  final val RESULT_SIZE = "rs"
    +  final val SCHEDULER_DELAY = "dly"
    +  final val SER_TIME = "rst"
    +  final val SHUFFLE_LOCAL_BLOCKS = "slbl"
    +  final val SHUFFLE_READ_RECORDS = "srr"
    +  final val SHUFFLE_READ_TIME = "srt"
    +  final val SHUFFLE_REMOTE_BLOCKS = "srbl"
    +  final val SHUFFLE_REMOTE_READS = "srby"
    +  final val SHUFFLE_REMOTE_READS_TO_DISK = "srbd"
    +  final val SHUFFLE_TOTAL_READS = "stby"
    +  final val SHUFFLE_TOTAL_BLOCKS = "stbl"
    +  final val SHUFFLE_WRITE_RECORDS = "swr"
    +  final val SHUFFLE_WRITE_SIZE = "sws"
    +  final val SHUFFLE_WRITE_TIME = "swt"
    +  final val STAGE = "stage"
    +  final val STATUS = "sta"
    +  final val TASK_INDEX = "idx"
     }
     
     /**
    - * The task information is always indexed with the stage ID, since that is 
how the UI and API
    - * consume it. That means every indexed value has the stage ID and attempt 
ID included, aside
    - * from the actual data being indexed.
    + * Unlike other data types, the task data wrapper does not keep a 
reference to the API's TaskData.
    + * That is to save memory, since for large applications there can be a 
large number of these
    + * elements (by default up to 100,000 per stage), and every bit of wasted 
memory adds up.
    + *
    + * It also contains many secondary indices, which are used to sort data 
efficiently in the UI at the
    + * expense of storage space (and slower write times).
      */
     private[spark] class TaskDataWrapper(
    -    val info: TaskData,
    +    // Storing this as an object actually saves memory; it's also used as 
the key in the in-memory
    +    // store, so in that case you'd save the extra copy of the value here.
    +    @KVIndexParam
    +    val taskId: JLong,
    +    @KVIndexParam(value = TaskIndexNames.TASK_INDEX, parent = 
TaskIndexNames.STAGE)
    +    val index: Int,
    +    @KVIndexParam(value = TaskIndexNames.ATTEMPT, parent = 
TaskIndexNames.STAGE)
    --- End diff --
    
    do we really need to create index for every field?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to