Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10835#discussion_r50742726
  
    --- Diff: core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala ---
    @@ -17,12 +17,345 @@
     
     package org.apache.spark.executor
     
    -import org.apache.spark.SparkFunSuite
    +import org.apache.spark._
    +import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel, TestBlockId}
    +
     
     class TaskMetricsSuite extends SparkFunSuite {
    -  test("[SPARK-5701] updateShuffleReadMetrics: ShuffleReadMetrics not 
added when no shuffle deps") {
    -    val taskMetrics = new TaskMetrics()
    -    taskMetrics.mergeShuffleReadMetrics()
    -    assert(taskMetrics.shuffleReadMetrics.isEmpty)
    +  import AccumulatorParam._
    +  import InternalAccumulator._
    +  import StorageLevel._
    +  import TaskMetricsSuite._
    +
    +  test("create") {
    +    val internalAccums = InternalAccumulator.create()
    +    val tm1 = new TaskMetrics
    +    val tm2 = new TaskMetrics(internalAccums)
    +    assert(tm1.accumulatorUpdates().size === internalAccums.size)
    +    assert(tm2.accumulatorUpdates().size === internalAccums.size)
    +    val unnamedAccum = new Accumulator(0, IntAccumulatorParam, None, internal = true)
    +    val dupNamedAccum = new Accumulator(0, IntAccumulatorParam, Some(RESULT_SIZE), internal = true)
    +    val externalAccum = new Accumulator(0, IntAccumulatorParam, Some("x"))
    +    val internalAccums2 = internalAccums ++ Seq(unnamedAccum)
    +    val internalAccums3 = internalAccums ++ Seq(dupNamedAccum)
    +    val internalAccums4 = internalAccums ++ Seq(externalAccum)
    +    // TaskMetrics constructor expects minimal set of initial accumulators
    +    intercept[AssertionError] { new TaskMetrics(Seq.empty[Accumulator[_]]) }
    +    // initial accums must be named
    +    intercept[AssertionError] { new TaskMetrics(internalAccums2) }
    +    // initial accums must not have duplicate names
    +    intercept[AssertionError] { new TaskMetrics(internalAccums3) }
    +    // initial accums must be internal
    +    intercept[AssertionError] { new TaskMetrics(internalAccums4) }
    +  }
    +
    +  test("mutating values") {
    +    val accums = InternalAccumulator.create()
    +    val tm = new TaskMetrics(accums)
    +    // initial values
    +    assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 0L)
    +    assertValueEquals(tm, _.executorRunTime, accums, EXECUTOR_RUN_TIME, 0L)
    +    assertValueEquals(tm, _.resultSize, accums, RESULT_SIZE, 0L)
    +    assertValueEquals(tm, _.jvmGCTime, accums, JVM_GC_TIME, 0L)
    +    assertValueEquals(tm, _.resultSerializationTime, accums, RESULT_SERIALIZATION_TIME, 0L)
    +    assertValueEquals(tm, _.memoryBytesSpilled, accums, MEMORY_BYTES_SPILLED, 0L)
    +    assertValueEquals(tm, _.diskBytesSpilled, accums, DISK_BYTES_SPILLED, 0L)
    +    assertValueEquals(tm, _.peakExecutionMemory, accums, PEAK_EXECUTION_MEMORY, 0L)
    +    assertValueEquals(tm, _.updatedBlockStatuses, accums, UPDATED_BLOCK_STATUSES,
    +      Seq.empty[(BlockId, BlockStatus)])
    +    // set or increment values
    +    tm.setExecutorDeserializeTime(100L)
    +    tm.setExecutorDeserializeTime(1L) // overwrite
    +    tm.setExecutorRunTime(200L)
    +    tm.setExecutorRunTime(2L)
    +    tm.setResultSize(300L)
    +    tm.setResultSize(3L)
    +    tm.setJvmGCTime(400L)
    +    tm.setJvmGCTime(4L)
    +    tm.setResultSerializationTime(500L)
    +    tm.setResultSerializationTime(5L)
    +    tm.incMemoryBytesSpilled(600L)
    +    tm.incMemoryBytesSpilled(6L) // add
    +    tm.incDiskBytesSpilled(700L)
    +    tm.incDiskBytesSpilled(7L)
    +    tm.incPeakExecutionMemory(800L)
    +    tm.incPeakExecutionMemory(8L)
    +    val block1 = (TestBlockId("a"), BlockStatus(MEMORY_ONLY, 1L, 2L))
    +    val block2 = (TestBlockId("b"), BlockStatus(MEMORY_ONLY, 3L, 4L))
    +    tm.incUpdatedBlockStatuses(Seq(block1))
    +    tm.incUpdatedBlockStatuses(Seq(block2))
    +    // assert new values exist
    +    assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 1L)
    +    assertValueEquals(tm, _.executorRunTime, accums, EXECUTOR_RUN_TIME, 2L)
    +    assertValueEquals(tm, _.resultSize, accums, RESULT_SIZE, 3L)
    +    assertValueEquals(tm, _.jvmGCTime, accums, JVM_GC_TIME, 4L)
    +    assertValueEquals(tm, _.resultSerializationTime, accums, RESULT_SERIALIZATION_TIME, 5L)
    +    assertValueEquals(tm, _.memoryBytesSpilled, accums, MEMORY_BYTES_SPILLED, 606L)
    +    assertValueEquals(tm, _.diskBytesSpilled, accums, DISK_BYTES_SPILLED, 707L)
    +    assertValueEquals(tm, _.peakExecutionMemory, accums, PEAK_EXECUTION_MEMORY, 808L)
    +    assertValueEquals(tm, _.updatedBlockStatuses, accums, UPDATED_BLOCK_STATUSES,
    +      Seq(block1, block2))
    +  }
    +
    +  test("mutating shuffle read metrics values") {
    +    import shuffleRead._
    +    val accums = InternalAccumulator.create()
    +    val tm = new TaskMetrics(accums)
    +    def assertValEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = {
    +      assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics.get), accums, name, value)
    +    }
    +    // create shuffle read metrics
    +    assert(tm.shuffleReadMetrics.isEmpty)
    +    tm.registerTempShuffleReadMetrics()
    +    tm.mergeShuffleReadMetrics()
    +    assert(tm.shuffleReadMetrics.isDefined)
    +    val sr = tm.shuffleReadMetrics.get
    +    // initial values
    +    assertValEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 0)
    +    assertValEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 0)
    +    assertValEquals(_.remoteBytesRead, REMOTE_BYTES_READ, 0L)
    +    assertValEquals(_.localBytesRead, LOCAL_BYTES_READ, 0L)
    +    assertValEquals(_.fetchWaitTime, FETCH_WAIT_TIME, 0L)
    +    assertValEquals(_.recordsRead, RECORDS_READ, 0L)
    +    // set and increment values
    +    sr.setRemoteBlocksFetched(100)
    +    sr.setRemoteBlocksFetched(10)
    +    sr.incRemoteBlocksFetched(1) // 10 + 1
    +    sr.incRemoteBlocksFetched(1) // 10 + 1 + 1
    +    sr.setLocalBlocksFetched(200)
    +    sr.setLocalBlocksFetched(20)
    +    sr.incLocalBlocksFetched(2)
    +    sr.incLocalBlocksFetched(2)
    +    sr.setRemoteBytesRead(300L)
    +    sr.setRemoteBytesRead(30L)
    +    sr.incRemoteBytesRead(3L)
    +    sr.incRemoteBytesRead(3L)
    +    sr.setLocalBytesRead(400L)
    +    sr.setLocalBytesRead(40L)
    +    sr.incLocalBytesRead(4L)
    +    sr.incLocalBytesRead(4L)
    +    sr.setFetchWaitTime(500L)
    +    sr.setFetchWaitTime(50L)
    +    sr.incFetchWaitTime(5L)
    +    sr.incFetchWaitTime(5L)
    +    sr.setRecordsRead(600L)
    +    sr.setRecordsRead(60L)
    +    sr.incRecordsRead(6L)
    +    sr.incRecordsRead(6L)
    +    // assert new values exist
    +    assertValEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 12)
    +    assertValEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 24)
    +    assertValEquals(_.remoteBytesRead, REMOTE_BYTES_READ, 36L)
    +    assertValEquals(_.localBytesRead, LOCAL_BYTES_READ, 48L)
    +    assertValEquals(_.fetchWaitTime, FETCH_WAIT_TIME, 60L)
    +    assertValEquals(_.recordsRead, RECORDS_READ, 72L)
    +  }
    +
    +  test("mutating shuffle write metrics values") {
    +    import shuffleWrite._
    +    val accums = InternalAccumulator.create()
    +    val tm = new TaskMetrics(accums)
    +    def assertValEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = {
    +      assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics.get), accums, name, value)
    +    }
    +    // create shuffle write metrics
    +    assert(tm.shuffleWriteMetrics.isEmpty)
    +    tm.registerShuffleWriteMetrics()
    +    assert(tm.shuffleWriteMetrics.isDefined)
    +    val sw = tm.shuffleWriteMetrics.get
    +    // initial values
    +    assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
    +    assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
    +    assertValEquals(_.writeTime, WRITE_TIME, 0L)
    +    // increment and decrement values
    +    sw.incBytesWritten(100L)
    +    sw.incBytesWritten(10L) // 100 + 10
    +    sw.decBytesWritten(1L) // 100 + 10 - 1
    +    sw.decBytesWritten(1L) // 100 + 10 - 1 - 1
    +    sw.incRecordsWritten(200L)
    +    sw.incRecordsWritten(20L)
    +    sw.decRecordsWritten(2L)
    +    sw.decRecordsWritten(2L)
    +    sw.incWriteTime(300L)
    +    sw.incWriteTime(30L)
    +    // assert new values exist
    +    assertValEquals(_.bytesWritten, BYTES_WRITTEN, 108L)
    +    assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 216L)
    +    assertValEquals(_.writeTime, WRITE_TIME, 330L)
    +  }
    +
    +  test("mutating input metrics values") {
    +    import input._
    +    val accums = InternalAccumulator.create()
    +    val tm = new TaskMetrics(accums)
    +    def assertValEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = {
    +      assertValueEquals(tm, tm => tmValue(tm.inputMetrics.get), accums, name, value,
    +        (x: Any, y: Any) => assert(x.toString === y.toString))
    +    }
    +    // create input metrics
    +    assert(tm.inputMetrics.isEmpty)
    +    tm.registerInputMetrics(DataReadMethod.Memory)
    +    assert(tm.inputMetrics.isDefined)
    +    val in = tm.inputMetrics.get
    +    // initial values
    +    assertValEquals(_.bytesRead, BYTES_READ, 0L)
    +    assertValEquals(_.recordsRead, RECORDS_READ, 0L)
    +    assertValEquals(_.readMethod, READ_METHOD, DataReadMethod.Memory)
    +    // set and increment values
    +    in.setBytesRead(1L)
    +    in.setBytesRead(2L)
    +    in.incRecordsRead(1L)
    +    in.incRecordsRead(2L)
    +    in.setReadMethod(DataReadMethod.Disk)
    +    // assert new values exist
    +    assertValEquals(_.bytesRead, BYTES_READ, 2L)
    +    assertValEquals(_.recordsRead, RECORDS_READ, 3L)
    +    assertValEquals(_.readMethod, READ_METHOD, DataReadMethod.Disk)
    +  }
    +
    +  test("mutating output metrics values") {
    +    import output._
    +    val accums = InternalAccumulator.create()
    +    val tm = new TaskMetrics(accums)
    +    def assertValEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = {
    +      assertValueEquals(tm, tm => tmValue(tm.outputMetrics.get), accums, name, value,
    +        (x: Any, y: Any) => assert(x.toString === y.toString))
    +    }
    +    // create output metrics
    +    assert(tm.outputMetrics.isEmpty)
    +    tm.registerOutputMetrics(DataWriteMethod.Hadoop)
    +    assert(tm.outputMetrics.isDefined)
    +    val out = tm.outputMetrics.get
    +    // initial values
    +    assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
    +    assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
    +    assertValEquals(_.writeMethod, WRITE_METHOD, DataWriteMethod.Hadoop)
    +    // set values
    +    out.setBytesWritten(1L)
    +    out.setBytesWritten(2L)
    +    out.setRecordsWritten(3L)
    +    out.setRecordsWritten(4L)
    +    out.setWriteMethod(DataWriteMethod.Hadoop)
    +    // assert new values exist
    +    assertValEquals(_.bytesWritten, BYTES_WRITTEN, 2L)
    +    assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 4L)
    +    // Note: this doesn't actually test anything, but there's only one DataWriteMethod
    +    // so we can't set it to anything else
    +    assertValEquals(_.writeMethod, WRITE_METHOD, DataWriteMethod.Hadoop)
    +  }
    +
    +  test("merging multiple shuffle read metrics") {
    +    val tm = new TaskMetrics
    +    assert(tm.shuffleReadMetrics.isEmpty)
    +    val sr1 = tm.registerTempShuffleReadMetrics()
    +    val sr2 = tm.registerTempShuffleReadMetrics()
    +    val sr3 = tm.registerTempShuffleReadMetrics()
    +    assert(tm.shuffleReadMetrics.isEmpty)
    +    sr1.setRecordsRead(10L)
    +    sr2.setRecordsRead(10L)
    +    sr1.setFetchWaitTime(1L)
    +    sr2.setFetchWaitTime(2L)
    +    sr3.setFetchWaitTime(3L)
    +    tm.mergeShuffleReadMetrics()
    +    assert(tm.shuffleReadMetrics.isDefined)
    +    val sr = tm.shuffleReadMetrics.get
    +    assert(sr.remoteBlocksFetched === 0L)
    +    assert(sr.recordsRead === 20L)
    +    assert(sr.fetchWaitTime === 6L)
    +
    +    // SPARK-5701: calling merge without any shuffle deps does nothing
    +    val tm2 = new TaskMetrics
    +    tm2.mergeShuffleReadMetrics()
    +    assert(tm2.shuffleReadMetrics.isEmpty)
       }
    +
    +  test("register multiple shuffle write metrics") {
    +    val tm = new TaskMetrics
    +    val sw1 = tm.registerShuffleWriteMetrics()
    +    val sw2 = tm.registerShuffleWriteMetrics()
    +    assert(sw1 === sw2)
    +    assert(tm.shuffleWriteMetrics.isDefined)
    +    assert(tm.shuffleWriteMetrics.get === sw1)
    +  }
    +
    +  test("register multiple input metrics") {
    +    val tm = new TaskMetrics
    +    val im1 = tm.registerInputMetrics(DataReadMethod.Memory)
    +    val im2 = tm.registerInputMetrics(DataReadMethod.Memory)
    +    // input metrics with a different read method than the one already registered are ignored
    +    val im3 = tm.registerInputMetrics(DataReadMethod.Hadoop)
    +    assert(im1 === im2)
    +    assert(im1 !== im3)
    +    assert(tm.inputMetrics.isDefined)
    +    assert(tm.inputMetrics.get === im1)
    +    im2.setBytesRead(50L)
    +    im3.setBytesRead(100L)
    +    assert(tm.inputMetrics.get.bytesRead === 50L)
    +  }
    +
    +  test("register multiple output metrics") {
    +    val tm = new TaskMetrics
    +    val om1 = tm.registerOutputMetrics(DataWriteMethod.Hadoop)
    +    val om2 = tm.registerOutputMetrics(DataWriteMethod.Hadoop)
    +    assert(om1 === om2)
    +    assert(tm.outputMetrics.isDefined)
    +    assert(tm.outputMetrics.get === om1)
    +  }
    +
    +  test("additional accumulables") {
    +    val internalAccums = InternalAccumulator.create()
    +    val tm = new TaskMetrics(internalAccums)
    +    assert(tm.accumulatorUpdates().size === internalAccums.size)
    +    val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
    +    val acc2 = new Accumulator(0, IntAccumulatorParam, Some("b"))
    +    val acc3 = new Accumulator(0, IntAccumulatorParam, Some("c"))
    +    val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"), internal = true)
    +    tm.registerAccumulator(acc1)
    +    tm.registerAccumulator(acc2)
    +    tm.registerAccumulator(acc3)
    +    tm.registerAccumulator(acc4)
    +    acc1 += 1
    +    acc2 += 2
    +    val newUpdates = tm.accumulatorUpdates()
    +    assert(newUpdates.contains(acc1.id))
    +    assert(newUpdates.contains(acc2.id))
    +    assert(newUpdates.contains(acc3.id))
    +    assert(newUpdates.contains(acc4.id))
    +    assert(newUpdates(acc1.id) === 1)
    +    assert(newUpdates(acc2.id) === 2)
    +    assert(newUpdates(acc3.id) === 0)
    +    assert(newUpdates(acc4.id) === 0)
    +    assert(newUpdates.size === internalAccums.size + 4)
    +  }
    +
    +}
    +
    +
    +// This extends SparkFunSuite only because we want its `assert` method.
    --- End diff --
    
    We want to reuse this code somewhere else.
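    
    For context, a rough sketch of the shared helper being discussed, reconstructed from how `assertValueEquals` is called in the diff above. The exact signature, parameter names, and the default comparison function are assumptions for illustration, not the PR's actual code; per the comment in the diff, the object extends SparkFunSuite only to pick up its `assert` method.
    
        // Sketch only: reconstructed from the call sites above, not copied from the PR.
        private[spark] object TaskMetricsSuite extends SparkFunSuite {
          /**
           * Assert that three things agree: the value returned by the TaskMetrics
           * getter, the value of the named internal accumulator, and the expected value.
           */
          def assertValueEquals(
              tm: TaskMetrics,
              tmValue: TaskMetrics => Any,
              accums: Seq[Accumulator[_]],
              metricName: String,
              expected: Any,
              assertEquals: (Any, Any) => Unit = (x: Any, y: Any) => assert(x === y)): Unit = {
            // (1) the getter on TaskMetrics itself reflects the expected value
            assertEquals(tmValue(tm), expected)
            // (2) the backing internal accumulator reflects the same value
            val accum = accums.find(_.name == Some(metricName))
            assert(accum.isDefined, s"accumulator '$metricName' not found")
            assertEquals(accum.get.value, expected)
          }
        }
    
    A helper shaped like this would let other suites call `TaskMetricsSuite.assertValueEquals(...)` directly, which is presumably the reuse being referred to here.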

