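awaitAssert retries the block until it no longer throws (or the timeout is
reached), so everything inside it, including the master ! jobDump send, runs
again on every retry.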
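
To illustrate the retry behaviour without a cluster, here is a minimal sketch in plain Scala. It does not use Akka: retryUntilPasses is a hypothetical stand-in that only mimics the retry-until-no-exception idea, and FakeWorker is an invented counter, not the Worker from the test. It compares sending inside the retried block with sending once before it.

```scala
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object AwaitAssertSketch {

  // Hypothetical stand-in for the retry idea behind awaitAssert:
  // re-evaluate `block` until it stops throwing or `max` has elapsed.
  def retryUntilPasses[A](max: FiniteDuration, interval: FiniteDuration)(block: => A): A = {
    val deadline = max.fromNow
    @annotation.tailrec
    def loop(): A =
      Try(block) match {
        case Success(value)                     => value
        case Failure(e) if deadline.isOverdue() => throw e
        case Failure(_) =>
          Thread.sleep(interval.toMillis)
          loop()
      }
    loop()
  }

  // Invented for this sketch: counts received jobs and reports the result
  // as ready only from the `checksUntilReady`-th check onwards.
  final class FakeWorker(checksUntilReady: Int) {
    var jobsReceived = 0
    private var checks = 0

    def send(): Unit = jobsReceived += 1

    def assertResultReady(): Unit = {
      checks += 1
      if (checks < checksUntilReady)
        throw new AssertionError(s"result not ready after $checks check(s)")
    }
  }

  // The send is part of the retried block, so it is repeated on every retry.
  def sendInsideRetry(): Int = {
    val worker = new FakeWorker(checksUntilReady = 3)
    retryUntilPasses(1.second, 10.millis) {
      worker.send()
      worker.assertResultReady()
    }
    worker.jobsReceived
  }

  // The send happens once; only the assertion is retried.
  def sendBeforeRetry(): Int = {
    val worker = new FakeWorker(checksUntilReady = 3)
    worker.send()
    retryUntilPasses(1.second, 10.millis) {
      worker.assertResultReady()
    }
    worker.jobsReceived
  }

  def main(args: Array[String]): Unit = {
    println(s"send inside retry: ${sendInsideRetry()} job(s) received") // 3
    println(s"send before retry: ${sendBeforeRetry()} job(s) received") // 1
  }
}
```

Applied to the test below, this would presumably mean moving master ! jobDump above awaitAssert and leaving only checkAssertionsOnRecord(deviceId) inside the block, so the job is sent once and only the check is retried.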

/Patrik
On Fri, 31 March 2017 at 15:50, Kilic Ali-Firat <kilic.alifi...@gmail.com> wrote:

> Hi Akka Team,
>
> It's me again, about the multi-node test that I'm writing to validate my
> cluster. I solved my issues with Master-Worker communication, but I see
> really weird behavior in my test. The piece of code below seems to be
> executed three times:
>
>
> awaitAssert {
>
>
>           /**
>             * Send the job to the master
>             */
>           master ! jobDump
>
>
>           /**
>             * Worker must return a JobSuccess
>             */
>           checkAssertionsOnRecord(deviceId)
>
>
> }
>
> In short, I have two actors (Master and Worker) with the following
> implementations:
>
>
> case class Master(totalInstance: Int,
>                   routees: scala.collection.immutable.Seq[String],
>                   nodeRole: Option[String]) extends Actor with ActorLogging {
>
>
>   val routerType = RoundRobinGroup(Nil)
>   val clusterGroupSettings = ClusterRouterGroupSettings(
>     totalInstances = totalInstance,
>     routeesPaths = routees,
>     allowLocalRoutees = true,
>     useRole = nodeRole)
>   val clusterGroup = ClusterRouterGroup(routerType, clusterGroupSettings).props()
>   val workerRouter = context.actorOf(clusterGroup, name="workerRouter")
>
>
>   def receive = {
>
>
>     case jDump : JobDump =>
>       log.debug("Master {} receive from {} following job: {}", self.path,
> sender(), jDump)
>       workerRouter ! jDump
>  }
> }
>
>
> case class Worker(awsBucket: String,
>                   gapMinValueMicroSec: Long,
>                   persistentCache: PersistentCache[DeviceId, BCPPackets],
>                   referentialService: IReferentialService,
>                   bffIO: BFF_IO) extends Actor with ActorLogging {
>
>
>
>
>   def receive = {
>     case jDump: JobDump =>
>       val replyTo = sender()
>       log.info("Worker {} receive following job: {}", self.path, jDump)
>       val jobDumpResult = this.executeJobDump(jDump)
>       this.sendJobResultToSender(jDump, jobDumpResult, replyTo)
>  }
> }
>
>
> To make debugging easier, I limit the test to one node (this is why I set
> allowLocalRoutees to true for the Master), like this:
>
>
> object FGSClusterSpecConfig extends MultiNodeConfig with LazyLogging {
>
>
>   // register the named roles (nodes) of the test
>   val first = role("first")
>   def nodeList = Seq(first)
>   val portOffset = 2551
>   val numberOfPorts = nodeList.size
>   val portList = (portOffset to (portOffset+numberOfPorts)).toList
>
>
>     nodeList
>       .zip(portList)
>       .foreach { case (role, port) =>
>       nodeConfig(role) {
>         ConfigFactory.parseString(s"""
>       # Define port for each node configuration
>       akka.remote.netty.tcp.port = $port
>       # Disable legacy metrics in akka-cluster.
>       akka.cluster.metrics.enabled=off
>       # Enable metrics extension in akka-cluster-metrics.
>       akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
>       # Sigar native library extract location during tests.
>       akka.cluster.metrics.native-library-extract-folder=target/native/${role.name}
>           """)
>       }
>     }
>   commonConfig(ConfigFactory.parseString(
>     """
>        akka.actor.provider = cluster
>        akka.loglevel=DEBUG
>     """.stripMargin))
> }
>
> class FGSSpecMultiJvmNode1 extends FGSClusterSpec
>
> abstract class FGSClusterSpec extends MultiNodeSpec(FGSClusterSpecConfig)
> with WordSpecLike with Matchers with BeforeAndAfterAll with MockitoSugar
> with ImplicitSender with LazyLogging {
>
>   import FGSClusterSpecConfig._
>
>   override def initialParticipants: Int = roles.size
>   override def beforeAll() = multiNodeSpecBeforeAll()
>   override def afterAll() = multiNodeSpecAfterAll()
>
>   val mockReferential = mock[IReferentialService]
>
>   val workerProps =
>     Worker.props(awsBucket = Val.AWSBUCKET,
>       gapMinValueMicroSec = Val.dummyGapMinValueMicroSec,
>       persistentCache = Val.mockPersistentCache,
>       referentialService = mockReferential,
>       bffIO = Val.bffIO
>     )
>
>   "A FGS cluster" must {
>     "illustrate how to startup cluster" in within(15 seconds) {
>       Cluster(system).subscribe(testActor, classOf[MemberUp])
>       expectMsgClass(classOf[CurrentClusterState])
>
>       val firstAddress = node(first).address
>       Cluster(system) join firstAddress
>
>       val workerActor = system.actorOf(workerProps, name = "worker")
>       val routees = scala.collection.immutable.Seq("/user/worker")
>       system.actorOf(Master.props(1,routees, None), name = "master")
>
>        receiveN(roles.size)
>         .collect { case MemberUp(m) => m.address }
>         .toSet should be(Set(firstAddress))
>
>
>       Cluster(system).unsubscribe(testActor)
>       testConductor.enter("all-up")
>     }
>
>
>     "execute a job dump from one node" in within(15.seconds) {
>       log.info(
>         """
>           |********************************************************************
>           | BEGIN TEST
>           |********************************************************************
>         """.stripMargin)
>
>
>       runOn(first) {
>         val master = system.actorSelection(node(first) / "user" / "master")
>
>
>         /**
>           * Job Dump settings
>           */
>         val deviceId = getClass.getSimpleName
>         val from = Some(0L)
>         val to = Some(10000000L)
>         val jobDump = JobDump(deviceId, from, to)
>
>
>         /**
>           * Mock the job dump received by the master and forwarded to a worker
>           */
>         val jobRemove = mockJobDump(jobDump)
>
>
>         mockRemoveData(jobRemove)
>
>
>         awaitAssert {
>
>
>           /**
>             * Send the job to the master
>             */
>           master ! jobDump
>
>
>           /**
>             * Worker must return a JobSuccess
>             */
>           checkAssertionsOnRecord(deviceId)
>         }
>       }
>       testConductor.enter("done-2")
>     }
>   }
>
>
> In the logs, I now see this:
>
> [JVM-1] [DEBUG] [03/31/2017 15:36:37.888] [New I/O worker #10]
> [akka.remote.testconductor.PlayerHandler(akka://FGSClusterSpec)] channel
> [id: 0x1df9d48f, /127.0.0.1:54134 => /127.0.0.1:4711] written 13
> [JVM-1] [DEBUG] [03/31/2017 15:36:37.889] [New I/O worker #7]
> [akka.remote.testconductor.ConductorHandler(akka://FGSClusterSpec)] message
> from /127.0.0.1:54134: GetAddress(RoleName(first))
> [JVM-1] [DEBUG] [03/31/2017 15:36:37.889] [New I/O worker #10]
> [akka.remote.testconductor.PlayerHandler(akka://FGSClusterSpec)] message
> from /127.0.0.1:4711:
> AddressReply(RoleName(first),akka.tcp://FGSClusterSpec@localhost:2551)
> [JVM-1] [DEBUG] [03/31/2017 15:36:37.941] [FGSClusterSpec-akka.actor.
> default-dispatcher-15] [akka.tcp://FGSClusterSpec@localhost:2551/user/master]
> Master akka://FGSClusterSpec/user/master receive from
> Actor[akka://FGSClusterSpec/system/testActor-1#1142810589] following job:
> JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
> [JVM-1] [INFO] [03/31/2017 15:36:37.942] [FGSClusterSpec-akka.actor.
> default-dispatcher-19] [akka.tcp://FGSClusterSpec@localhost:2551/user/worker]
> Worker akka://FGSClusterSpec/user/worker receive following job:
> JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
> [JVM-1] [DEBUG] [03/31/2017 15:36:38.608] [FGSClusterSpec-akka.actor.
> default-dispatcher-19] [akka.tcp://FGSClusterSpec@localhost:2551/user/master]
> Master akka://FGSClusterSpec/user/master receive from
> Actor[akka://FGSClusterSpec/system/testActor-1#1142810589] following job:
> JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
> [JVM-1] [INFO] [03/31/2017 15:36:38.609] [FGSClusterSpec-akka.actor.
> default-dispatcher-19] [akka.tcp://FGSClusterSpec@localhost:2551/user/worker]
> Worker akka://FGSClusterSpec/user/worker receive following job:
> JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
> [JVM-1] [DEBUG] [03/31/2017 15:36:38.806] [FGSClusterSpec-akka.actor.
> default-dispatcher-19] [akka.tcp://FGSClusterSpec@localhost:2551/user/master]
> Master akka://FGSClusterSpec/user/master receive from
> Actor[akka://FGSClusterSpec/system/testActor-1#1142810589] following job:
> JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
> [JVM-1] [INFO] [03/31/2017 15:36:38.807] [FGSClusterSpec-akka.actor.
> default-dispatcher-2] [akka.tcp://FGSClusterSpec@localhost:2551/user/worker]
> Worker akka://FGSClusterSpec/user/worker receive following job:
> JobDump(FGSSpecMultiJvmNode1,Some(0),Some(10000000))
>
> The master receives *the same message three times* and I really cannot
> understand why. I limit the test to one node, so I expect only one job to
> be executed, not three as happens now. Is the way I wrote my test wrong,
> or is it something else?
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
