From the stack trace it looks like you are using LeveldbJournal. In a
clustered environment you must use a distributed journal (or
SharedLeveldbJournal for testing).
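
As a rough sketch for the testing case, assuming Akka 2.3.x and a store
directory name that is only a placeholder, the shared LevelDB journal is
selected in the configuration roughly like this:

```
# Assumption: "target/shared-journal" is a placeholder directory name.
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared.store.dir = "target/shared-journal"
```

Each node also has to be pointed at the single SharedLeveldbStore actor
with SharedLeveldbJournal.setStore, as described in the persistence
documentation. The shared store is a single point of failure, so it is
only meant for testing.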

/Patrik


On Fri, Jun 13, 2014 at 7:35 PM, Luis Medina <[email protected]> wrote:

> Hi Patrik,
>
> Yeah you're probably right. I can't really see anything wrong with the
> wrapping. Here are the complete exception details:
>
> 08:41:51.223 [PipelineNode-akka.actor.default-dispatcher-18] ERROR
> akka.actor.OneForOneStrategy - IO error: lock
> /home/lmedina/projects/pipeline/journal/LOCK: Resource temporarily
> unavailable
> akka.actor.ActorInitializationException: exception during creation
> at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
> ~[akka-actor_2.11-2.3.3.jar:na]
> at akka.actor.ActorCell.create(ActorCell.scala:596)
> ~[akka-actor_2.11-2.3.3.jar:na]
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
> ~[akka-actor_2.11-2.3.3.jar:na]
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> ~[akka-actor_2.11-2.3.3.jar:na]
> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
> ~[akka-actor_2.11-2.3.3.jar:na]
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> ~[akka-actor_2.11-2.3.3.jar:na]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> ~[na:1.8.0_05]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ~[na:1.8.0_05]
> at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_05]
> Caused by: org.fusesource.leveldbjni.internal.NativeDB$DBException: IO
> error: lock /home/lmedina/projects/pipeline/journal/LOCK: Resource
> temporarily unavailable
> at
> org.fusesource.leveldbjni.internal.NativeDB.checkStatus(NativeDB.java:200)
> ~[leveldbjni-1.7.jar:1.7]
> at org.fusesource.leveldbjni.internal.NativeDB.open(NativeDB.java:218)
> ~[leveldbjni-1.7.jar:1.7]
> at org.fusesource.leveldbjni.JniDBFactory.open(JniDBFactory.java:168)
> ~[leveldbjni-1.7.jar:1.7]
> at
> akka.persistence.journal.leveldb.LeveldbStore$class.preStart(LeveldbStore.scala:114)
> ~[akka-persistence-experimental_2.11-2.3.3.jar:na]
> at
> akka.persistence.journal.leveldb.LeveldbJournal.preStart(LeveldbJournal.scala:20)
> ~[akka-persistence-experimental_2.11-2.3.3.jar:na]
> at akka.actor.Actor$class.aroundPreStart(Actor.scala:470)
> ~[akka-actor_2.11-2.3.3.jar:na]
> at
> akka.persistence.journal.leveldb.LeveldbJournal.aroundPreStart(LeveldbJournal.scala:20)
> ~[akka-persistence-experimental_2.11-2.3.3.jar:na]
> at akka.actor.ActorCell.create(ActorCell.scala:580)
> ~[akka-actor_2.11-2.3.3.jar:na]
> ... 7 common frames omitted
> 08:41:51.226 [PipelineNode-akka.actor.default-dispatcher-18] INFO
>  akka.actor.RepointableActorRef - Message
> [akka.persistence.JournalProtocol$ReplayMessages] from
> Actor[akka://PipelineNode/user/sharding/Streams/1#-1640969238] to
> Actor[akka://PipelineNode/system/journal#101608488] was not delivered. [6]
> dead letters encountered. This logging can be turned off or adjusted with
> configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
>
> To be even more descriptive, here are the exact steps that lead to this:
>
> 1. I start up my Node class which is what creates an ActorSystem:
>
> ActorSystem system = ActorSystem.create("PipelineNode", configuration);
>
> a singleton WatchDog (and a proxy to communicate with it):
>
> system.actorOf(ClusterSingletonManager.defaultProps(WatchDogActor.props(),
> "WatchDogSingleton", PoisonPill.getInstance(), ""), "WatchDog");
>
> ActorRef watchdogProxy =
> system.actorOf(ClusterSingletonProxy.defaultProps("/user/WatchDog/WatchDogSingleton",
> ""), "WatchDogProxy");
>
> and finally my sharded StreamSupervisor:
>
> ClusterSharding.get(system).start("Streams",
>                                   StreamSupervisor.props(),
>                                   new MessageExtractor());
>
> In this case, my WatchDog is what is going to be managing the streams
> being started up by the StreamSupervisor.
>
> 2. Next, I create a Stream object, which essentially defines a stream, and
> pass it to my WatchDog through its proxy as a persistent message so that
> the WatchDog can persist this information (wrapping the stream in a
> Persistent object and sending it along gives me no issues here):
>
> Stream stream = new Stream("Streams", 1L, StreamActor.props(config, new
> TwitterStreamManagerBuilder()));
> watchdogProxy.tell(Persistent.create(stream), ActorRef.noSender());
>
> Among the information included in the Stream object are its name
> ("Streams") which will be used to identify the correct ShardRegion and
> its id value (1L) which is what the MessageExtractor will use for its entryId
> and to generate its shardId (id % x).
>
> 3. Now my WatchDog singleton receives the Stream information and creates
> a new instance of the stream by sending a new ShardMessage to the region:
>
> ActorRef region =
> ClusterSharding.get(getContext().system()).shardRegion(stream.getName());
> region.tell(new ShardMessage(Persistent.create(stream.getProps()),
> stream.getId()), getSelf());
>
> The WatchDog keeps sending the props object to the StreamSupervisor until
> it receives an acknowledgement from it.
>
> 4. The StreamSupervisor should then receive the Persistent-wrapped Props
> object and instantiate an actor with it, except that it never gets this
> far:
>
> public class StreamSupervisor extends UntypedProcessor {
>     private Props props;
>
>     private ActorRef stream;
>
>     public static Props props() {
>         return Props.create(StreamSupervisor.class);
>     }
>
>     @Override
>     public void onReceive(Object message) {
>         if (message instanceof Persistent) {
>             onMessage((Persistent) message);
>         }
>     }
>
>     public void onMessage(Persistent message) {
>         Object payload = message.payload();
>
>         if (payload instanceof Props) {
>             props = (Props) payload;
>
>             initializeStream();
>
>             getContext().become(initialized);
>         }
>     }
>
>     private void initializeStream() {
>         stream = getContext().actorOf(props);
>         getContext().watch(stream);
>     }
>
>     private final Behavior initialized = new Behavior() {
>         @Override
>         public void apply(Object message) {
>             if (message instanceof Terminated) {
>                 Terminated terminated = (Terminated) message;
>
>                 if (terminated.getActor().equals(stream)) {
>                     getContext().unwatch(stream);
>                     initializeStream();
>                 }
>             }
>         }
>     };
> }
>
> That is pretty much it. Also, I don't know if it's of any significance
> here but when I start up my cluster, I set the min-nr-of-members = 3.
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
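
The id handling described in step 2 of the quoted message (entryId taken
from the stream id, shardId computed as id % x) could look roughly like
the sketch below. It is plain Java and does not implement the Akka
MessageExtractor interface; the class name ShardIdSketch and the parameter
numberOfShards (standing in for the unspecified x) are assumptions made
only for illustration.

```java
/** Hypothetical stand-in for the id handling inside a MessageExtractor. */
final class ShardIdSketch {
    private ShardIdSketch() {}

    /** The entry id is the stream id itself, rendered as a string. */
    static String entryId(long streamId) {
        return String.valueOf(streamId);
    }

    /**
     * The shard id is the stream id modulo the number of shards.
     * numberOfShards plays the role of "x" in "id % x"; its real value is
     * not given in the thread. Math.floorMod gives the same result as %
     * for non-negative ids and stays non-negative for negative ids.
     */
    static String shardId(long streamId, int numberOfShards) {
        if (numberOfShards <= 0) {
            throw new IllegalArgumentException("numberOfShards must be positive");
        }
        return String.valueOf(Math.floorMod(streamId, (long) numberOfShards));
    }
}
```

With ten shards, for example, stream id 1 maps to shard "1" and stream id
12 maps to shard "2", so every message for the same stream id is routed to
the same shard.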



-- 

Patrik Nordwall
Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
Twitter: @patriknw

<http://www.scaladays.org/>

