Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10835#discussion_r50789137
  
    --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
    @@ -159,193 +161,157 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         assert(!Accumulators.originals.get(accId).isDefined)
       }
     
    -  test("internal accumulators in TaskContext") {
    +  test("get accum") {
         sc = new SparkContext("local", "test")
    -    val accums = InternalAccumulator.create(sc)
    -    val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null, accums)
    -    val internalMetricsToAccums = taskContext.internalMetricsToAccumulators
    -    val collectedInternalAccums = taskContext.collectInternalAccumulators()
    -    val collectedAccums = taskContext.collectAccumulators()
    -    assert(internalMetricsToAccums.size > 0)
    -    assert(internalMetricsToAccums.values.forall(_.isInternal))
    -    assert(internalMetricsToAccums.contains(TEST_ACCUMULATOR))
    -    val testAccum = internalMetricsToAccums(TEST_ACCUMULATOR)
    -    assert(collectedInternalAccums.size === internalMetricsToAccums.size)
    -    assert(collectedInternalAccums.size === collectedAccums.size)
    -    assert(collectedInternalAccums.contains(testAccum.id))
    -    assert(collectedAccums.contains(testAccum.id))
    -  }
    +    // Don't register with SparkContext for cleanup
    +    var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true)
    +    val accId = acc.id
    +    val ref = WeakReference(acc)
    +    assert(ref.get.isDefined)
    +    Accumulators.register(ref.get.get)
     
    -  test("internal accumulators in a stage") {
    -    val listener = new SaveInfoListener
    -    val numPartitions = 10
    -    sc = new SparkContext("local", "test")
    -    sc.addSparkListener(listener)
    -    // Have each task add 1 to the internal accumulator
    -    val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
    -      TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
    -      iter
    -    }
    -    // Register asserts in job completion callback to avoid flakiness
    -    listener.registerJobCompletionCallback { _ =>
    -      val stageInfos = listener.getCompletedStageInfos
    -      val taskInfos = listener.getCompletedTaskInfos
    -      assert(stageInfos.size === 1)
    -      assert(taskInfos.size === numPartitions)
    -      // The accumulator values should be merged in the stage
    -      val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
    -      assert(stageAccum.value.toLong === numPartitions)
    -      // The accumulator should be updated locally on each task
    -      val taskAccumValues = taskInfos.map { taskInfo =>
    -        val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
    -        assert(taskAccum.update.isDefined)
    -        assert(taskAccum.update.get.toLong === 1)
    -        taskAccum.value.toLong
    -      }
    -      // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
    -      assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
    +    // Remove the explicit reference to it and allow weak reference to get garbage collected
    +    acc = null
    +    System.gc()
    +    assert(ref.get.isEmpty)
    +
    +    // Getting a garbage collected accum should throw error
    +    intercept[IllegalAccessError] {
    +      Accumulators.get(accId)
         }
    -    rdd.count()
    +
    +    // Getting a normal accumulator. Note: this has to be separate because referencing an
    +    // accumulator above in an `assert` would keep it from being garbage collected.
    +    val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true, true)
    +    Accumulators.register(acc2)
    +    assert(Accumulators.get(acc2.id) === Some(acc2))
    +
    +    // Getting an accumulator that does not exist should return None
    +    assert(Accumulators.get(100000).isEmpty)
       }
     
    -  test("internal accumulators in multiple stages") {
    -    val listener = new SaveInfoListener
    -    val numPartitions = 10
    -    sc = new SparkContext("local", "test")
    -    sc.addSparkListener(listener)
    -    // Each stage creates its own set of internal accumulators so the
    -    // values for the same metric should not be mixed up across stages
    -    val rdd = sc.parallelize(1 to 100, numPartitions)
    -      .map { i => (i, i) }
    -      .mapPartitions { iter =>
    -        TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
    -        iter
    -      }
    -      .reduceByKey { case (x, y) => x + y }
    -      .mapPartitions { iter =>
    -        TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 10
    -        iter
    -      }
    -      .repartition(numPartitions * 2)
    -      .mapPartitions { iter =>
    -        TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 100
    -        iter
    -      }
    -    // Register asserts in job completion callback to avoid flakiness
    -    listener.registerJobCompletionCallback { _ =>
    -      // We ran 3 stages, and the accumulator values should be distinct
    -      val stageInfos = listener.getCompletedStageInfos
    -      assert(stageInfos.size === 3)
    -      val (firstStageAccum, secondStageAccum, thirdStageAccum) =
    -        (findAccumulableInfo(stageInfos(0).accumulables.values, TEST_ACCUMULATOR),
    -        findAccumulableInfo(stageInfos(1).accumulables.values, TEST_ACCUMULATOR),
    -        findAccumulableInfo(stageInfos(2).accumulables.values, TEST_ACCUMULATOR))
    -      assert(firstStageAccum.value.toLong === numPartitions)
    -      assert(secondStageAccum.value.toLong === numPartitions * 10)
    -      assert(thirdStageAccum.value.toLong === numPartitions * 2 * 100)
    -    }
    -    rdd.count()
    +  test("only external accums are automatically registered") {
    +    val accEx = new Accumulator(0, IntAccumulatorParam, Some("external"), internal = false)
    +    val accIn = new Accumulator(0, IntAccumulatorParam, Some("internal"), internal = true)
    +    assert(!accEx.isInternal)
    +    assert(accIn.isInternal)
    +    assert(Accumulators.get(accEx.id).isDefined)
    +    assert(Accumulators.get(accIn.id).isEmpty)
       }
     
    -  test("internal accumulators in fully resubmitted stages") {
    -    testInternalAccumulatorsWithFailedTasks((i: Int) => true) // fail all tasks
    +  test("copy") {
    +    val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), true, false)
    +    val acc2 = acc1.copy()
    +    assert(acc1.id === acc2.id)
    +    assert(acc1.value === acc2.value)
    +    assert(acc1.name === acc2.name)
    +    assert(acc1.isInternal === acc2.isInternal)
    +    assert(acc1.countFailedValues === acc2.countFailedValues)
    +    assert(acc1 !== acc2)
    +    // Modifying one does not affect the other
    +    acc1.add(44L)
    +    assert(acc1.value === 500L)
    +    assert(acc2.value === 456L)
    +    acc2.add(144L)
    +    assert(acc1.value === 500L)
    +    assert(acc2.value === 600L)
       }
     
    -  test("internal accumulators in partially resubmitted stages") {
    -    testInternalAccumulatorsWithFailedTasks((i: Int) => i % 2 == 0) // fail a subset
    +  test("register multiple accums with same ID") {
    +    // Make sure these are internal accums so we don't automatically register them already
    +    val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true)
    +    val acc2 = acc1.copy()
    +    assert(acc1 !== acc2)
    +    assert(acc1.id === acc2.id)
    +    assert(Accumulators.originals.isEmpty)
    +    assert(Accumulators.get(acc1.id).isEmpty)
    +    Accumulators.register(acc1)
    +    Accumulators.register(acc2)
    +    // The second one does not override the first one
    +    assert(Accumulators.originals.size === 1)
    +    assert(Accumulators.get(acc1.id) === Some(acc1))
       }
     
    -  /**
    -   * Return the accumulable info that matches the specified name.
    -   */
    -  private def findAccumulableInfo(
    -      accums: Iterable[AccumulableInfo],
    -      name: String): AccumulableInfo = {
    -    accums.find { a => a.name == name }.getOrElse {
    -      throw new TestFailedException(s"internal accumulator '$name' not 
found", 0)
    -    }
    +  test("string accumulator param") {
    +    val acc = new Accumulator("", StringAccumulatorParam, Some("darkness"))
    +    assert(acc.value === "")
    +    acc.setValue("feeds")
    +    assert(acc.value === "feeds")
    +    acc.add("your")
    +    assert(acc.value === "your") // value is overwritten, not concatenated
    +    acc += "soul"
    +    assert(acc.value === "soul")
    +    acc ++= "with"
    +    assert(acc.value === "with")
    +    acc.merge("kindness")
    +    assert(acc.value === "kindness")
       }
     
    -  /**
    -   * Test whether internal accumulators are merged properly if some tasks fail.
    -   */
    -  private def testInternalAccumulatorsWithFailedTasks(failCondition: (Int => Boolean)): Unit = {
    -    val listener = new SaveInfoListener
    -    val numPartitions = 10
    -    val numFailedPartitions = (0 until numPartitions).count(failCondition)
    -    // This says use 1 core and retry tasks up to 2 times
    -    sc = new SparkContext("local[1, 2]", "test")
    -    sc.addSparkListener(listener)
    -    val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitionsWithIndex { case (i, iter) =>
    -      val taskContext = TaskContext.get()
    -      taskContext.internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
    -      // Fail the first attempts of a subset of the tasks
    -      if (failCondition(i) && taskContext.attemptNumber() == 0) {
    -        throw new Exception("Failing a task intentionally.")
    -      }
    -      iter
    -    }
    -    // Register asserts in job completion callback to avoid flakiness
    -    listener.registerJobCompletionCallback { _ =>
    -      val stageInfos = listener.getCompletedStageInfos
    -      val taskInfos = listener.getCompletedTaskInfos
    -      assert(stageInfos.size === 1)
    -      assert(taskInfos.size === numPartitions + numFailedPartitions)
    -      val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
    -      // We should not double count values in the merged accumulator
    -      assert(stageAccum.value.toLong === numPartitions)
    -      val taskAccumValues = taskInfos.flatMap { taskInfo =>
    -        if (!taskInfo.failed) {
    -          // If a task succeeded, its update value should always be 1
    -          val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
    -          assert(taskAccum.update.isDefined)
    -          assert(taskAccum.update.get.toLong === 1)
    -          Some(taskAccum.value.toLong)
    -        } else {
    -          // If a task failed, we should not get its accumulator values
    -          assert(taskInfo.accumulables.isEmpty)
    -          None
    -        }
    -      }
    -      assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
    -    }
    -    rdd.count()
    +  test("list accumulator param") {
    +    val acc = new Accumulator(Seq.empty[Int], new ListAccumulatorParam[Int], Some("numbers"))
    +    assert(acc.value === Seq.empty[Int])
    +    acc.add(Seq(1, 2))
    +    assert(acc.value === Seq(1, 2))
    +    acc += Seq(3, 4)
    +    assert(acc.value === Seq(1, 2, 3, 4))
    +    acc ++= Seq(5, 6)
    +    assert(acc.value === Seq(1, 2, 3, 4, 5, 6))
    +    acc.merge(Seq(7, 8))
    +    assert(acc.value === Seq(1, 2, 3, 4, 5, 6, 7, 8))
    +    acc.setValue(Seq(9, 10))
    +    assert(acc.value === Seq(9, 10))
    +  }
    +
    +  test("value is reset on the executors") {
    +    val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), 
internal = false)
    +    val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), 
internal = false)
    +    val externalAccums = Seq(acc1, acc2)
    +    val internalAccums = InternalAccumulator.create()
    +    // Set some values; these should not be observed later on the "executors"
    +    acc1.setValue(10)
    +    acc2.setValue(20L)
    +    internalAccums
    +      .find(_.name == Some(InternalAccumulator.TEST_ACCUM))
    +      .get.asInstanceOf[Accumulator[Long]]
    +      .setValue(30L)
    +    // Simulate the task being serialized and sent to the executors.
    +    val dummyTask = new DummyTask(internalAccums, externalAccums)
    +    val serInstance = new JavaSerializer(new SparkConf).newInstance()
    +    val taskSer = Task.serializeWithDependencies(
    +      dummyTask, mutable.HashMap(), mutable.HashMap(), serInstance)
    +    // Now we're on the executors.
    +    // Deserialize the task and assert that its accumulators are zero'ed out.
    +    val (_, _, taskBytes) = Task.deserializeWithDependencies(taskSer)
    +    val taskDeser = serInstance.deserialize[DummyTask](
    +      taskBytes, Thread.currentThread.getContextClassLoader)
    +    // Assert that executors see only zeros
    +    taskDeser.externalAccums.foreach { a => assert(a.localValue == a.zero) }
    +    taskDeser.internalAccums.foreach { a => assert(a.localValue == a.zero) }
       }
     
     }
     
     private[spark] object AccumulatorSuite {
     
    +  import InternalAccumulator._
    +
       /**
    -   * Run one or more Spark jobs and verify that the peak execution memory accumulator
    -   * is updated afterwards.
    +   * Run one or more Spark jobs and verify that in at least one job the peak execution memory
    +   * accumulator is updated afterwards.
        */
       def verifyPeakExecutionMemorySet(
           sc: SparkContext,
           testName: String)(testBody: => Unit): Unit = {
         val listener = new SaveInfoListener
         sc.addSparkListener(listener)
    -    // Register asserts in job completion callback to avoid flakiness
    -    listener.registerJobCompletionCallback { jobId =>
    -      if (jobId == 0) {
    -        // The first job is a dummy one to verify that the accumulator does not already exist
    -        val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
    -        assert(!accums.exists(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY))
    -      } else {
    -        // In the subsequent jobs, verify that peak execution memory is updated
    -        val accum = listener.getCompletedStageInfos
    -          .flatMap(_.accumulables.values)
    -          .find(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
    -          .getOrElse {
    -          throw new TestFailedException(
    -            s"peak execution memory accumulator not set in '$testName'", 0)
    -        }
    -        assert(accum.value.toLong > 0)
    -      }
    -    }
    -    // Run the jobs
    -    sc.parallelize(1 to 10).count()
         testBody
    +    val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
    +    val isSet = accums.exists { a =>
    +      a.name == PEAK_EXECUTION_MEMORY && a.value.exists(_.toString.toLong > 0L)
    --- End diff --
    
    changed to `asInstanceOf[Long]`
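
For reference, a minimal sketch of what the check presumably reads like after that change, casting the boxed accumulator value instead of round-tripping through `toString` (the `Info` case class and the `PEAK_EXECUTION_MEMORY` literal below are illustrative stand-ins, not the real Spark definitions):

    // Hypothetical stand-in for the AccumulableInfo shape used in the check above.
    case class Info(name: String, value: Option[Any])

    val PEAK_EXECUTION_MEMORY = "peakExecutionMemory" // placeholder, not the actual constant
    val accums = Seq(Info(PEAK_EXECUTION_MEMORY, Some(1024L)))

    // The value is assumed to hold a boxed Long, so it is cast rather than parsed from a String.
    val isSet = accums.exists { a =>
      a.name == PEAK_EXECUTION_MEMORY && a.value.exists(_.asInstanceOf[Long] > 0L)
    }
    assert(isSet)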

