First, don't use java serialization for performance and security reasons.
Secondly, actor messages should be small (a few 100kB at most). Otherwise
they will prevent other messages to get through, such as cluster heartbeat
messages. Split the large message into smaller messages, or transfer it on
a side channel such as Akka Http or Stream TCP. I'd also recommend that you
try the new remoting implementatio, see Artery in docs.

/Patrik
fre 5 maj 2017 kl. 16:44 skrev Kunal Ghosh <kunal.raj...@gmail.com>:

> Hi,
> my application uses a Akka cluster which has one master node and two child
> seed nodes. The master node reads data from input file and sends it over to
> both child nodes for evaluation (processing).
> The application works fine for smaller data file eg. file with 43 rows but
> when the input file is hug like with 2 million rows the application fails.
> The exception thrown with stack trace is given below.
> I have also attached the configuration file and code examples are attached
> with this mail please do check them out and tell where I am wrong ????
> Thanks in advance.
>
>
>
>
> WARN
> [18:48:19.013]{iCEDQApp-akka.actor.default-dispatcher-22}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using
> the default Java serializer for class [org.iceengine.compare.akka.RowData]
> which is not recommended because of performance implications. Use another
> serializer or disable this warning using the setting
> 'akka.actor.warn-about-java-serializer-usage'
> WARN
> [18:48:21.768]{iCEDQApp-akka.actor.default-dispatcher-28}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using
> the default Java serializer for class [org.iceengine.compare.akka.Result]
> which is not recommended because of performance implications. Use another
> serializer or disable this warning using the setting
> 'akka.actor.warn-about-java-serializer-usage'
> WARN
> [18:48:21.813]{iCEDQApp-akka.actor.default-dispatcher-4}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using
> the default Java serializer for class [org.iceengine.compare.akka.Result]
> which is not recommended because of performance implications. Use another
> serializer or disable this warning using the setting
> 'akka.actor.warn-about-java-serializer-usage'
> WARN
> [18:48:23.002]{iCEDQApp-akka.actor.default-dispatcher-3}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Cluster
> Node [akka.tcp://iCEDQApp@192.168.100.199:2551] - Marking node(s) as
> UNREACHABLE [Member(address = akka.tcp://iCEDQApp@192.168.100.199:62915,
> status = Up)]. Node roles [backend]
> WARN
> [18:48:23.058]{iCEDQApp-akka.actor.default-dispatcher-17}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Cluster
> Node [akka.tcp://iCEDQApp@192.168.100.199:62915] - Marking node(s) as
> UNREACHABLE [Member(address = akka.tcp://iCEDQApp@192.168.100.199:2551,
> status = Up)]. Node roles []
>  Kunal_ICE
> ERROR[18:48:23.473]{iCEDQApp-akka.actor.default-dispatcher-24}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$1.apply$mcV$sp:70)-AssociationError
> [akka.tcp://iCEDQApp@192.168.100.199:2552] <- [akka.tcp://
> iCEDQApp@192.168.100.199:62915]: Error [null] [
> java.io.OptionalDataException
>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1373)
>  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>  at java.util.HashMap.readObject(HashMap.java:1402)
>  at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source)
>  at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
>  at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>  at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>  at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>  at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>  at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>  at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>  at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>  at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>  at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>  at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>  at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>  at
> akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:304)
>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>  at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:304)
>  at
> akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:151)
>  at
> akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:137)
>  at scala.util.Try$.apply(Try.scala:192)
>  at akka.serialization.Serialization.deserialize(Serialization.scala:131)
>  at
> akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:80)
>  at
> akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:151)
>  at
> akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:137)
>  at scala.util.Try$.apply(Try.scala:192)
>  at akka.serialization.Serialization.deserialize(Serialization.scala:131)
>  at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:30)
>  at
> akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:64)
>  at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:64)
>  at akka.remote.DefaultMessageDispatcher.msgLog$1(Endpoint.scala:69)
>  at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:81)
>  at
> akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:988)
>  at akka.actor.Actor$class.aroundReceive(Actor.scala:496)
>  at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> *Front End Class*
>
> =======================
> ActorSystem system = ActorSystem.create("iCEDQApp",ConfigFactory.load());
>
>    System.out.println("IceCompareEngine ============ >>>>>>
> "+context_._ruleType);
>    ClusterRegisterOnMemberUp registerUp = new
> ClusterRegisterOnMemberUp(actors,context_.getRiid(),context_,system,context_._ruleType);
>    FutureTask<ActorRef> futureTask = new FutureTask<ActorRef>(registerUp);
>
> //   ExecutorService executor = Executors.newFixedThreadPool(1);
> //   executor.execute(futureTask);
>    Cluster.get(system).registerOnMemberUp(futureTask);
>    while (true){
>     try{
>      if(futureTask.isDone()){
>       System.out.println(">>>>>>>>>>>>>>>>>> done >>>>>>>>>>>>>> ");
>       break;
>      }
>     }catch (Exception e) {
>      // TODO: handle exception
>     }
>    }
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to