i am doing cluster example from site : 
https://github.com/akka/akka-samples/tree/2.5/akka-sample-cluster-java
while i am executing the cluster it shows following error can anyone solve 
this error. 

Error is like this 
//=========================Main 1=================================
//=========================Main 2=================================
[DEBUG] [07/24/2017 15:49:45.072] [main] 
[EventStream(akka://ClusterSystem)] logger log1-Logging$DefaultLogger 
started
[DEBUG] [07/24/2017 15:49:45.088] [main] 
[EventStream(akka://ClusterSystem)] Default Loggers started
[INFO] [07/24/2017 15:49:45.727] [main] [akka.remote.Remoting] Starting 
remoting
[INFO] [07/24/2017 15:49:46.163] [main] [akka.remote.Remoting] Remoting 
started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:1170]
[INFO] [07/24/2017 15:49:46.195] [main] [akka.remote.Remoting] Remoting now 
listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:1170]
[INFO] [07/24/2017 15:49:46.210] [main] 
[akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node 
[akka.tcp://ClusterSystem@127.0.0.1:1170] - Starting up...
[INFO] [07/24/2017 15:49:46.475] [main] 
[akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node 
[akka.tcp://ClusterSystem@127.0.0.1:1170] - Registered cluster JMX MBean 
[akka:type=Cluster]
[INFO] [07/24/2017 15:49:46.475] [main] 
[akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node 
[akka.tcp://ClusterSystem@127.0.0.1:1170] - Started up successfully
//=========================Main 3=================================
[DEBUG] [07/24/2017 15:49:46.600] 
[ClusterSystem-akka.actor.default-dispatcher-18] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/cluster-metrics] Collection 
started.
//=========================Main 4=================================
//========================= CLUSTER INVOKE PRE START METHOD 
=================================
//========================= CLUSTER IS STARTED 
=================================
[DEBUG] [07/24/2017 15:49:46.615] 
[ClusterSystem-akka.actor.default-dispatcher-16] 
[akka.cluster.metrics.MetricsCollector$(akka://ClusterSystem)] Trying 
akka.cluster.metrics.SigarMetricsCollector.
[DEBUG] [07/24/2017 15:49:46.615] 
[ClusterSystem-akka.actor.default-dispatcher-16] 
[akka.cluster.metrics.MetricsCollector$(akka://ClusterSystem)] Trying 
akka.cluster.metrics.JmxMetricsCollector.
[INFO] [07/24/2017 15:49:46.631] 
[ClusterSystem-akka.actor.default-dispatcher-16] 
[akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node 
[akka.tcp://ClusterSystem@127.0.0.1:1170] - Metrics collection has started 
successfully
[WARN] [07/24/2017 15:49:46.662] 
[ClusterSystem-akka.actor.default-dispatcher-3] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/cluster/core/daemon/downingProvider]
 
Don't use auto-down feature of Akka Cluster in production. See 
'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.
//========================= GOING INTO CURRENT CLUSTER STATE 
=================================
//========================= CURRENT CLUSTER STATE SUCCESS 
=================================CurrentClusterState(TreeSet(),Set(),Set(),None,Map())
[WARN] [07/24/2017 15:49:56.746] 
[ClusterSystem-akka.actor.default-dispatcher-17] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/cluster/core/daemon/joinSeedNodeProcess-1]
 
Couldn't join seed nodes after [2] attempts, will try again. 
seed-nodes=[akka.tcp://ClusterSystem@127.0.0.2:1170, 
akka.tcp://ClusterSystem@127.0.0.3:1170]
[WARN] [07/24/2017 15:50:01.765] 
[ClusterSystem-akka.actor.default-dispatcher-15] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/cluster/core/daemon/joinSeedNodeProcess-1]
 
Couldn't join seed nodes after [3] attempts, will try again. 
seed-nodes=[akka.tcp://ClusterSystem@127.0.0.2:1170, 
akka.tcp://ClusterSystem@127.0.0.3:1170]
[INFO] [07/24/2017 15:50:01.812] 
[ClusterSystem-akka.remote.default-remote-dispatcher-6] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1]
 
No response from remote for outbound association. Associate timed out after 
[15000 ms].
[INFO] [07/24/2017 15:50:01.827] 
[ClusterSystem-akka.remote.default-remote-dispatcher-5] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-2]
 
No response from remote for outbound association. Associate timed out after 
[15000 ms].
[WARN] [07/24/2017 15:50:01.905] [New I/O boss #3] 
[NettyTransport(akka://ClusterSystem)] Remote connection to null failed 
with org.jboss.netty.channel.ConnectTimeoutException: connection timed out: 
/127.0.0.3:1170
[WARN] [07/24/2017 15:50:01.905] [New I/O boss #3] 
[NettyTransport(akka://ClusterSystem)] Remote connection to null failed 
with org.jboss.netty.channel.ConnectTimeoutException: connection timed out: 
/127.0.0.2:1170
[DEBUG] [07/24/2017 15:50:01.921] 
[ClusterSystem-akka.remote.default-remote-dispatcher-6] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1/endpointWriter]
 
AssociationError [akka.tcp://ClusterSystem@127.0.0.1:1170] -> 
[akka.tcp://ClusterSystem@127.0.0.3:1170]: Error [Association failed with 
[akka.tcp://ClusterSystem@127.0.0.3:1170]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://ClusterSystem@127.0.0.3:1170]
Caused by: java.util.concurrent.TimeoutException: No response from remote 
for outbound association. Associate timed out after [15000 ms].
    at 
akka.remote.transport.ProtocolStateActor$$anonfun$2.applyOrElse(AkkaProtocolTransport.scala:367)
    at 
akka.remote.transport.ProtocolStateActor$$anonfun$2.applyOrElse(AkkaProtocolTransport.scala:341)
    at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at akka.actor.FSM$class.processEvent(FSM.scala:663)
    at 
akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
    at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:657)
    at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:629)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:513)
    at 
akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:519)
    at akka.actor.ActorCell.invoke(ActorCell.scala:488)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
]
[DEBUG] [07/24/2017 15:50:01.921] 
[ClusterSystem-akka.remote.default-remote-dispatcher-5] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter]
 
AssociationError [akka.tcp://ClusterSystem@127.0.0.1:1170] -> 
[akka.tcp://ClusterSystem@127.0.0.2:1170]: Error [Association failed with 
[akka.tcp://ClusterSystem@127.0.0.2:1170]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://ClusterSystem@127.0.0.2:1170]
Caused by: java.util.concurrent.TimeoutException: No response from remote 
for outbound association. Associate timed out after [15000 ms].
    at 
akka.remote.transport.ProtocolStateActor$$anonfun$2.applyOrElse(AkkaProtocolTransport.scala:367)
    at 
akka.remote.transport.ProtocolStateActor$$anonfun$2.applyOrElse(AkkaProtocolTransport.scala:341)
    at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at akka.actor.FSM$class.processEvent(FSM.scala:663)
    at 
akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
    at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:657)
    at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:629)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:513)
    at 
akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:519)
    at akka.actor.ActorCell.invoke(ActorCell.scala:488)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
]
[WARN] [07/24/2017 15:50:01.921] 
[ClusterSystem-akka.remote.default-remote-dispatcher-6] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1]
 
Association with remote system [akka.tcp://ClusterSystem@127.0.0.3:1170] 
has failed, address is now gated for [5000] ms. Reason: [Association failed 
with [akka.tcp://ClusterSystem@127.0.0.3:1170]] Caused by: [No response 
from remote for outbound association. Associate timed out after [15000 ms].]
[INFO] [07/24/2017 15:50:01.921] 
[ClusterSystem-akka.actor.default-dispatcher-2] 
[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1]
 
Message [akka.actor.Status$Failure] from 
Actor[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1#-1283183146]
 
to 
Actor[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1#-1283183146]
 
was not delivered. [1] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[WARN] [07/24/2017 15:50:01.921] 
[ClusterSystem-akka.remote.default-remote-dispatcher-5] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0]
 
Association with remote system [akka.tcp://ClusterSystem@127.0.0.2:1170] 
has failed, address is now gated for [5000] ms. Reason: [Association failed 
with [akka.tcp://ClusterSystem@127.0.0.2:1170]] Caused by: [No response 
from remote for outbound association. Associate timed out after [15000 ms].]
[INFO] [07/24/2017 15:50:01.921] 
[ClusterSystem-akka.actor.default-dispatcher-18] 
[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-2]
 
Message [akka.actor.Status$Failure] from 
Actor[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-2#-987867521]
 
to 
Actor[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-2#-987867521]
 
was not delivered. [2] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [07/24/2017 15:50:01.968] 
[ClusterSystem-akka.actor.default-dispatcher-15] 
[akka://ClusterSystem/deadLetters] Message 
[akka.cluster.InternalClusterAction$InitJoin$] from 
Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083]
 
to Actor[akka://ClusterSystem/deadLetters] was not delivered. [3] dead 
letters encountered. This logging can be turned off or adjusted with 
configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [07/24/2017 15:50:01.968] 
[ClusterSystem-akka.actor.default-dispatcher-15] 
[akka://ClusterSystem/deadLetters] Message 
[akka.cluster.InternalClusterAction$InitJoin$] from 
Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083]
 
to Actor[akka://ClusterSystem/deadLetters] was not delivered. [4] dead 
letters encountered. This logging can be turned off or adjusted with 
configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [07/24/2017 15:50:01.968] 
[ClusterSystem-akka.actor.default-dispatcher-18] 
[akka://ClusterSystem/deadLetters] Message 
[akka.cluster.InternalClusterAction$InitJoin$] from 
Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083]
 
to Actor[akka://ClusterSystem/deadLetters] was not delivered. [5] dead 
letters encountered. This logging can be turned off or adjusted with 
configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [07/24/2017 15:50:01.968] 
[ClusterSystem-akka.actor.default-dispatcher-18] 
[akka://ClusterSystem/deadLetters] Message 
[akka.cluster.InternalClusterAction$InitJoin$] from 
Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083]
 
to Actor[akka://ClusterSystem/deadLetters] was not delivered. [6] dead 
letters encountered. This logging can be turned off or adjusted with 
configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[DEBUG] [07/24/2017 15:50:01.968] 
[ClusterSystem-akka.remote.default-remote-dispatcher-6] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter]
 
Disassociated [akka.tcp://ClusterSystem@127.0.0.1:1170] -> 
[akka.tcp://ClusterSystem@127.0.0.2:1170]
[INFO] [07/24/2017 15:50:01.968] 
[ClusterSystem-akka.actor.default-dispatcher-2] 
[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter]
 
Message [akka.remote.EndpointWriter$AckIdleCheckTimer$] from 
Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter#1576329271]
 
to 
Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter#1576329271]
 
was not delivered. [7] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[DEBUG] [07/24/2017 15:50:01.983] 
[ClusterSystem-akka.remote.default-remote-dispatcher-5] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1/endpointWriter]
 
Disassociated [akka.tcp://ClusterSystem@127.0.0.1:1170] -> 
[akka.tcp://ClusterSystem@127.0.0.3:1170]
[INFO] [07/24/2017 15:50:01.983] 
[ClusterSystem-akka.actor.default-dispatcher-15] 
[akka://ClusterSystem/deadLetters] Message 
[akka.cluster.InternalClusterAction$InitJoin$] from 
Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083]
 
to Actor[akka://ClusterSystem/deadLetters] was not delivered. [8] dead 
letters encountered. This logging can be turned off or adjusted with 
configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [07/24/2017 15:50:01.983] 
[ClusterSystem-akka.actor.default-dispatcher-15] 
[akka://ClusterSystem/deadLetters] Message 
[akka.cluster.InternalClusterAction$InitJoin$] from 
Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083]
 
to Actor[akka://ClusterSystem/deadLetters] was not delivered. [9] dead 
letters encountered. This logging can be turned off or adjusted with 
configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [07/24/2017 15:50:01.983] 
[ClusterSystem-akka.actor.default-dispatcher-15] 
[akka://ClusterSystem/deadLetters] Message 
[akka.cluster.InternalClusterAction$InitJoin$] from 
Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083]
 
to Actor[akka://ClusterSystem/deadLetters] was not delivered. [10] dead 
letters encountered, no more dead letters will be logged. This logging can 
be turned off or adjusted with configuration settings 
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.



and tell me how to execute node i am confusing while executing nodes.

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
package sample.cluster.transformation;

import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
//import akka.cluster.metrics.ClusterMetricsExtension;
//import sample.cluster.transformation.TransformationMessages.TransformationJob;
//import sample.cluster.transformation.TransformationMessages.TransformationResult;
//import static sample.cluster.transformation.TransformationMessages.BACKEND_REGISTRATION;

//#backend
@SuppressWarnings("deprecation")
public class TransformationBackend extends UntypedActor implements TransformationMessages{

  Cluster cluster = Cluster.get(getContext().system());

  //subscribe to cluster changes, MemberUp
  @Override
  public void preStart() {
	  System.out.println("//========================= CLUSTER INVOKE PRE START METHOD =================================");
    cluster.subscribe(getSelf(), MemberUp.class);
    System.out.println("//========================= CLUSTER IS STARTED =================================");
  }

  //re-subscribe when restart
  @Override
  public void postStop() {
    cluster.unsubscribe(getSelf());
    System.out.println("//========================= CLUSTER STOPED.....! =================================");
  }

  @Override
  public void onReceive(Object message) {
    if (message instanceof TransformationJob) {
    	 System.out.println("//========================= GOING INTO TRANSFORMATION JOB =================================");
      TransformationJob job = (TransformationJob) message;
      getSender().tell(new TransformationResult(job.getText().toUpperCase()),
          getSelf());
      System.out.println("//========================= TRANSFORMATION JOB SUCCESS.... =================================");
    } else if (message instanceof CurrentClusterState) {
    	 System.out.println("//========================= GOING INTO CURRENT CLUSTER STATE =================================");
      CurrentClusterState state = (CurrentClusterState) message;
      System.out.println("//========================= CURRENT CLUSTER STATE SUCCESS ================================="+state);
      for (Member member : state.getMembers()) {
        if (member.status().equals(MemberStatus.up())) {
        	 System.out.println("//========================= MEMBER IS UP =================================");
          register(member);
          System.out.println("//========================= MEMBERS ARE UP ================================="+member);
        }
      }

    } else if (message instanceof MemberUp) {
    	 System.out.println("//========================= MEMBERS..... =================================");
      MemberUp mUp = (MemberUp) message;
      register(mUp.member());
      System.out.println("//========================= MEMBERS UP REGISTER =================================");
    } else {
      unhandled(message);
      System.out.println("//========================= ERROR IN MEMBERS =================================");
    }
     }

  void register(Member member) {
	  System.out.println("//========================= REGISTER MEMBER =================================");
    if (member.hasRole("frontend"))
      getContext().actorSelection(member.address() + "/user/frontend").tell(
          BACKEND_REGISTRATION, getSelf());
  }
  }//#backend
package sample.cluster.transformation;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
public class TransformationBackendMain {
    public static void main(String[] args) {
        // Override the configuration of the port when specified as program argument
    	
    	System.out.println("//=========================Main 1=================================");
        final String hostname = args.length > 0 ? args[0] : "127.0.0.1";
        final String port = args.length > 1 ? args[1] : "1170";
        final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + hostname).
                withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)).
                withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
                withFallback(ConfigFactory.load());
        System.out.println("//=========================Main 2=================================");
        ActorSystem system = ActorSystem.create("ClusterSystem", config);
        System.out.println("//=========================Main 3=================================");
        system.actorOf(Props.create(TransformationBackend.class), "backend");
        System.out.println("//=========================Main 4=================================");
    }}
package sample.cluster.transformation;

import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import sample.cluster.transformation.TransformationMessages.JobFailed;
import sample.cluster.transformation.TransformationMessages.TransformationJob;

import java.util.ArrayList;
import java.util.List;

import static sample.cluster.transformation.TransformationMessages.BACKEND_REGISTRATION;

//#frontend
public class TransformationFrontend extends UntypedActor {

  List<ActorRef> backends = new ArrayList<ActorRef>();
  int jobCounter = 0;

  @Override
  public void onReceive(Object message) {
    if ((message instanceof TransformationJob) && backends.isEmpty()) {
      TransformationJob job = (TransformationJob) message;
      getSender().tell(
          new JobFailed("Service unavailable, try again later", job),
          getSender());

    } else if (message instanceof TransformationJob) {
      TransformationJob job = (TransformationJob) message;
      jobCounter++;
      backends.get(jobCounter % backends.size())
          .forward(job, getContext());

    } else if (message.equals(BACKEND_REGISTRATION)) {
      getContext().watch(getSender());
      backends.add(getSender());

    } else if (message instanceof Terminated) {
      Terminated terminated = (Terminated) message;
      backends.remove(terminated.getActor());

    } else {
      unhandled(message);
    }
  }

}
//#frontend
package sample.cluster.transformation;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.OnSuccess;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import sample.cluster.transformation.TransformationMessages.TransformationJob;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TransformationFrontendMain {

    public static void main(String[] args) {
        // Override the configuration of the port when specified as program argument
        final String hostname = args.length > 0 ? args[0] : "127.0.0.1";
        final String port = args.length > 1 ? args[1] : "1170";

        final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + hostname).
                withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)).
                withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
                withFallback(ConfigFactory.load());

        ActorSystem system = ActorSystem.create("ClusterSystem", config);

        
		final ActorRef frontend = system.actorOf(
                Props.create(TransformationFrontend.class), "frontend");
        final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
        final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
        final ExecutionContext ec = system.dispatcher();
        final AtomicInteger counter = new AtomicInteger();
        system.scheduler().schedule(interval, interval, new Runnable() {
            public void run() {
                Patterns.ask(frontend,
                        new TransformationJob("hello-" + counter.incrementAndGet()),
                        timeout).onSuccess(new OnSuccess<Object>() {
                    public void onSuccess(Object result) {
                        System.out.println(result);
                    }
                }, ec);
            }

        }, ec);

    }
}
package sample.cluster.transformation;

import java.io.Serializable;

//#messages
public interface TransformationMessages {

  public static class TransformationJob implements Serializable {
   
	private static final long serialVersionUID = 1L;
	private final String text;

    public TransformationJob(String text) {
      this.text = text;
    }

    public String getText() {
      return text;
    }
    
    @Override
    public String toString() {
      return "TransformationJob(" + text + ")";
    }
  }

  public static class TransformationResult implements Serializable {
   
	private static final long serialVersionUID = 1L;
	private final String text;

    public TransformationResult(String text) {
      this.text = text;
    }

    public String getText() {
      return text;
    }

    @Override
    public String toString() {
      return "TransformationResult(" + text + ")";
    }
  }

  public static class JobFailed implements Serializable {
  
	private static final long serialVersionUID = 1L;
	private final String reason;
    private final TransformationJob job;

    public JobFailed(String reason, TransformationJob job) {
      this.reason = reason;
      this.job = job;
    }

    public String getReason() {
      return reason;
    }

    public TransformationJob getJob() {
      return job;
    }

    @Override
    public String toString() {
      return "JobFailed(" + reason + ")";
    }
  }

  public static final String BACKEND_REGISTRATION = "BackendRegistration";

}
//#messages
akka {
  loglevel = "DEBUG"
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = DEBUG
    
   # heartbeat-interval = 5s   # default 4s
   # acceptable-heartbeat-pause = 10s  # default 10s
    
    
    netty.tcp {
      hostname = "127.0.0.1"
      port = 1170
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.2:1170",
      "akka.tcp://ClusterSystem@127.0.0.3:1170"]
                 # roles = ["role"]
        auto-down-unreachable-after = 280s
  }
}

# Disable legacy metrics in akka-cluster.
akka.cluster.metrics.enabled=off

# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]

# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native
include "backend"

akka.cluster.min-nr-of-members = 2

akka.cluster.role {
  watson.min-nr-of-members = 2
}

akka.actor.deployment {
}
include "application"
 
akka {
  loglevel = "DEBUG"
}

Attachment: logback.xml
Description: XML document

Reply via email to