Hi everyone,

I have a scenario where I use Cluster Sharding as follows. 

Each Node/Machine:

   1. Cluster Sharding enabled ActorSystem1 running on port XXXX behind 
   Docker Container
   2. There is only one ShardType in this ActorSystem1 and 10 Shards
   3. The Entity actors are persistent using Cassandra
   
The message flow looks like this

   - Message1 to ShardRegion
   - Message1 reaches PersistentEntityActor
   - PersistentEntityActor asks a RemoteActor for something (the RemoteActor 
   lives in a different actor system, on a different port, on the same 
   machine). PersistentEntityActor receives the RemoteActor's ActorRef 
   through its constructor - please see the code below
   - RemoteActor forwards the message to a child worker.
   - The child worker needs to respond to the PersistentEntityActor that 
   sent the request.
   

The flow works fine when the Cluster Sharding and RemoteActor are running 
in their respective actorsystems with different ports.

But when I create the RemoteActor as a normal actor in the same actor system 
as Cluster Sharding, the child worker does not send the response 
back to the PersistentEntityActor. 

/**
 * Entity actor that, for every `Cmd` it receives, asks the actor handed in through the
 * constructor and prints which reply came back.
 *
 * @param remoteActorRef the actor that is sent "Hey there" for each command
 */
class PersistentEntityActor(remoteActorRef: ActorRef) extends Actor {
  // Cmd, ResultA and ResultB are declared in the companion object.
  import PersistentEntityActor._

  override def receive: Receive = {
    case Cmd(_) =>
      // Upper bound on how long the ask below waits for a reply.
      implicit val timeout: Timeout = Timeout(6.seconds)

      // Fixed: the original asked an undefined `remoteRouterRef`; the constructor
      // parameter is `remoteActorRef`.
      val response = remoteActorRef ? "Hey there"

      // onComplete (instead of map) so that an ask timeout or an unexpected reply is
      // reported rather than silently lost inside a failed Future. Like the original
      // `map`, this needs an implicit ExecutionContext in scope.
      response.onComplete {
        case scala.util.Success(ResultA) =>
          println(s"The response from remote actor for ask is : ResultA")
        case scala.util.Success(ResultB) =>
          println(s"The response from remote actor for ask is : ResultB")
        case scala.util.Success(other) =>
          println(s"Unexpected response from remote actor for ask : $other")
        case scala.util.Failure(cause) =>
          println(s"The ask to the remote actor failed : $cause")
      }
  }
}

/** Protocol and sharding extractors for the `PersistentEntityActor` entity. */
object PersistentEntityActor {

  /** Command routed through the shard region; `data` determines both entity and shard. */
  case class Cmd(data: Int)

  /** Reply type; the code in this file sends and matches on the companion `ResultA`. */
  case class ResultA()

  /** Alternative reply type, matched by the entity actor. */
  case class ResultB()

  // Events are left out to keep the example short.

  /** Shard id is the remainder of `data` divided by 2, rendered as a string. */
  val extractShardId: ExtractShardId = {
    case cmd: Cmd => (cmd.data % 2).toString
  }

  /** Entity id is `data` as a string; the command itself is delivered unchanged. */
  val extractEntityId: ExtractEntityId = {
    case cmd @ Cmd(data) => (data.toString, cmd)
  }
}

/**
 * Hands every incoming message to a freshly created [[Worker]] child, keeping the
 * original sender so the worker replies directly to whoever sent the message.
 */
class RemoteActor extends Actor {
  override def receive: Receive = {
    case anyMsg =>
      val child = context.actorOf(Props[Worker])
      // forward (not !) preserves the original sender for the worker's reply.
      child.forward(anyMsg)
      // Fixed: the original never stopped the per-message child, so workers piled up
      // without bound. The PoisonPill is queued behind the forwarded message, so the
      // worker handles that message first and then stops.
      child ! akka.actor.PoisonPill
  }
}

/** Answers every message, whatever its content, with `ResultA` sent to the message's sender. */
class Worker extends Actor {
  def receive: Receive = {
    // The payload is ignored; only the sender matters.
    case _ => sender().tell(ResultA, self)
  }
}

/**
 * Launcher for the variant in which the RemoteActor and the sharded entity run in two
 * different actor systems inside one JVM: "RemoteActorSystem" on port 5150 and
 * "ClusterSystem" on port 2551.
 */
object RunWithSeparateActorSystem extends App {

  // Remote side: the port is set here, everything else comes from remoting_conf.
  val remoteConfig = ConfigFactory
    .parseString("akka.remote.netty.tcp.port=5150")
    .withFallback(ConfigFactory.load("remoting_conf"))
  val remoteEnabledActorSystem = ActorSystem("RemoteActorSystem", remoteConfig)
  val remoteActor = remoteEnabledActorSystem.actorOf(Props[RemoteActor], "RemoteRouter")

  // Cluster sharding side: the port is set here, the rest comes from sharding_cassandra_conf.
  val clusterConfig = ConfigFactory
    .parseString("akka.remote.netty.tcp.port=2551")
    .withFallback(ConfigFactory.load("sharding_cassandra_conf"))
  val system = ActorSystem("ClusterSystem", clusterConfig)

  // Fixed: the original referred to `ShardActorToCallRemote`, which is not defined in this
  // file; the entity class and its extractors are `PersistentEntityActor`. The typeName
  // string is deliberately left unchanged.
  val shardRegionRef =
    ClusterSharding(system).start(
      typeName = "ShardActorToCallRemote",
      entityProps = Props(new PersistentEntityActor(remoteActor)),
      settings = ClusterShardingSettings(system),
      extractShardId = PersistentEntityActor.extractShardId,
      extractEntityId = PersistentEntityActor.extractEntityId)

  // give some time for the shard setup to come up
  Thread.sleep(4000)

  shardRegionRef ! PersistentEntityActor.Cmd(10090)
}


/**
 * Launcher for the variant in which the RemoteActor is created as an ordinary actor in
 * the same "ClusterSystem" actor system (port 2551) that hosts Cluster Sharding.
 */
object RunWithSameActorSystem extends App {

  // Cluster sharding system: the port is set here, the rest comes from sharding_cassandra_conf.
  val clusterConfig = ConfigFactory
    .parseString("akka.remote.netty.tcp.port=2551")
    .withFallback(ConfigFactory.load("sharding_cassandra_conf"))
  val system = ActorSystem("ClusterSystem", clusterConfig)

  // The "remote" actor is a plain local actor of the same system in this variant.
  val remoteActor = system.actorOf(Props[RemoteActor], "RemoteRouter")

  // Fixed: the original referred to `ShardActorToCallRemote`, which is not defined in this
  // file; the entity class and its extractors are `PersistentEntityActor`. The typeName
  // string is deliberately left unchanged.
  val shardRegionRef =
    ClusterSharding(system).start(
      typeName = "ShardActorToCallRemote",
      entityProps = Props(new PersistentEntityActor(remoteActor)),
      settings = ClusterShardingSettings(system),
      extractShardId = PersistentEntityActor.extractShardId,
      extractEntityId = PersistentEntityActor.extractEntityId)

  // give some time for the shard setup to come up
  Thread.sleep(4000)

  shardRegionRef ! PersistentEntityActor.Cmd(10090)
}

sharding_cassandra_conf.conf

# Settings loaded under the name "sharding_cassandra_conf" as the fallback configuration
# of the "ClusterSystem" actor system created by both launchers.
akka {
  loglevel = INFO

  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      # Both launchers override this with akka.remote.netty.tcp.port=2551 before
      # falling back to this file.
      port = 0
    }
  }

  cluster {
    # Same system name, host and port (2551) that the launchers start, so the node
    # lists itself as seed.
    seed-nodes = ["akka.tcp://ClusterSystem@127.0.0.1:2551"]
    # NOTE(review): automatic downing of unreachable nodes is generally discouraged
    # outside of demos - confirm this is intended.
    auto-down-unreachable-after = 10s
    metrics.enabled = off

    sharding {
      remember-entities = on
      # Same plugin ids as the ones configured under akka.persistence below.
      journal-plugin-id = "cassandra-journal"
      snapshot-plugin-id = "cassandra-snapshot-store"
      rebalance-interval = 10 s
    }
  }
  
  # Default journal and snapshot store plugins for persistence.
  persistence {
    journal.plugin = cassandra-journal
    snapshot-store.plugin = cassandra-snapshot-store
  }
}


remoting_conf.conf

# Settings loaded under the name "remoting_conf" as the fallback configuration of the
# "RemoteActorSystem" actor system in RunWithSeparateActorSystem.
akka {
  loglevel = "INFO"
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      # No port is set here; the launcher supplies akka.remote.netty.tcp.port=5150.
      hostname = "127.0.0.1"
    }
    # Log every message sent and received over remoting.
    log-sent-messages = on
    log-received-messages = on
  }
}

Thanks.

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to