Hi,

I am trying to create a cluster that uses a consistent hashing router, 
running in Docker containers. I am unable to get this to behave 
consistently.

Please see my config and code attached. 

My errors vary:
1. Slave actor systems not joining and timing out (I have increased the 
timers to compensate).
2. One of the routee workers is consistently unable to send messages to 
the router, while other routees on the same host can.

I have the two application.conf files attached - one for the master, and 
the other for the slave.

Since this is a cluster singleton running in Docker, I am passing 
variables in via shell environment variables and initializing some parts 
of the cluster configuration in code.

Please see my code attached. I would appreciate any guidance on how to debug this.

I suspect, and have tried to address, the following:

1. Cluster timers failing because this is a blocking compute job.
       - I have created specialized executors with dedicated threads to 
isolate the blocking work (a sketch of the approach follows this list); I have 
also simply disabled the blocking call to see what is going on.
       - I have tried to create specialized dispatchers for the cluster.
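
A minimal sketch of what I mean by a dedicated executor for the blocking 
call (the pool size and runJob are placeholders, not my actual code):

  import java.util.concurrent.Executors

  import scala.concurrent.{ blocking, ExecutionContext, Future }

  // a dedicated thread pool, so the blocking compute job cannot starve
  // the threads that service cluster heartbeats
  val blockingEc: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

  def runJob(): Int = { /* the blocking compute job */ 42 }

  // run the job entirely off the default dispatcher
  val result: Future[Int] = Future(blocking(runJob()))(blockingEc)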

One of the error messages on the slave with the failing routee that I am 
unable to understand (and that could be contributing) is:

 56 05:42:04.434UTC DEBUG ystem/system/cluster/core/daemon: now supervising Actor[akka://system/system/cluster/core/daemon/joinSeedNodeProcess-1#882767345]

119 05:43:02.560UTC DEBUG lActorRefProvider(akka://system): resolve of path sequence [/system/cluster/core/daemon/joinSeedNodeProcess-1#882767345] failed



Master and slave init code is below - 


Master Init ----

  import java.net.InetAddress

  import scala.concurrent.duration._

  import akka.actor.{ ActorSystem, PoisonPill }
  import akka.cluster.Cluster
  import akka.cluster.singleton.{ ClusterSingletonManager, ClusterSingletonManagerSettings }
  import akka.util.Timeout

  import com.typesafe.config.ConfigFactory

  // validate and set up the config
  val portNumber =
    ConfigFactory.load().getInt("Master.akka.remote.artery.canonical.port")

  // in Docker, HOST_IP_ADDR carries the host's externally reachable address;
  // the bind address stays the container's own address
  val (hostName, bindHostname) = sys.env.get("HOST_IP_ADDR") match {
    case Some(hostIp) => (hostIp, InetAddress.getLocalHost.getHostAddress)
    case None         => (InetAddress.getLocalHost.getHostAddress,
                          InetAddress.getLocalHost.getHostAddress)
  }

  val myConfig = ConfigFactory.parseString(
    s"""
       |Master.akka.remote.artery.canonical.hostname = "$hostName"
       |Master.akka.remote.artery.bind.hostname = "$bindHostname"
       |Master.akka.remote.artery.bind.port = 25521
       |Master.akka.cluster.seed-nodes = ["akka://system@$hostName:$portNumber"]
       |""".stripMargin)

  // http://doc.akka.io/docs/akka/current/general/configuration.html#Reading_configuration_from_a_custom_location
  val config = ConfigFactory.load(myConfig.withFallback(ConfigFactory.load()))

  // note: the seed nodes live under the "Master" prefix in this layout
  if (config.getStringList("Master.akka.cluster.seed-nodes").size() > 1) {
    log.error("too many seed nodes in config file")
    System.exit(1)
  }

  println(s"Cosbench_ng master is UP at: akka://system@$hostName:$portNumber")
  println("Status in log directory: /tmp/cosbench_ng")

  implicit val asystem =
    ActorSystem("system", config.getConfig("Master").withFallback(config))
  implicit val timeout = Timeout(5.seconds)

  val cluster = Cluster.get(asystem)
  val flowControlActor = asystem.actorOf(FlowControlActor.props)

  // create the flow that does the work:
  // a global singleton actor that routes messages to multiple workers
  // and collates results into a statistics queue
  asystem.actorOf(
    ClusterSingletonManager.props(
      singletonProps = MyRouter.props(),
      terminationMessage = PoisonPill,
      settings = ClusterSingletonManagerSettings(asystem).withRole("router")),
    name = "myRouter")

    



Slave init code --



 

  import java.net.InetAddress

  import akka.actor.ActorSystem
  import akka.cluster.Cluster

  import com.typesafe.config.ConfigFactory

  val (hostName, bindHostname) = sys.env.get("HOST_IP_ADDR") match {
    case Some(hostIp) => (hostIp, InetAddress.getLocalHost.getHostAddress)
    case None         => (InetAddress.getLocalHost.getHostAddress,
                          InetAddress.getLocalHost.getHostAddress)
  }

  // setup configuration; the master's address arrives on the command line
  val myConfig = ConfigFactory.parseString(
    s"""
       |Slave.akka.remote.artery.canonical.hostname = "$hostName"
       |Slave.akka.remote.artery.bind.hostname = "$bindHostname"
       |Slave.akka.cluster.seed-nodes = ["${cmd.get.master.get}"]
       |""".stripMargin)

  println("Connecting to master: " + cmd.get.master.get)

  // http://doc.akka.io/docs/akka/current/general/configuration.html#Reading_configuration_from_a_custom_location
  val config = ConfigFactory.load(myConfig.withFallback(ConfigFactory.load()))

  // actors in this actor system are created remotely
  implicit val system =
    ActorSystem("system", config.getConfig("Slave").withFallback(config))

  val cluster = Cluster.get(system)
  println("Slave is up at: " + cluster.selfUniqueAddress.address)





Master application.conf --

# SERVER
# ~~~~~



# Akka logging
Master {

  include "common.conf"

  akka {

    # Logging configuration
    #log-config-on-start = on
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    loglevel = "DEBUG"
    logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

    log-dead-letters = 10
    log-dead-letters-during-shutdown = on

    actor {
      dispatcher = cluster-dispatcher
      provider = cluster

      # required only during testing
      # see http://doc.akka.io/docs/akka/2.4/scala/serialization.html#serialization-scala
      serialize-messages = on
      serialize-creators = on
      allow-java-serialization = on
      warn-about-java-serializer-usage = off

      debug {
        receive = on
        autoreceive = on
        lifecycle = on
        fsm = off
        event-stream = off
        unhandled = on
        router-misconfiguration = on
      }

      serializers {
        proto = "akka.remote.serialization.ProtobufSerializer"
      }

      # see http://doc.akka.io/docs/akka/2.4/scala/remoting-artery.html#Remoting_S
      deployment {
        /myRouter/singleton/workerRouter {
          router = consistent-hashing-pool
          cluster {
            enabled = on
            max-nr-of-instances-per-node = 2
            allow-local-routees = on
          }
        }
      }
    }

    remote {
      untrusted-mode = off
      watch-failure-detector.threshold = 100

      artery {
        enabled = on
        canonical.port = 25521
      }
    }

    cluster {
      metrics.enabled = off
      use-dispatcher = Master.akka.cluster-dispatcher

      failure-detector {
        threshold = 100
        acceptable-heartbeat-pause = 60s
        heartbeat-interval = 10s
        heartbeat-request {
          expected-response-after = 20s
        }
      }

      roles = ["router"]
    }

    cluster-dispatcher {
      type = "PinnedDispatcher"
      executor = "thread-pool-executor"
    }
  }
}
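
One thing worth noting about consistent-hashing pools: every message needs 
a hash key so the router can pick a routee deterministically. The usual 
way, if the messages themselves don't implement ConsistentHashable, is to 
wrap them in an envelope; a sketch (MyWork and the key are made up, 
routerProxy is from the earlier sketch):

  import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope

  case class MyWork(objectName: String)

  // the router hashes hashKey to choose a routee, so equal keys
  // always land on the same worker
  routerProxy ! ConsistentHashableEnvelope(
    message = MyWork("bucket-42"),
    hashKey = "bucket-42")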
Slave application.conf --

# REMOTE



# Akka logging
Slave {

  include file("common.conf")

  akka {

    # Logging configuration
    #log-config-on-start = on
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    loglevel = "DEBUG"
    logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

    log-dead-letters = 10
    log-dead-letters-during-shutdown = on

    #stream.materializer.initial-input-buffer-size = 1
    #stream.materializer.max-input-buffer-size = 1

    actor {
      provider = cluster

      # required only during testing
      # see http://doc.akka.io/docs/akka/2.4/scala/serialization.html#serialization-scala
      serialize-messages = on
      serialize-creators = on

      warn-about-java-serializer-usage = off

      debug {
        receive = on
        autoreceive = on
        lifecycle = on
        fsm = off
        event-stream = off
        unhandled = on
        router-misconfiguration = on
      }

      deployment {
        /myRouter/singleton/workerRouter {
          router = consistent-hashing-pool
          cluster {
            enabled = on
            max-nr-of-instances-per-node = 2
            allow-local-routees = on
          }
        }
      }

      serializers {
        proto = "akka.remote.serialization.ProtobufSerializer"
      }
    }

    remote {

      # trusted mode
      untrusted-mode = off
      watch-failure-detector.threshold = 100

      artery {
        enabled = on
        canonical.port = 23678
        bind.port = 23678
      }
    }

    cluster {
      metrics.enabled = off
      use-dispatcher = Slave.akka.cluster-dispatcher

      failure-detector {
        threshold = 100
        acceptable-heartbeat-pause = 60s
        heartbeat-interval = 10s
        heartbeat-request {
          expected-response-after = 20s
        }
      }

      roles = ["slave"]
    }

    cluster-dispatcher {
      type = "PinnedDispatcher"
      executor = "thread-pool-executor"
    }
  }
}
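
MyRouter itself is not attached; for completeness, the deployment path 
/myRouter/singleton/workerRouter implies the singleton creates the pool 
from config, roughly like this (Worker is a stand-in for my actual routee 
class):

  import akka.actor.{ Actor, Props }
  import akka.routing.FromConfig

  class Worker extends Actor {
    def receive = { case _ => () // the compute job goes here
    }
  }

  class MyRouter extends Actor {
    // picks up the consistent-hashing-pool settings from the
    // /myRouter/singleton/workerRouter deployment section above
    val workerRouter = context.actorOf(FromConfig.props(Props[Worker]), "workerRouter")

    def receive = {
      case msg => workerRouter forward msg
    }
  }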
