i am doing cluster example from site : https://github.com/akka/akka-samples/tree/2.5/akka-sample-cluster-java while i am executing the cluster it shows following error can anyone solve this error.
Error is like this //=========================Main 1================================= //=========================Main 2================================= [DEBUG] [07/24/2017 15:49:45.072] [main] [EventStream(akka://ClusterSystem)] logger log1-Logging$DefaultLogger started [DEBUG] [07/24/2017 15:49:45.088] [main] [EventStream(akka://ClusterSystem)] Default Loggers started [INFO] [07/24/2017 15:49:45.727] [main] [akka.remote.Remoting] Starting remoting [INFO] [07/24/2017 15:49:46.163] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:1170] [INFO] [07/24/2017 15:49:46.195] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:1170] [INFO] [07/24/2017 15:49:46.210] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:1170] - Starting up... [INFO] [07/24/2017 15:49:46.475] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:1170] - Registered cluster JMX MBean [akka:type=Cluster] [INFO] [07/24/2017 15:49:46.475] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:1170] - Started up successfully //=========================Main 3================================= [DEBUG] [07/24/2017 15:49:46.600] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.tcp://ClusterSystem@127.0.0.1:1170/system/cluster-metrics] Collection started. //=========================Main 4================================= //========================= CLUSTER INVOKE PRE START METHOD ================================= //========================= CLUSTER IS STARTED ================================= [DEBUG] [07/24/2017 15:49:46.615] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.cluster.metrics.MetricsCollector$(akka://ClusterSystem)] Trying akka.cluster.metrics.SigarMetricsCollector. [DEBUG] [07/24/2017 15:49:46.615] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.cluster.metrics.MetricsCollector$(akka://ClusterSystem)] Trying akka.cluster.metrics.JmxMetricsCollector. [INFO] [07/24/2017 15:49:46.631] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:1170] - Metrics collection has started successfully [WARN] [07/24/2017 15:49:46.662] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:1170/system/cluster/core/daemon/downingProvider] Don't use auto-down feature of Akka Cluster in production. See 'Auto-downing (DO NOT USE)' section of Akka Cluster documentation. //========================= GOING INTO CURRENT CLUSTER STATE ================================= //========================= CURRENT CLUSTER STATE SUCCESS =================================CurrentClusterState(TreeSet(),Set(),Set(),None,Map()) [WARN] [07/24/2017 15:49:56.746] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://ClusterSystem@127.0.0.1:1170/system/cluster/core/daemon/joinSeedNodeProcess-1] Couldn't join seed nodes after [2] attempts, will try again. seed-nodes=[akka.tcp://ClusterSystem@127.0.0.2:1170, akka.tcp://ClusterSystem@127.0.0.3:1170] [WARN] [07/24/2017 15:50:01.765] [ClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://ClusterSystem@127.0.0.1:1170/system/cluster/core/daemon/joinSeedNodeProcess-1] Couldn't join seed nodes after [3] attempts, will try again. seed-nodes=[akka.tcp://ClusterSystem@127.0.0.2:1170, akka.tcp://ClusterSystem@127.0.0.3:1170] [INFO] [07/24/2017 15:50:01.812] [ClusterSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClusterSystem@127.0.0.1:1170/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1] No response from remote for outbound association. Associate timed out after [15000 ms]. [INFO] [07/24/2017 15:50:01.827] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://ClusterSystem@127.0.0.1:1170/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-2] No response from remote for outbound association. Associate timed out after [15000 ms]. [WARN] [07/24/2017 15:50:01.905] [New I/O boss #3] [NettyTransport(akka://ClusterSystem)] Remote connection to null failed with org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /127.0.0.3:1170 [WARN] [07/24/2017 15:50:01.905] [New I/O boss #3] [NettyTransport(akka://ClusterSystem)] Remote connection to null failed with org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /127.0.0.2:1170 [DEBUG] [07/24/2017 15:50:01.921] [ClusterSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1/endpointWriter] AssociationError [akka.tcp://ClusterSystem@127.0.0.1:1170] -> [akka.tcp://ClusterSystem@127.0.0.3:1170]: Error [Association failed with [akka.tcp://ClusterSystem@127.0.0.3:1170]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://ClusterSystem@127.0.0.3:1170] Caused by: java.util.concurrent.TimeoutException: No response from remote for outbound association. Associate timed out after [15000 ms]. at akka.remote.transport.ProtocolStateActor$$anonfun$2.applyOrElse(AkkaProtocolTransport.scala:367) at akka.remote.transport.ProtocolStateActor$$anonfun$2.applyOrElse(AkkaProtocolTransport.scala:341) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at akka.actor.FSM$class.processEvent(FSM.scala:663) at akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286) at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:657) at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:629) at akka.actor.Actor$class.aroundReceive(Actor.scala:513) at akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:519) at akka.actor.ActorCell.invoke(ActorCell.scala:488) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ] [DEBUG] [07/24/2017 15:50:01.921] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter] AssociationError [akka.tcp://ClusterSystem@127.0.0.1:1170] -> [akka.tcp://ClusterSystem@127.0.0.2:1170]: Error [Association failed with [akka.tcp://ClusterSystem@127.0.0.2:1170]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://ClusterSystem@127.0.0.2:1170] Caused by: java.util.concurrent.TimeoutException: No response from remote for outbound association. Associate timed out after [15000 ms]. at akka.remote.transport.ProtocolStateActor$$anonfun$2.applyOrElse(AkkaProtocolTransport.scala:367) at akka.remote.transport.ProtocolStateActor$$anonfun$2.applyOrElse(AkkaProtocolTransport.scala:341) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at akka.actor.FSM$class.processEvent(FSM.scala:663) at akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286) at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:657) at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:629) at akka.actor.Actor$class.aroundReceive(Actor.scala:513) at akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:519) at akka.actor.ActorCell.invoke(ActorCell.scala:488) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ] [WARN] [07/24/2017 15:50:01.921] [ClusterSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1] Association with remote system [akka.tcp://ClusterSystem@127.0.0.3:1170] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://ClusterSystem@127.0.0.3:1170]] Caused by: [No response from remote for outbound association. Associate timed out after [15000 ms].] [INFO] [07/24/2017 15:50:01.921] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1] Message [akka.actor.Status$Failure] from Actor[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1#-1283183146] to Actor[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1#-1283183146] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [WARN] [07/24/2017 15:50:01.921] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.2:1170] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://ClusterSystem@127.0.0.2:1170]] Caused by: [No response from remote for outbound association. Associate timed out after [15000 ms].] [INFO] [07/24/2017 15:50:01.921] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-2] Message [akka.actor.Status$Failure] from Actor[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-2#-987867521] to Actor[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-2#-987867521] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [07/24/2017 15:50:01.968] [ClusterSystem-akka.actor.default-dispatcher-15] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [07/24/2017 15:50:01.968] [ClusterSystem-akka.actor.default-dispatcher-15] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [07/24/2017 15:50:01.968] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [07/24/2017 15:50:01.968] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [DEBUG] [07/24/2017 15:50:01.968] [ClusterSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter] Disassociated [akka.tcp://ClusterSystem@127.0.0.1:1170] -> [akka.tcp://ClusterSystem@127.0.0.2:1170] [INFO] [07/24/2017 15:50:01.968] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter] Message [akka.remote.EndpointWriter$AckIdleCheckTimer$] from Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter#1576329271] to Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter#1576329271] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [DEBUG] [07/24/2017 15:50:01.983] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1/endpointWriter] Disassociated [akka.tcp://ClusterSystem@127.0.0.1:1170] -> [akka.tcp://ClusterSystem@127.0.0.3:1170] [INFO] [07/24/2017 15:50:01.983] [ClusterSystem-akka.actor.default-dispatcher-15] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [07/24/2017 15:50:01.983] [ClusterSystem-akka.actor.default-dispatcher-15] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [07/24/2017 15:50:01.983] [ClusterSystem-akka.actor.default-dispatcher-15] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. and tell me how to execute node i am confusing while executing nodes. -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
package sample.cluster.transformation; import akka.actor.UntypedActor; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.ClusterEvent.MemberUp; import akka.cluster.Member; import akka.cluster.MemberStatus; //import akka.cluster.metrics.ClusterMetricsExtension; //import sample.cluster.transformation.TransformationMessages.TransformationJob; //import sample.cluster.transformation.TransformationMessages.TransformationResult; //import static sample.cluster.transformation.TransformationMessages.BACKEND_REGISTRATION; //#backend @SuppressWarnings("deprecation") public class TransformationBackend extends UntypedActor implements TransformationMessages{ Cluster cluster = Cluster.get(getContext().system()); //subscribe to cluster changes, MemberUp @Override public void preStart() { System.out.println("//========================= CLUSTER INVOKE PRE START METHOD ================================="); cluster.subscribe(getSelf(), MemberUp.class); System.out.println("//========================= CLUSTER IS STARTED ================================="); } //re-subscribe when restart @Override public void postStop() { cluster.unsubscribe(getSelf()); System.out.println("//========================= CLUSTER STOPED.....! ================================="); } @Override public void onReceive(Object message) { if (message instanceof TransformationJob) { System.out.println("//========================= GOING INTO TRANSFORMATION JOB ================================="); TransformationJob job = (TransformationJob) message; getSender().tell(new TransformationResult(job.getText().toUpperCase()), getSelf()); System.out.println("//========================= TRANSFORMATION JOB SUCCESS.... ================================="); } else if (message instanceof CurrentClusterState) { System.out.println("//========================= GOING INTO CURRENT CLUSTER STATE ================================="); CurrentClusterState state = (CurrentClusterState) message; System.out.println("//========================= CURRENT CLUSTER STATE SUCCESS ================================="+state); for (Member member : state.getMembers()) { if (member.status().equals(MemberStatus.up())) { System.out.println("//========================= MEMBER IS UP ================================="); register(member); System.out.println("//========================= MEMBERS ARE UP ================================="+member); } } } else if (message instanceof MemberUp) { System.out.println("//========================= MEMBERS..... ================================="); MemberUp mUp = (MemberUp) message; register(mUp.member()); System.out.println("//========================= MEMBERS UP REGISTER ================================="); } else { unhandled(message); System.out.println("//========================= ERROR IN MEMBERS ================================="); } } void register(Member member) { System.out.println("//========================= REGISTER MEMBER ================================="); if (member.hasRole("frontend")) getContext().actorSelection(member.address() + "/user/frontend").tell( BACKEND_REGISTRATION, getSelf()); } }//#backend
package sample.cluster.transformation; import akka.actor.ActorSystem; import akka.actor.Props; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; public class TransformationBackendMain { public static void main(String[] args) { // Override the configuration of the port when specified as program argument System.out.println("//=========================Main 1================================="); final String hostname = args.length > 0 ? args[0] : "127.0.0.1"; final String port = args.length > 1 ? args[1] : "1170"; final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + hostname). withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)). withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")). withFallback(ConfigFactory.load()); System.out.println("//=========================Main 2================================="); ActorSystem system = ActorSystem.create("ClusterSystem", config); System.out.println("//=========================Main 3================================="); system.actorOf(Props.create(TransformationBackend.class), "backend"); System.out.println("//=========================Main 4================================="); }}
package sample.cluster.transformation; import akka.actor.ActorRef; import akka.actor.Terminated; import akka.actor.UntypedActor; import sample.cluster.transformation.TransformationMessages.JobFailed; import sample.cluster.transformation.TransformationMessages.TransformationJob; import java.util.ArrayList; import java.util.List; import static sample.cluster.transformation.TransformationMessages.BACKEND_REGISTRATION; //#frontend public class TransformationFrontend extends UntypedActor { List<ActorRef> backends = new ArrayList<ActorRef>(); int jobCounter = 0; @Override public void onReceive(Object message) { if ((message instanceof TransformationJob) && backends.isEmpty()) { TransformationJob job = (TransformationJob) message; getSender().tell( new JobFailed("Service unavailable, try again later", job), getSender()); } else if (message instanceof TransformationJob) { TransformationJob job = (TransformationJob) message; jobCounter++; backends.get(jobCounter % backends.size()) .forward(job, getContext()); } else if (message.equals(BACKEND_REGISTRATION)) { getContext().watch(getSender()); backends.add(getSender()); } else if (message instanceof Terminated) { Terminated terminated = (Terminated) message; backends.remove(terminated.getActor()); } else { unhandled(message); } } } //#frontend
package sample.cluster.transformation; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.dispatch.OnSuccess; import akka.pattern.Patterns; import akka.util.Timeout; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import sample.cluster.transformation.TransformationMessages.TransformationJob; import scala.concurrent.ExecutionContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class TransformationFrontendMain { public static void main(String[] args) { // Override the configuration of the port when specified as program argument final String hostname = args.length > 0 ? args[0] : "127.0.0.1"; final String port = args.length > 1 ? args[1] : "1170"; final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + hostname). withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)). withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")). withFallback(ConfigFactory.load()); ActorSystem system = ActorSystem.create("ClusterSystem", config); final ActorRef frontend = system.actorOf( Props.create(TransformationFrontend.class), "frontend"); final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS); final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS)); final ExecutionContext ec = system.dispatcher(); final AtomicInteger counter = new AtomicInteger(); system.scheduler().schedule(interval, interval, new Runnable() { public void run() { Patterns.ask(frontend, new TransformationJob("hello-" + counter.incrementAndGet()), timeout).onSuccess(new OnSuccess<Object>() { public void onSuccess(Object result) { System.out.println(result); } }, ec); } }, ec); } }
package sample.cluster.transformation; import java.io.Serializable; //#messages public interface TransformationMessages { public static class TransformationJob implements Serializable { private static final long serialVersionUID = 1L; private final String text; public TransformationJob(String text) { this.text = text; } public String getText() { return text; } @Override public String toString() { return "TransformationJob(" + text + ")"; } } public static class TransformationResult implements Serializable { private static final long serialVersionUID = 1L; private final String text; public TransformationResult(String text) { this.text = text; } public String getText() { return text; } @Override public String toString() { return "TransformationResult(" + text + ")"; } } public static class JobFailed implements Serializable { private static final long serialVersionUID = 1L; private final String reason; private final TransformationJob job; public JobFailed(String reason, TransformationJob job) { this.reason = reason; this.job = job; } public String getReason() { return reason; } public TransformationJob getJob() { return job; } @Override public String toString() { return "JobFailed(" + reason + ")"; } } public static final String BACKEND_REGISTRATION = "BackendRegistration"; } //#messages
akka { loglevel = "DEBUG" actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { log-remote-lifecycle-events = DEBUG # heartbeat-interval = 5s # default 4s # acceptable-heartbeat-pause = 10s # default 10s netty.tcp { hostname = "127.0.0.1" port = 1170 } } cluster { seed-nodes = [ "akka.tcp://ClusterSystem@127.0.0.2:1170", "akka.tcp://ClusterSystem@127.0.0.3:1170"] # roles = ["role"] auto-down-unreachable-after = 280s } } # Disable legacy metrics in akka-cluster. akka.cluster.metrics.enabled=off # Enable metrics extension in akka-cluster-metrics. akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"] # Sigar native library extract location during tests. # Note: use per-jvm-instance folder when running multiple jvm on one host. akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native
include "backend" akka.cluster.min-nr-of-members = 2 akka.cluster.role { watson.min-nr-of-members = 2 } akka.actor.deployment { }
include "application" akka { loglevel = "DEBUG" }
logback.xml
Description: XML document