Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22473#discussion_r219580690
  
    --- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala 
---
    @@ -252,18 +253,121 @@ class ExecutorSuite extends SparkFunSuite with 
LocalSparkContext with MockitoSug
         }
       }
     
    +  test("Heartbeat should drop zero metrics") {
    +    heartbeatZeroMetricTest(true)
    +  }
    +
    +  test("Heartbeat should not drop zero metrics when the conf is set to 
false") {
    +    heartbeatZeroMetricTest(false)
    +  }
    +
    +  private def withHeartbeatExecutor(confs: (String, String)*)
    +      (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
    +    val conf = new SparkConf
    +    confs.foreach { case (k, v) => conf.set(k, v) }
    +    val serializer = new JavaSerializer(conf)
    +    val env = createMockEnv(conf, serializer)
    +    val executor =
    +      new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, 
isLocal = true)
    +    val executorClass = classOf[Executor]
    +
    +    // Set ExecutorMetricType.values to be a minimal set to avoid get null 
exceptions
    +    val metricClass =
    +      
Utils.classForName(classOf[org.apache.spark.metrics.ExecutorMetricType].getName()
 + "$")
    +    val metricTypeValues = metricClass.getDeclaredField("values")
    +    metricTypeValues.setAccessible(true)
    +    metricTypeValues.set(
    +      org.apache.spark.metrics.ExecutorMetricType,
    +      IndexedSeq(JVMHeapMemory, JVMOffHeapMemory))
    +
    +    // Save all heartbeats sent into an ArrayBuffer for verification
    +    val heartbeats = ArrayBuffer[Heartbeat]()
    +    val mockReceiver = mock[RpcEndpointRef]
    +    when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
    +      .thenAnswer(new Answer[HeartbeatResponse] {
    +        override def answer(invocation: InvocationOnMock): 
HeartbeatResponse = {
    +          val args = invocation.getArguments()
    +          val mock = invocation.getMock
    +          heartbeats += args(0).asInstanceOf[Heartbeat]
    +          HeartbeatResponse(false)
    +        }
    +      })
    +    val receiverRef = 
executorClass.getDeclaredField("heartbeatReceiverRef")
    +    receiverRef.setAccessible(true)
    +    receiverRef.set(executor, mockReceiver)
    +
    +    f(executor, heartbeats)
    +  }
    +
    +  private def invokeReportHeartbeat(executor: Executor): Unit = {
    --- End diff --
    
    You can mixin `org.scalatest.PrivateMethodTester` to replace this method, 
such as
    ```
        val reportHeartBeat = PrivateMethod[Long]('reportHeartBeat)
        ...
         executor.invokePrivate(reportHeartBeat())
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to