Repository: spark
Updated Branches:
  refs/heads/branch-1.3 e5690a502 -> cc9eec1a0


[SPARK-5735] Replace uses of EasyMock with Mockito

This patch replaces all uses of EasyMock with Mockito.  There are two 
motivations for this:

1. We should use a single mocking framework in our tests in order to keep 
things consistent.
2. EasyMock may be responsible for non-deterministic unit test failures due to
its Objenesis dependency (see SPARK-5626 for more details).

Most of these changes are fairly mechanical translations of EasyMock code to
Mockito, although I made a small change that strengthens the assertions in one
test in KinesisReceiverSuite.
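
For readers unfamiliar with the two frameworks, the shape of the translation is
roughly the following (a minimal sketch based on the CacheManagerSuite hunk
below; blockManager, result, and the code under test come from that suite's
fixtures, so this is illustrative rather than runnable on its own):

    // Before (EasyMock): expectations are recorded up front and verified
    // implicitly by the whenExecuting block.
    expecting {
      blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
    }
    whenExecuting(blockManager) {
      // ... exercise the code under test ...
    }

    // After (Mockito): stub first, exercise, then verify explicitly.
    when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result))
    // ... exercise the code under test ...
    verify(blockManager).get(RDDBlockId(0, 0))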

Author: Josh Rosen <joshro...@databricks.com>

Closes #4578 from JoshRosen/SPARK-5735-remove-easymock and squashes the 
following commits:

0ab192b [Josh Rosen] Import sorting plus two minor changes to more closely 
match old semantics.
977565b [Josh Rosen] Remove EasyMock from build.
fae1d8f [Josh Rosen] Remove EasyMock usage in KinesisReceiverSuite.
7cca486 [Josh Rosen] Remove EasyMock usage in MesosSchedulerBackendSuite
fc5e94d [Josh Rosen] Remove EasyMock in CacheManagerSuite

(cherry picked from commit 077eec2d9dba197f51004ee4a322d0fa71424ea0)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc9eec1a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc9eec1a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc9eec1a

Branch: refs/heads/branch-1.3
Commit: cc9eec1a076624628d3d582e7c679f0861ecb39c
Parents: e5690a5
Author: Josh Rosen <joshro...@databricks.com>
Authored: Fri Feb 13 09:53:57 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Feb 13 09:55:42 2015 -0800

----------------------------------------------------------------------
 core/pom.xml                                    |  10 -
 .../org/apache/spark/CacheManagerSuite.scala    |  42 ++-
 .../mesos/MesosSchedulerBackendSuite.scala      | 125 ++++-----
 extras/kinesis-asl/pom.xml                      |   5 -
 .../kinesis/KinesisReceiverSuite.scala          | 263 +++++++++----------
 pom.xml                                         |  13 -
 6 files changed, 207 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cc9eec1a/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index c86d118..aca0f58 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -330,16 +330,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.easymock</groupId>
-      <artifactId>easymockclassextension</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>asm</groupId>
-      <artifactId>asm</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/spark/blob/cc9eec1a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index d7d9dc7..4b25c20 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -17,16 +17,18 @@
 
 package org.apache.spark
 
+import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.scalatest.mock.EasyMockSugar
+import org.scalatest.mock.MockitoSugar
 
-import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
+import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 
 // TODO: Test the CacheManager's thread-safety aspects
-class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
-  var sc : SparkContext = _
+class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter
+  with MockitoSugar {
+
   var blockManager: BlockManager = _
   var cacheManager: CacheManager = _
   var split: Partition = _
@@ -57,10 +59,6 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     }.cache()
   }
 
-  after {
-    sc.stop()
-  }
-
   test("get uncached rdd") {
     // Do not mock this test, because attempting to match Array[Any], which is not covariant,
     // in blockManager.put is a losing battle. You have been warned.
@@ -75,29 +73,21 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
   }
 
   test("get cached rdd") {
-    expecting {
-      val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
-      blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
-    }
+    val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
+    when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result))
 
-    whenExecuting(blockManager) {
-      val context = new TaskContextImpl(0, 0, 0, 0)
-      val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
-      assert(value.toList === List(5, 6, 7))
-    }
+    val context = new TaskContextImpl(0, 0, 0, 0)
+    val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
+    assert(value.toList === List(5, 6, 7))
+    assert(value.toList === List(5, 6, 7))
   }
 
   test("get uncached local rdd") {
-    expecting {
-      // Local computation should not persist the resulting value, so don't expect a put().
-      blockManager.get(RDDBlockId(0, 0)).andReturn(None)
-    }
+    // Local computation should not persist the resulting value, so don't expect a put().
+    when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None)
 
-    whenExecuting(blockManager) {
-      val context = new TaskContextImpl(0, 0, 0, 0, true)
-      val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
-      assert(value.toList === List(1, 2, 3, 4))
-    }
+    val context = new TaskContextImpl(0, 0, 0, 0, true)
+    val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
+    assert(value.toList === List(1, 2, 3, 4))
+    assert(value.toList === List(1, 2, 3, 4))
   }
 
   test("verify task metrics updated correctly") {

http://git-wip-us.apache.org/repos/asf/spark/blob/cc9eec1a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index 46ab02b..8cd302e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -17,45 +17,47 @@
 
 package org.apache.spark.scheduler.mesos
 
-import org.apache.spark.executor.MesosExecutorBackend
-import org.scalatest.FunSuite
-import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
-import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus,
-  TaskDescription, WorkerOffer, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend}
-import org.apache.mesos.SchedulerDriver
-import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, _}
-import org.apache.mesos.Protos.Value.Scalar
-import org.easymock.{Capture, EasyMock}
 import java.nio.ByteBuffer
-import java.util.Collections
 import java.util
-import org.scalatest.mock.EasyMockSugar
+import java.util.Collections
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
+import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.Scalar
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
+import org.apache.spark.executor.MesosExecutorBackend
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerBackend, MemoryUtils}
+
+class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with MockitoSugar {
 
   test("check spark-class location correctly") {
     val conf = new SparkConf
     conf.set("spark.mesos.executor.home" , "/mesos-home")
 
-    val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
-    listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
-    EasyMock.replay(listenerBus)
-
-    val sc = EasyMock.createMock(classOf[SparkContext])
-    EasyMock.expect(sc.getSparkHome()).andReturn(Option("/spark-home")).anyTimes()
-    EasyMock.expect(sc.conf).andReturn(conf).anyTimes()
-    EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
-    EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
-    EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
-    EasyMock.replay(sc)
-    val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
-    EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
-    EasyMock.replay(taskScheduler)
+    val listenerBus = mock[LiveListenerBus]
+    listenerBus.post(
+      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+    val sc = mock[SparkContext]
+    when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
+
+    when(sc.conf).thenReturn(conf)
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.listenerBus).thenReturn(listenerBus)
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
 
     val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
 
@@ -84,20 +86,19 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
         .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build()
     }
 
-    val driver = EasyMock.createMock(classOf[SchedulerDriver])
-    val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
+    val driver = mock[SchedulerDriver]
+    val taskScheduler = mock[TaskSchedulerImpl]
 
-    val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
-    listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
-    EasyMock.replay(listenerBus)
+    val listenerBus = mock[LiveListenerBus]
+    listenerBus.post(
+      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
 
-    val sc = EasyMock.createMock(classOf[SparkContext])
-    EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
-    EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes()
-    EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
-    EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes()
-    EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
-    EasyMock.replay(sc)
+    val sc = mock[SparkContext]
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.getSparkHome()).thenReturn(Option("/path"))
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.conf).thenReturn(new SparkConf)
+    when(sc.listenerBus).thenReturn(listenerBus)
 
     val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
     val minCpu = 4
@@ -121,25 +122,29 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
       2
     ))
     val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
-    EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))).andReturn(Seq(Seq(taskDesc)))
-    EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
-    EasyMock.replay(taskScheduler)
+    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
 
-    val capture = new Capture[util.Collection[TaskInfo]]
-    EasyMock.expect(
+    val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]])
+    when(
       driver.launchTasks(
-        EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)),
-        EasyMock.capture(capture),
-        EasyMock.anyObject(classOf[Filters])
+        Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+        capture.capture(),
+        any(classOf[Filters])
       )
-    ).andReturn(Status.valueOf(1)).once
-    EasyMock.expect(driver.declineOffer(mesosOffers.get(1).getId)).andReturn(Status.valueOf(1)).times(1)
-    EasyMock.expect(driver.declineOffer(mesosOffers.get(2).getId)).andReturn(Status.valueOf(1)).times(1)
-    EasyMock.replay(driver)
+    ).thenReturn(Status.valueOf(1))
+    when(driver.declineOffer(mesosOffers.get(1).getId)).thenReturn(Status.valueOf(1))
+    when(driver.declineOffer(mesosOffers.get(2).getId)).thenReturn(Status.valueOf(1))
 
     backend.resourceOffers(driver, mesosOffers)
 
-    EasyMock.verify(driver)
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+      capture.capture(),
+      any(classOf[Filters])
+    )
+    verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId)
+    verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId)
     assert(capture.getValue.size() == 1)
     val taskInfo = capture.getValue.iterator().next()
     assert(taskInfo.getName.equals("n1"))
@@ -151,15 +156,13 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
     // Unwanted resources offered on an existing node. Make sure they are declined
     val mesosOffers2 = new java.util.ArrayList[Offer]
     mesosOffers2.add(createOffer(1, minMem, minCpu))
-    EasyMock.reset(taskScheduler)
-    EasyMock.reset(driver)
-    EasyMock.expect(taskScheduler.resourceOffers(EasyMock.anyObject(classOf[Seq[WorkerOffer]])).andReturn(Seq(Seq())))
-    EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
-    EasyMock.replay(taskScheduler)
-    EasyMock.expect(driver.declineOffer(mesosOffers2.get(0).getId)).andReturn(Status.valueOf(1)).times(1)
-    EasyMock.replay(driver)
+    reset(taskScheduler)
+    reset(driver)
+    when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq()))
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+    when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1))
 
     backend.resourceOffers(driver, mesosOffers2)
-    EasyMock.verify(driver)
+    verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
   }
 }
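
One idiom in the hunk above is worth calling out: EasyMock's Capture is
replaced by Mockito's ArgumentCaptor, which is passed wherever a matcher is
expected and inspected after the call. A minimal sketch of the pattern, reusing
names from the diff above (it assumes the suite's driver, backend, and
mesosOffers fixtures, so it is not runnable on its own):

    val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]])
    when(driver.launchTasks(
        Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
        capture.capture(),
        any(classOf[Filters])))
      .thenReturn(Status.valueOf(1))

    backend.resourceOffers(driver, mesosOffers)  // exercise the code under test

    // getValue returns the argument recorded by the last matched call.
    assert(capture.getValue.size() == 1)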

http://git-wip-us.apache.org/repos/asf/spark/blob/cc9eec1a/extras/kinesis-asl/pom.xml
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml
index e4c7a8f..381df2f 100644
--- a/extras/kinesis-asl/pom.xml
+++ b/extras/kinesis-asl/pom.xml
@@ -68,11 +68,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.easymock</groupId>
-      <artifactId>easymockclassextension</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>com.novocode</groupId>
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/spark/blob/cc9eec1a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 41dbd64..f56898a 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -20,7 +20,6 @@ import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions.seqAsJavaList
 
-import org.apache.spark.annotation.Experimental
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Milliseconds
 import org.apache.spark.streaming.Seconds
@@ -28,9 +27,11 @@ import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.TestSuiteBase
 import org.apache.spark.streaming.util.Clock
 import org.apache.spark.streaming.util.ManualClock
+
+import org.mockito.Mockito._
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
-import org.scalatest.mock.EasyMockSugar
+import org.scalatest.mock.MockitoSugar
 
 import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
 import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
@@ -42,10 +43,10 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
 import com.amazonaws.services.kinesis.model.Record
 
 /**
- *  Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
+ * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
  */
 class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter
-    with EasyMockSugar {
+    with MockitoSugar {
 
   val app = "TestKinesisReceiver"
   val stream = "mySparkStream"
@@ -73,6 +74,14 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
     currentClockMock = mock[Clock]
   }
 
+  override def afterFunction(): Unit = {
+    super.afterFunction()
+    // Since this suite was originally written using EasyMock, add this to preserve the old
+    // mocking semantics (see SPARK-5735 for more details)
+    verifyNoMoreInteractions(receiverMock, checkpointerMock, checkpointClockMock,
+      checkpointStateMock, currentClockMock)
+  }
+
   test("kinesis utils api") {
     val ssc = new StreamingContext(master, framework, batchDuration)
     // Tests the API, does not actually test data receiving
@@ -83,193 +92,175 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
   }
 
   test("process records including store and checkpoint") {
-    val expectedCheckpointIntervalMillis = 10
-    expecting {
-      receiverMock.isStopped().andReturn(false).once()
-      receiverMock.store(record1.getData().array()).once()
-      receiverMock.store(record2.getData().array()).once()
-      checkpointStateMock.shouldCheckpoint().andReturn(true).once()
-      checkpointerMock.checkpoint().once()
-      checkpointStateMock.advanceCheckpoint().once()
-    }
-    whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
-      val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
-          checkpointStateMock)
-      recordProcessor.processRecords(batch, checkpointerMock)
-    }
+    when(receiverMock.isStopped()).thenReturn(false)
+    when(checkpointStateMock.shouldCheckpoint()).thenReturn(true)
+
+    val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+    recordProcessor.processRecords(batch, checkpointerMock)
+
+    verify(receiverMock, times(1)).isStopped()
+    verify(receiverMock, times(1)).store(record1.getData().array())
+    verify(receiverMock, times(1)).store(record2.getData().array())
+    verify(checkpointStateMock, times(1)).shouldCheckpoint()
+    verify(checkpointerMock, times(1)).checkpoint()
+    verify(checkpointStateMock, times(1)).advanceCheckpoint()
   }
 
   test("shouldn't store and checkpoint when receiver is stopped") {
-    expecting {
-      receiverMock.isStopped().andReturn(true).once()
-    }
-    whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
-      val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
-          checkpointStateMock)
-      recordProcessor.processRecords(batch, checkpointerMock)
-    }
+    when(receiverMock.isStopped()).thenReturn(true)
+
+    val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+    recordProcessor.processRecords(batch, checkpointerMock)
+
+    verify(receiverMock, times(1)).isStopped()
   }
 
   test("shouldn't checkpoint when exception occurs during store") {
-    expecting {
-      receiverMock.isStopped().andReturn(false).once()
-      receiverMock.store(record1.getData().array()).andThrow(new RuntimeException()).once()
-    }
-    whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
-      intercept[RuntimeException] {
-        val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
-            checkpointStateMock)
-        recordProcessor.processRecords(batch, checkpointerMock)
-      }
+    when(receiverMock.isStopped()).thenReturn(false)
+    when(receiverMock.store(record1.getData().array())).thenThrow(new RuntimeException())
+
+    intercept[RuntimeException] {
+      val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+      recordProcessor.processRecords(batch, checkpointerMock)
     }
+
+    verify(receiverMock, times(1)).isStopped()
+    verify(receiverMock, times(1)).store(record1.getData().array())
   }
 
   test("should set checkpoint time to currentTime + checkpoint interval upon 
instantiation") {
-    expecting {
-      currentClockMock.currentTime().andReturn(0).once()
-    }
-    whenExecuting(currentClockMock) {
+    when(currentClockMock.currentTime()).thenReturn(0)
+
     val checkpointIntervalMillis = 10
-    val checkpointState = new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
+    val checkpointState =
+      new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
     assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
-    }
+
+    verify(currentClockMock, times(1)).currentTime()
   }
 
   test("should checkpoint if we have exceeded the checkpoint interval") {
-    expecting {
-      currentClockMock.currentTime().andReturn(0).once()
-    }
-    whenExecuting(currentClockMock) {
-      val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
-      assert(checkpointState.shouldCheckpoint())
-    }
+    when(currentClockMock.currentTime()).thenReturn(0)
+
+    val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
+    assert(checkpointState.shouldCheckpoint())
+
+    verify(currentClockMock, times(1)).currentTime()
   }
 
   test("shouldn't checkpoint if we have not exceeded the checkpoint interval") 
{
-    expecting {
-      currentClockMock.currentTime().andReturn(0).once()
-    }
-    whenExecuting(currentClockMock) {
-      val checkpointState = new 
KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
-      assert(!checkpointState.shouldCheckpoint())
-    }
+    when(currentClockMock.currentTime()).thenReturn(0)
+
+    val checkpointState = new 
KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
+    assert(!checkpointState.shouldCheckpoint())
+
+    verify(currentClockMock, times(1)).currentTime()
   }
 
   test("should add to time when advancing checkpoint") {
-    expecting {
-      currentClockMock.currentTime().andReturn(0).once()
-    }
-    whenExecuting(currentClockMock) {
-      val checkpointIntervalMillis = 10
-      val checkpointState = new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
-      assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
-      checkpointState.advanceCheckpoint()
-      assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))
-    }
+    when(currentClockMock.currentTime()).thenReturn(0)
+
+    val checkpointIntervalMillis = 10
+    val checkpointState =
+      new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
+    assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+    checkpointState.advanceCheckpoint()
+    assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))
+
+    verify(currentClockMock, times(1)).currentTime()
   }
 
   test("shutdown should checkpoint if the reason is TERMINATE") {
-    expecting {
-      checkpointerMock.checkpoint().once()
-    }
-    whenExecuting(checkpointerMock, checkpointStateMock) {
-      val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, 
-          checkpointStateMock)
-      val reason = ShutdownReason.TERMINATE
-      recordProcessor.shutdown(checkpointerMock, reason)
-    }
+    val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+    val reason = ShutdownReason.TERMINATE
+    recordProcessor.shutdown(checkpointerMock, reason)
+
+    verify(checkpointerMock, times(1)).checkpoint()
   }
 
   test("shutdown should not checkpoint if the reason is something other than 
TERMINATE") {
-    expecting {
-    }
-    whenExecuting(checkpointerMock, checkpointStateMock) {
-      val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, 
-          checkpointStateMock)
-      recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
-      recordProcessor.shutdown(checkpointerMock, null)
-    }
+    val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+    recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
+    recordProcessor.shutdown(checkpointerMock, null)
+
+    verify(checkpointerMock, never()).checkpoint()
   }
 
   test("retry success on first attempt") {
     val expectedIsStopped = false
-    expecting {
-      receiverMock.isStopped().andReturn(expectedIsStopped).once()
-    }
-    whenExecuting(receiverMock) {
-      val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
-      assert(actualVal == expectedIsStopped)
-    }
+    when(receiverMock.isStopped()).thenReturn(expectedIsStopped)
+
+    val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+    assert(actualVal == expectedIsStopped)
+
+    verify(receiverMock, times(1)).isStopped()
   }
 
   test("retry success on second attempt after a Kinesis throttling exception") 
{
     val expectedIsStopped = false
-    expecting {
-      receiverMock.isStopped().andThrow(new ThrottlingException("error 
message"))
-        .andReturn(expectedIsStopped).once()
-    }
-    whenExecuting(receiverMock) {
-      val actualVal = 
KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
-      assert(actualVal == expectedIsStopped)
-    }
+    when(receiverMock.isStopped())
+        .thenThrow(new ThrottlingException("error message"))
+        .thenReturn(expectedIsStopped)
+
+    val actualVal = 
KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+    assert(actualVal == expectedIsStopped)
+
+    verify(receiverMock, times(2)).isStopped()
   }
 
   test("retry success on second attempt after a Kinesis dependency exception") 
{
     val expectedIsStopped = false
-    expecting {
-      receiverMock.isStopped().andThrow(new 
KinesisClientLibDependencyException("error message"))
-        .andReturn(expectedIsStopped).once()
-    }
-    whenExecuting(receiverMock) {
-      val actualVal = 
KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
-      assert(actualVal == expectedIsStopped)
-    }
+    when(receiverMock.isStopped())
+        .thenThrow(new KinesisClientLibDependencyException("error message"))
+        .thenReturn(expectedIsStopped)
+
+    val actualVal = 
KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+    assert(actualVal == expectedIsStopped)
+
+    verify(receiverMock, times(2)).isStopped()
   }
 
   test("retry failed after a shutdown exception") {
-    expecting {
-      checkpointerMock.checkpoint().andThrow(new ShutdownException("error message")).once()
-    }
-    whenExecuting(checkpointerMock) {
-      intercept[ShutdownException] {
-        KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
-      }
+    when(checkpointerMock.checkpoint()).thenThrow(new ShutdownException("error message"))
+
+    intercept[ShutdownException] {
+      KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
     }
+
+    verify(checkpointerMock, times(1)).checkpoint()
   }
 
   test("retry failed after an invalid state exception") {
-    expecting {
-      checkpointerMock.checkpoint().andThrow(new InvalidStateException("error message")).once()
-    }
-    whenExecuting(checkpointerMock) {
-      intercept[InvalidStateException] {
-        KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
-      }
+    when(checkpointerMock.checkpoint()).thenThrow(new InvalidStateException("error message"))
+
+    intercept[InvalidStateException] {
+      KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
     }
+
+    verify(checkpointerMock, times(1)).checkpoint()
   }
 
   test("retry failed after unexpected exception") {
-    expecting {
-      checkpointerMock.checkpoint().andThrow(new RuntimeException("error message")).once()
-    }
-    whenExecuting(checkpointerMock) {
-      intercept[RuntimeException] {
-        KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
-      }
+    when(checkpointerMock.checkpoint()).thenThrow(new RuntimeException("error message"))
+
+    intercept[RuntimeException] {
+      KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
     }
+
+    verify(checkpointerMock, times(1)).checkpoint()
   }
 
   test("retry failed after exhausing all retries") {
     val expectedErrorMessage = "final try error message"
-    expecting {
-      checkpointerMock.checkpoint().andThrow(new ThrottlingException("error message"))
-        .andThrow(new ThrottlingException(expectedErrorMessage)).once()
-    }
-    whenExecuting(checkpointerMock) {
-      val exception = intercept[RuntimeException] {
-        KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
-      }
-      exception.getMessage().shouldBe(expectedErrorMessage)
+    when(checkpointerMock.checkpoint())
+        .thenThrow(new ThrottlingException("error message"))
+        .thenThrow(new ThrottlingException(expectedErrorMessage))
+
+    val exception = intercept[RuntimeException] {
+      KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
     }
+    exception.getMessage().shouldBe(expectedErrorMessage)
+
+    verify(checkpointerMock, times(2)).checkpoint()
   }
 }
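
The afterFunction() override added near the top of this file is the behavioral
bridge between the two frameworks: EasyMock's whenExecuting block implicitly
failed a test on any unexpected mock interaction, while Mockito only checks the
interactions you explicitly verify. A small sketch of the difference, using the
suite's receiverMock fixture:

    // Mockito's default is permissive: this passes even if other,
    // unverified calls were made on the mock.
    verify(receiverMock, times(1)).isStopped()

    // Running this afterwards restores EasyMock-style strictness by failing
    // if the mock saw any interaction that was not verified above.
    verifyNoMoreInteractions(receiverMock)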

http://git-wip-us.apache.org/repos/asf/spark/blob/cc9eec1a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bc082b5..94df0bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -620,19 +620,6 @@
         <scope>test</scope>
       </dependency>
       <dependency>
-        <groupId>org.easymock</groupId>
-        <artifactId>easymockclassextension</artifactId>
-        <version>3.1</version>
-        <scope>test</scope>
-      </dependency>
-      <!-- Needed by cglib which is needed by easymock. -->
-      <dependency>
-        <groupId>asm</groupId>
-        <artifactId>asm</artifactId>
-        <version>3.3.1</version>
-        <scope>test</scope>
-      </dependency>
-      <dependency>
         <groupId>org.mockito</groupId>
         <artifactId>mockito-all</artifactId>
         <version>1.9.0</version>

