Hi,

you have to make sure that there is only one process that is accessing
LevelDB storage files. Is that the case?

On Sat, Dec 13, 2014 at 11:33 AM, 何品 <hepin1...@gmail.com> wrote:
>
> I am using akka persistent with netty to handle the statistic of game
> server.
>
> There are 1 to N statistic client (with akka persistent ,ALOD & netty )
> and 1 server which receive the incoming protobuf encoded statistic data and
> flush to database for later use.
>
> I have some issue at the first version of the the server,so some off the
> message was messed up.So I have to fix it.
>
> After I fixed it I want to replay the statistic message and redeliver to
> the server and rebuild the whole story.
>
>
> I write a new program named Migrate and simple connect to the statistic
> server and then starting reply,ignore the
> ```SendConfirmed(seqNr:Long)```message.
>
> I have two type of message .
> ```
>
> case event: SendEvent =>
>   println("处理发送消息 : "+seqNr+" ", event) //handle it
>   handleMigrate(event)
> case confirm: SendConfirm =>
>   println("处理发送确认消息: "+seqNr+" 忽略 ", confirm) //ignore the confirm message
>
> ```
>
> ```
>
> def handleMigrate(event: SendEvent) = {
>   seqNr += 1
>   val statisticRequest = 
> toStatisticRequest(ByteString.copyFrom(event.payload), event.msgType, seqNr)
>   socketActor ! SendMessage(statisticRequest) //send to socket actor,will 
> eventually delivered to the statistic server
> }
>
> @inline
> def toStatisticRequest(payload: ByteString, payloadType: PayloadType, seqNr: 
> Long): StatisticRequest = {
>   StatisticRequest.newBuilder()
>     .setSeqNr(seqNr)
>     .setPayload(payload)
>     .setType(payloadType)
>     .build()
> }
>
> ```
>
> the implement is so simple,via akka persistent view
>
> class MigrateReader(socketActor: ActorRef) extends PersistentView with 
> ActorLogging {
>   override def viewId: String = "statisticClientView"
>
>   override def persistenceId: String = "statisticClient"
>
>   private var seqNr = 0L
>
>   override def receive: Actor.Receive = {
>     case event: SendEvent =>
>       println("处理发送消息 : "+seqNr+" ", event)
>       handleMigrate(event)
>     case confirm: SendConfirm =>
>       println("处理发送确认消息: "+seqNr+" 忽略 ", confirm)
>     case SocketHandlerRegistered =>
>       log.info("socket已经绑定,开始读取数据")
>     case msg =>
>       println("忽略消息:"+ msg)
>   }
>
> ......
>
>
>
> and the persistenceId is the same as the statistic client's origin one.
>
> the interesting thing is that.after I restart the statistic client,it
> starting to redelivering the unconfirmed message ,but I could not read
> the journal,even them have the same name.
>
> I have tested this program in local ,generated by same code.it plays
> well,but fail after I try to reply other's generated one.
>
> the Data size is around 17GB, 670MB after zipped
>
> but after I copy the journal (leveldb) to the migrate program's journal
> target folder .it don't run but.
>
>    --> : akka.tcp://
> xxjh@127.0.0.1:2552/user/abstractEngine/moduleManager/migrate/b63a2f24-3ee6-4e99-adba-c49291c88e36/$a
> [Error][2014-12-13 05:45:55]:
> org.fusesource.leveldbjni.internal.NativeDB$DBException: IO error:
> /home/kerr/devProject/ideaProject/server/xxjh/statistic/target/universal/stage/bin/journal/020912.sst:
> Invalid argument
>
> akka.actor.ActorInitializationException: exception during creation
> at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
> at akka.actor.ActorCell.create(ActorCell.scala:596)
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException:
> org.fusesource.leveldbjni.internal.NativeDB$DBException: IO error:
> /home/kerr/devProject/ideaProject/server/xxjh/statistic/target/universal/stage/bin/journal/020912.sst:
> Invalid argument
> at
> org.fusesource.leveldbjni.internal.JniDBIterator.seek(JniDBIterator.java:68)
> at
> akka.persistence.journal.leveldb.LeveldbIdMapping$$anonfun$readIdMap$1.apply(LeveldbIdMapping.scala:32)
> at
> akka.persistence.journal.leveldb.LeveldbIdMapping$$anonfun$readIdMap$1.apply(LeveldbIdMapping.scala:31)
> at
> akka.persistence.journal.leveldb.LeveldbStore$class.withIterator(LeveldbStore.scala:78)
> at
> akka.persistence.journal.leveldb.LeveldbJournal.withIterator(LeveldbJournal.scala:20)
> at
> akka.persistence.journal.leveldb.LeveldbIdMapping$class.readIdMap(LeveldbIdMapping.scala:31)
> at
> akka.persistence.journal.leveldb.LeveldbIdMapping$class.preStart(LeveldbIdMapping.scala:54)
> at
> akka.persistence.journal.leveldb.LeveldbJournal.akka$persistence$journal$leveldb$LeveldbStore$$super$preStart(LeveldbJournal.scala:20)
> at
> akka.persistence.journal.leveldb.LeveldbStore$class.preStart(LeveldbStore.scala:113)
> at
> akka.persistence.journal.leveldb.LeveldbJournal.preStart(LeveldbJournal.scala:20)
> at akka.actor.Actor$class.aroundPreStart(Actor.scala:470)
> at
> akka.persistence.journal.leveldb.LeveldbJournal.aroundPreStart(LeveldbJournal.scala:20)
> at akka.actor.ActorCell.create(ActorCell.scala:580)
> ... 7 more
> Caused by: org.fusesource.leveldbjni.internal.NativeDB$DBException: IO
> error:
> /home/kerr/devProject/ideaProject/server/xxjh/statistic/target/universal/stage/bin/journal/020912.sst:
> Invalid argument
> at
> org.fusesource.leveldbjni.internal.NativeDB.checkStatus(NativeDB.java:200)
> at
> org.fusesource.leveldbjni.internal.NativeIterator.checkStatus(NativeIterator.java:121)
> at
> org.fusesource.leveldbjni.internal.NativeIterator.seek(NativeIterator.java:151)
> at
> org.fusesource.leveldbjni.internal.NativeIterator.seek(NativeIterator.java:145)
> at
> org.fusesource.leveldbjni.internal.NativeIterator.seek(NativeIterator.java:138)
> at
> org.fusesource.leveldbjni.internal.JniDBIterator.seek(JniDBIterator.java:63)
> ... 19 more
>
>    --> : akka://xxjh/system/journal
>
>
> Any Idea?Thanks
>
>
>
>
>
>
>
>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>


-- 
Martynas Mickevičius
Typesafe <http://typesafe.com/> – Reactive
<http://www.reactivemanifesto.org/> Apps on the JVM

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to