Hi,
I am trying to create a cluster that uses a consistent-hashing router,
running in a docker container. I am unable to get this to behave
consistently.
Please see my config and code attached.
My errors vary from
1. Slave actor systems not joining and timing out (I have increased the
timers to compensate)
2. One of the routee workers consistently being unable to send messages to
the router, while other routees on the same host can
I have the two application.conf files attached - one for the master, and
the other for the slave.
Since this is a cluster singleton running in docker, I am passing
variables in via shell variables and initializing some parts of the cluster
in code.
Please see my code attached. I would appreciate any guidance on how to debug.
I suspect, and have tried to address, the following:
1. cluster timers, because this is a blocking compute job
- I have created specialized executors with dedicated threads to
address this, and I have also just disabled the blocking call to test what's
going on
- tried to create specialized dispatchers for the cluster
One of the error messages on the slave that has the failing routee, which I
am unable to understand (and which could be contributing), is:
56 05:42:04.434UTC DEBUG ystem/system/cluster/core/daemon: now supervising
Actor[akka://system/system/cluster/core/daemon/joinSeedNodeProcess-1#882767345]
119 05:43:02.560UTC DEBUG lActorRefProvider(akka://system): resolve of path
sequence [/system/cluster/core/daemon/joinSeedNodeProcess-1#882767345]
failed
Master and slave init code is below -
Master Init ----
// ---- Master init ----
// Validate and assemble the master configuration, then start the actor
// system and the cluster-singleton router.

// The canonical port comes from the static config file.
val portNumber = ConfigFactory.load().getInt(
  "Master.akka.remote.artery.canonical.port")

// Inside docker, HOST_IP_ADDR carries the externally reachable (host)
// address while the container still binds on its own interface; outside
// docker both fall back to the local address.
val localAddress = InetAddress.getLocalHost.getHostAddress
val hostName     = sys.env.getOrElse("HOST_IP_ADDR", localAddress)
val bindHostname = localAddress

// Dynamic settings layered on top of the static config.
// NOTE(review): bind.port is hard-coded to 25521 while canonical.port is
// read from config — confirm these are intended to be the same value.
val myConfig = ConfigFactory.parseString(
  s"""
     |Master.akka.remote.artery.canonical.hostname = "$hostName"
     |Master.akka.remote.artery.bind.hostname = "$bindHostname"
     |Master.akka.remote.artery.bind.port = 25521
     |Master.akka.cluster.seed-nodes = ["akka://system@$hostName:$portNumber"]
     |""".stripMargin)

// http://doc.akka.io/docs/akka/current/general/configuration.html#Reading_configuration_from_a_custom_location
val config = ConfigFactory.load(myConfig.withFallback(ConfigFactory.load()))

// BUG FIX: this check previously read "akka.cluster.seed-nodes", which is
// the (empty) list from Akka's reference config, so it could never fire.
// It must read the "Master."-prefixed path that was just written above.
if (config.getStringList("Master.akka.cluster.seed-nodes").size() > 1) {
  log.error("too many seed nodes in config file")
  System.exit(1)
}

println("Cosbench_ng master is UP at: akka://system@%s:%d".format(
  hostName, portNumber))
println("Status in log directory: /tmp/cosbench_ng")

// Create the actor system from the "Master" subtree (so Master.akka.* becomes
// akka.*), keeping the full config as fallback so prefixed paths such as
// "Master.akka.cluster-dispatcher" still resolve.
implicit val asystem = ActorSystem("system",
  config.getConfig("Master").withFallback(config))
implicit val timeout = Timeout(5.seconds)

val cluster = Cluster.get(asystem)
val flowControlActor = asystem.actorOf(FlowControlActor.props)

// Global singleton actor that routes messages to multiple workers and
// collates results into a statistics queue; it runs only on nodes carrying
// the "router" role.
asystem.actorOf(
  ClusterSingletonManager.props(
    singletonProps     = MyRouter.props(),
    terminationMessage = PoisonPill,
    settings           = ClusterSingletonManagerSettings(asystem).withRole("router")),
  name = "myRouter")
Slave init code --
// ---- Slave init ----
// Build the slave configuration and join the cluster at the master's address.

// Inside docker, HOST_IP_ADDR holds the externally reachable address; the
// bind address is always the container-local one.
val localAddress = InetAddress.getLocalHost.getHostAddress
val hostName     = sys.env.getOrElse("HOST_IP_ADDR", localAddress)
val bindHostname = localAddress

// NOTE(review): cmd.get.master.get throws if either Option is empty —
// presumably the command-line parser has already validated this; confirm.
val masterAddress = cmd.get.master.get

// Dynamic slave settings layered over the static config.
val myConfig = ConfigFactory.parseString(
  s"""
     |Slave.akka.remote.artery.canonical.hostname = "$hostName"
     |Slave.akka.remote.artery.bind.hostname = "$bindHostname"
     |Slave.akka.cluster.seed-nodes = ["$masterAddress"]
     |""".stripMargin)

println("Connecting to master: " + masterAddress)

// http://doc.akka.io/docs/akka/current/general/configuration.html#Reading_configuration_from_a_custom_location
val config = ConfigFactory.load(myConfig.withFallback(ConfigFactory.load()))

// Actors in this actor system are created remotely.
implicit val system = ActorSystem("system",
  config.getConfig("Slave").withFallback(config))

val cluster = Cluster.get(system)
println("Slave is up at: " + cluster.selfUniqueAddress.address)
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
# SERVER
# ~~~~~
# Akka logging
# SERVER (master) configuration.
# Loaded in code as config.getConfig("Master").withFallback(config), so the
# "Master." prefix is stripped for the actor system's own akka.* settings
# while the prefixed paths stay reachable through the fallback.
Master {
  include "common.conf"

  akka {
    # Logging configuration
    #log-config-on-start = on
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    loglevel = "DEBUG"
    logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
    log-dead-letters = 10
    log-dead-letters-during-shutdown = on

    actor {
      # NOTE(review): "akka.actor.dispatcher" is not a standard Akka setting
      # (the default dispatcher is configured at akka.actor.default-dispatcher),
      # so this line is most likely ignored — confirm it has the intended effect.
      dispatcher = cluster-dispatcher
      provider = cluster

      # Full serialization checks — required only during testing; see
      # http://doc.akka.io/docs/akka/2.4/scala/serialization.html#serialization-scala
      # (FIX: this URL sat on a bare, uncommented line, which breaks HOCON parsing)
      serialize-messages = on
      serialize-creators = on
      allow-java-serialization = on
      warn-about-java-serializer-usage = off

      debug {
        receive = on
        autoreceive = on
        lifecycle = on
        fsm = off
        event-stream = off
        unhandled = on
        router-misconfiguration = on
      }

      serializers {
        proto = "akka.remote.serialization.ProtobufSerializer"
      }

      # Cluster-aware pool deployment for the singleton's worker router; see
      # http://doc.akka.io/docs/akka/2.4/scala/remoting-artery.html
      # (FIX: URL was on a bare, uncommented line and was truncated at "#Remoting_S")
      deployment {
        /myRouter/singleton/workerRouter {
          router = consistent-hashing-pool
          cluster {
            enabled = on
            max-nr-of-instances-per-node = 2
            allow-local-routees = on
          }
        }
      }
    }

    remote {
      untrusted-mode = off
      watch-failure-detector.threshold = 100
      artery {
        enabled = on
        canonical.port = 25521
      }
    }

    cluster {
      metrics.enabled = off
      # This path resolves through the fallback config handed to the
      # ActorSystem (see the master init code).
      use-dispatcher = Master.akka.cluster-dispatcher
      # Deliberately lenient failure detection, tuned up to survive long
      # blocking compute jobs.
      failure-detector {
        threshold = 100
        acceptable-heartbeat-pause = 60s
        heartbeat-interval = 10s
        heartbeat-request {
          expected-response-after = 20s
        }
      }
      roles = ["router"]
    }

    # Dedicated dispatcher for cluster internals.
    cluster-dispatcher {
      type = "PinnedDispatcher"
      executor = "thread-pool-executor"
    }
  }
}
# REMOTE
# Akka logging
# REMOTE (slave) configuration.
# Loaded in code as config.getConfig("Slave").withFallback(config).
Slave {
  # FIX: Master uses `include "common.conf"`; use the same include form here
  # so both sections resolve the shared config identically.
  include "common.conf"

  akka {
    # Logging configuration
    #log-config-on-start = on
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    loglevel = "DEBUG"
    logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
    log-dead-letters = 10
    log-dead-letters-during-shutdown = on
    #stream.materializer.initial-input-buffer-size = 1
    #stream.materializer.max-input-buffer-size = 1

    actor {
      provider = cluster

      # Full serialization checks — required only during testing; see
      # http://doc.akka.io/docs/akka/2.4/scala/serialization.html#serialization-scala
      # (FIX: this URL sat on a bare, uncommented line, which breaks HOCON parsing)
      serialize-messages = on
      serialize-creators = on
      warn-about-java-serializer-usage = off

      debug {
        receive = on
        autoreceive = on
        lifecycle = on
        fsm = off
        event-stream = off
        unhandled = on
        router-misconfiguration = on
      }

      # NOTE(review): the worker-router deployment is also defined on the
      # master, where the singleton actually creates the router; this copy is
      # likely unused on the slave — confirm before relying on it.
      deployment {
        /myRouter/singleton/workerRouter {
          router = consistent-hashing-pool
          cluster {
            enabled = on
            max-nr-of-instances-per-node = 2
            allow-local-routees = on
          }
        }
      }

      serializers {
        proto = "akka.remote.serialization.ProtobufSerializer"
      }
    }

    remote {
      # trusted mode
      untrusted-mode = off
      watch-failure-detector.threshold = 100
      artery {
        enabled = on
        canonical.port = 23678
        bind.port = 23678
      }
    }

    cluster {
      metrics.enabled = off
      # This path resolves through the fallback config handed to the
      # ActorSystem (see the slave init code).
      use-dispatcher = Slave.akka.cluster-dispatcher
      # Deliberately lenient failure detection, tuned up to survive long
      # blocking compute jobs.
      failure-detector {
        threshold = 100
        acceptable-heartbeat-pause = 60s
        heartbeat-interval = 10s
        heartbeat-request {
          expected-response-after = 20s
        }
      }
      roles = ["slave"]
    }

    # Dedicated dispatcher for cluster internals.
    cluster-dispatcher {
      type = "PinnedDispatcher"
      executor = "thread-pool-executor"
    }
  }
}