[akka-user] unrecoverable Tcp connection with ByteString of moderate size (~250k)
Dear all, I had a few questions for this list last week regarding an unrecoverable error condition that I was seeing in Wandoulabs WebSockets. I have now isolated the problem to standard Scala 2.11.7 / Akka 2.3.11 Tcp.Write with the minimal example at the bottom of this email. It seems that sending a ByteString of a moderate size can basically nuke the network connection. I am very concerned that such unrecoverable errors are possible (reconnecting would potentially allow sending the failed message, but let's not consider that a solution). What is even more concerning is that I have seen related problems in my integration tests, where I am using Acking with backpressure everywhere, but I have been unable to get a reliable reproduction of the problem. Using Acking seems to mitigate the problem somewhat, but obviously not enough. Can somebody please have a look at this and let me know if it is a bug or if there is some part of the Tcp.Write spec that I failed to grok. Also, confirming if the problem exists on some network other than mine would be a good data point. My corporate environment uses PEAP Best regards, Sam package testing import akka.actor._ import akka.event.LoggingReceive import akka.io.{ IO, Tcp } import akka.util.ByteString import java.net.InetSocketAddress import java.util.UUID import concurrent.duration._ /** * This is a test of Akka IO to see if the WebSocket behaviour * described in Buggy is a TCP problem or limited to the WebSocket * implementation. * * Run a blackhole on the target machine, e.g. * * nc -k -l /dev/null * * For a single session, to confirm transmission of payloads: * * nc -l blackhole * * run-main testing.BuggyTcp * */ object BuggyTcp extends App { implicit val system = ActorSystem() val remote = new InetSocketAddress(remote-hostname-here, ) system.actorOf(Props(classOf[BuggyTcp], remote), client) } class BuggyTcp(remote: InetSocketAddress) extends Actor with ActorLogging { import Tcp._ import context.system override def preStart(): Unit = { IO(Tcp) ! Connect(remote) } var connection: ActorRef = _ object Ack extends Tcp.Event with spray.io.Droppable { override def toString = Ack } def receive = { case CommandFailed(write@Tcp.Write(bytes, ack)) = log.error(sfailed to write ${ack}) // perpetually retry ... does it ever correct itself? import context.dispatcher context.system.scheduler.scheduleOnce(1 second, connection, write) case c: Connected = connection = sender() connection ! Register(self) log.info(sending) // works //connection ! Tcp.Write(ByteString(A * 3), NoAck(Big thing)) //connection ! Tcp.Write(ByteString(UUID.randomUUID.toString), Ack) // never recovers connection ! Tcp.Write(ByteString(A * 30), NoAck(Big thing)) connection ! Tcp.Write(ByteString(UUID.randomUUID.toString), Ack) case Ack = system.shutdown() case _: ConnectionClosed = system.shutdown() case msg = // WORKAROUND https://github.com/akka/akka/issues/17898 // (can't use LoggingReceive) log.info(sgot a ${msg.getClass.getName}) } } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: unrecoverable Tcp connection with ByteString of moderate size (~250k)
Aha! Think I got it. I need to send ResumeWriting and listen for a WritingResumed, to recover from such a situation. On Monday, 6 July 2015 11:01:23 UTC+1, Sam Halliday wrote: I should say, that in every example I have tried... with payloads up to 30MB for the initial un-acked message, it always arrives at the destination. The subsequent, and retried, but the connection is completely dead and the Ack-ed message can never be sent. On Monday, 6 July 2015 10:59:38 UTC+1, Sam Halliday wrote: Dear all, I had a few questions for this list last week regarding an unrecoverable error condition that I was seeing in Wandoulabs WebSockets. I have now isolated the problem to standard Scala 2.11.7 / Akka 2.3.11 Tcp.Write with the minimal example at the bottom of this email. It seems that sending a ByteString of a moderate size can basically nuke the network connection. I am very concerned that such unrecoverable errors are possible (reconnecting would potentially allow sending the failed message, but let's not consider that a solution). What is even more concerning is that I have seen related problems in my integration tests, where I am using Acking with backpressure everywhere, but I have been unable to get a reliable reproduction of the problem. Using Acking seems to mitigate the problem somewhat, but obviously not enough. Can somebody please have a look at this and let me know if it is a bug or if there is some part of the Tcp.Write spec that I failed to grok. Also, confirming if the problem exists on some network other than mine would be a good data point. My corporate environment uses PEAP Best regards, Sam package testing import akka.actor._ import akka.event.LoggingReceive import akka.io.{ IO, Tcp } import akka.util.ByteString import java.net.InetSocketAddress import java.util.UUID import concurrent.duration._ /** * This is a test of Akka IO to see if the WebSocket behaviour * described in Buggy is a TCP problem or limited to the WebSocket * implementation. * * Run a blackhole on the target machine, e.g. * * nc -k -l /dev/null * * For a single session, to confirm transmission of payloads: * * nc -l blackhole * * run-main testing.BuggyTcp * */ object BuggyTcp extends App { implicit val system = ActorSystem() val remote = new InetSocketAddress(remote-hostname-here, ) system.actorOf(Props(classOf[BuggyTcp], remote), client) } class BuggyTcp(remote: InetSocketAddress) extends Actor with ActorLogging { import Tcp._ import context.system override def preStart(): Unit = { IO(Tcp) ! Connect(remote) } var connection: ActorRef = _ object Ack extends Tcp.Event with spray.io.Droppable { override def toString = Ack } def receive = { case CommandFailed(write@Tcp.Write(bytes, ack)) = log.error(sfailed to write ${ack}) // perpetually retry ... does it ever correct itself? import context.dispatcher context.system.scheduler.scheduleOnce(1 second, connection, write) case c: Connected = connection = sender() connection ! Register(self) log.info(sending) // works //connection ! Tcp.Write(ByteString(A * 3), NoAck(Big thing)) //connection ! Tcp.Write(ByteString(UUID.randomUUID.toString), Ack) // never recovers connection ! Tcp.Write(ByteString(A * 30), NoAck(Big thing)) connection ! Tcp.Write(ByteString(UUID.randomUUID.toString), Ack) case Ack = system.shutdown() case _: ConnectionClosed = system.shutdown() case msg = // WORKAROUND https://github.com/akka/akka/issues/17898 // (can't use LoggingReceive) log.info(sgot a ${msg.getClass.getName}) } } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Akka HTTP (RC-4): failing on blocking operations with simple test
Wow, it might be the cause, thank you, Egis! I ll check it soon! On Monday, July 6, 2015 at 11:45:17 AM UTC+2, Egis wrote: You are most likely getting these errors because of this regression in RC4: https://github.com/akka/akka/issues/17854 Your test should do what you expect with RC3. It would also work with RC4 if your client had HTTP KeepAlive enabled: ab -n 50 -c 4 -k http://localhost:9000/test Egis On Sunday, 5 July 2015 14:29:07 UTC+1, Dmitry Vorobiov wrote: Hi, I am investigating akka-http as a potential replacement for spray for our new microservices. There we have lots of blocking IO operations that are pretty easy to handle with spray by wrapping them in Future and blocking and providing a separate thread pool. I tried to test the same approach with simple Akka HTTP code: object Main extends App with SimpleAkkaHttpMain with Logging trait SimpleAkkaHttpMain { implicit val system: ActorSystem = ActorSystem() implicit def executor: ExecutionContextExecutor = system.dispatcher val endpointDispatcher = system.dispatchers.lookup(akka.dispatchers.threadpool-dispatcher) implicit val materializer: Materializer = ActorMaterializer() Http().bindAndHandle(new SimpleEndpoint(endpointDispatcher).routes(), 0.0.0.0, 9000) } class SimpleEndpoint(_context: ExecutionContext) extends Directives { implicit val context = _context def routes() = { (get path(test)) { complete { Future { blocking { Thread.sleep(2000) } test } } } } } Config: threadpool-dispatcher { type = Dispatcher executor = thread-pool-executor thread-pool-executor { core-pool-size-min = 2 core-pool-size-factor = 2.0 core-pool-size-max = 100 } } I ran the following terminal command to test it: ab -n 50 -c 4 http://localhost:9000/test Benchmarking localhost (be patient)...apr_socket_recv: Connection reset by peer (54) Total of 10 requests completed This is what I see in logs: history-api DEBUG [2015-07-05 15:24:11,403 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:11,404 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:13,426 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:13,426 CEST] SelectionHandler - Monitored actor [Actor[akka://default/system/IO-TCP-STREAM/server-1-%2F0.0.0.0%3A9000/$A#-1737469294]] terminated history-api DEBUG [2015-07-05 15:24:13,427 CEST] SelectionHandler - Monitored actor [Actor[akka://default/system/IO-TCP-STREAM/server-1-%2F0.0.0.0%3A9000/$z#-1452647390]] terminated history-api DEBUG [2015-07-05 15:24:16,186 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:16,186 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:16,187 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:16,187 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:18,202 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:18,208 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:18,208 CEST] SelectionHandler - Monitored actor [Actor[akka://default/system/IO-TCP-STREAM/server-1-%2F0.0.0.0%3A9000/$F#1453413388]] terminated history-api DEBUG [2015-07-05 15:24:18,209 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:18,209 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:20,217 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:20,219 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:20,223 CEST] SelectionHandler - Monitored actor [Actor[akka://default/system/IO-TCP-STREAM/server-1-%2F0.0.0.0%3A9000/$K#1142842464]] terminated A similar test for Spray works smoothly. Can you please help me with advice what one can do to process blocking operations in akka-http? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: unrecoverable Tcp connection with ByteString of moderate size (~250k)
This part of Register is also relevant * @param useResumeWriting If this is set to true then the connection actor *will refuse all further writes after issuing a [[CommandFailed]] *notification until [[ResumeWriting]] is received. This can *be used to implement NACK-based write backpressure. this implies that if useResumeWriting=false then further writes after a CommandFailed *will* be successful when the backlog is cleared. Is this correct? On Monday, 6 July 2015 11:18:13 UTC+1, Sam Halliday wrote: Aha! Think I got it. I need to send ResumeWriting and listen for a WritingResumed, to recover from such a situation. On Monday, 6 July 2015 11:01:23 UTC+1, Sam Halliday wrote: I should say, that in every example I have tried... with payloads up to 30MB for the initial un-acked message, it always arrives at the destination. The subsequent, and retried, but the connection is completely dead and the Ack-ed message can never be sent. On Monday, 6 July 2015 10:59:38 UTC+1, Sam Halliday wrote: Dear all, I had a few questions for this list last week regarding an unrecoverable error condition that I was seeing in Wandoulabs WebSockets. I have now isolated the problem to standard Scala 2.11.7 / Akka 2.3.11 Tcp.Write with the minimal example at the bottom of this email. It seems that sending a ByteString of a moderate size can basically nuke the network connection. I am very concerned that such unrecoverable errors are possible (reconnecting would potentially allow sending the failed message, but let's not consider that a solution). What is even more concerning is that I have seen related problems in my integration tests, where I am using Acking with backpressure everywhere, but I have been unable to get a reliable reproduction of the problem. Using Acking seems to mitigate the problem somewhat, but obviously not enough. Can somebody please have a look at this and let me know if it is a bug or if there is some part of the Tcp.Write spec that I failed to grok. Also, confirming if the problem exists on some network other than mine would be a good data point. My corporate environment uses PEAP Best regards, Sam package testing import akka.actor._ import akka.event.LoggingReceive import akka.io.{ IO, Tcp } import akka.util.ByteString import java.net.InetSocketAddress import java.util.UUID import concurrent.duration._ /** * This is a test of Akka IO to see if the WebSocket behaviour * described in Buggy is a TCP problem or limited to the WebSocket * implementation. * * Run a blackhole on the target machine, e.g. * * nc -k -l /dev/null * * For a single session, to confirm transmission of payloads: * * nc -l blackhole * * run-main testing.BuggyTcp * */ object BuggyTcp extends App { implicit val system = ActorSystem() val remote = new InetSocketAddress(remote-hostname-here, ) system.actorOf(Props(classOf[BuggyTcp], remote), client) } class BuggyTcp(remote: InetSocketAddress) extends Actor with ActorLogging { import Tcp._ import context.system override def preStart(): Unit = { IO(Tcp) ! Connect(remote) } var connection: ActorRef = _ object Ack extends Tcp.Event with spray.io.Droppable { override def toString = Ack } def receive = { case CommandFailed(write@Tcp.Write(bytes, ack)) = log.error(sfailed to write ${ack}) // perpetually retry ... does it ever correct itself? import context.dispatcher context.system.scheduler.scheduleOnce(1 second, connection, write) case c: Connected = connection = sender() connection ! Register(self) log.info(sending) // works //connection ! Tcp.Write(ByteString(A * 3), NoAck(Big thing)) //connection ! Tcp.Write(ByteString(UUID.randomUUID.toString), Ack) // never recovers connection ! Tcp.Write(ByteString(A * 30), NoAck(Big thing)) connection ! Tcp.Write(ByteString(UUID.randomUUID.toString), Ack) case Ack = system.shutdown() case _: ConnectionClosed = system.shutdown() case msg = // WORKAROUND https://github.com/akka/akka/issues/17898 // (can't use LoggingReceive) log.info(sgot a ${msg.getClass.getName}) } } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit
[akka-user] Re: Akka HTTP (RC-4): failing on blocking operations with simple test
You are most likely getting these errors because of this regression in RC4: https://github.com/akka/akka/issues/17854 Your test should do what you expect with RC3. It would also work with RC4 if your client had HTTP KeepAlive enabled: ab -n 50 -c 4 -k http://localhost:9000/test Egis On Sunday, 5 July 2015 14:29:07 UTC+1, Dmitry Vorobiov wrote: Hi, I am investigating akka-http as a potential replacement for spray for our new microservices. There we have lots of blocking IO operations that are pretty easy to handle with spray by wrapping them in Future and blocking and providing a separate thread pool. I tried to test the same approach with simple Akka HTTP code: object Main extends App with SimpleAkkaHttpMain with Logging trait SimpleAkkaHttpMain { implicit val system: ActorSystem = ActorSystem() implicit def executor: ExecutionContextExecutor = system.dispatcher val endpointDispatcher = system.dispatchers.lookup(akka.dispatchers.threadpool-dispatcher) implicit val materializer: Materializer = ActorMaterializer() Http().bindAndHandle(new SimpleEndpoint(endpointDispatcher).routes(), 0.0.0.0, 9000) } class SimpleEndpoint(_context: ExecutionContext) extends Directives { implicit val context = _context def routes() = { (get path(test)) { complete { Future { blocking { Thread.sleep(2000) } test } } } } } Config: threadpool-dispatcher { type = Dispatcher executor = thread-pool-executor thread-pool-executor { core-pool-size-min = 2 core-pool-size-factor = 2.0 core-pool-size-max = 100 } } I ran the following terminal command to test it: ab -n 50 -c 4 http://localhost:9000/test Benchmarking localhost (be patient)...apr_socket_recv: Connection reset by peer (54) Total of 10 requests completed This is what I see in logs: history-api DEBUG [2015-07-05 15:24:11,403 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:11,404 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:13,426 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:13,426 CEST] SelectionHandler - Monitored actor [Actor[akka://default/system/IO-TCP-STREAM/server-1-%2F0.0.0.0%3A9000/$A#-1737469294]] terminated history-api DEBUG [2015-07-05 15:24:13,427 CEST] SelectionHandler - Monitored actor [Actor[akka://default/system/IO-TCP-STREAM/server-1-%2F0.0.0.0%3A9000/$z#-1452647390]] terminated history-api DEBUG [2015-07-05 15:24:16,186 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:16,186 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:16,187 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:16,187 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:18,202 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:18,208 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:18,208 CEST] SelectionHandler - Monitored actor [Actor[akka://default/system/IO-TCP-STREAM/server-1-%2F0.0.0.0%3A9000/$F#1453413388]] terminated history-api DEBUG [2015-07-05 15:24:18,209 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:18,209 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:20,217 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:20,219 CEST] TcpListener - New connection accepted history-api DEBUG [2015-07-05 15:24:20,223 CEST] SelectionHandler - Monitored actor [Actor[akka://default/system/IO-TCP-STREAM/server-1-%2F0.0.0.0%3A9000/$K#1142842464]] terminated A similar test for Spray works smoothly. Can you please help me with advice what one can do to process blocking operations in akka-http? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: unrecoverable Tcp connection with ByteString of moderate size (~250k)
I should say, that in every example I have tried... with payloads up to 30MB for the initial un-acked message, it always arrives at the destination. The subsequent, and retried, but the connection is completely dead and the Ack-ed message can never be sent. On Monday, 6 July 2015 10:59:38 UTC+1, Sam Halliday wrote: Dear all, I had a few questions for this list last week regarding an unrecoverable error condition that I was seeing in Wandoulabs WebSockets. I have now isolated the problem to standard Scala 2.11.7 / Akka 2.3.11 Tcp.Write with the minimal example at the bottom of this email. It seems that sending a ByteString of a moderate size can basically nuke the network connection. I am very concerned that such unrecoverable errors are possible (reconnecting would potentially allow sending the failed message, but let's not consider that a solution). What is even more concerning is that I have seen related problems in my integration tests, where I am using Acking with backpressure everywhere, but I have been unable to get a reliable reproduction of the problem. Using Acking seems to mitigate the problem somewhat, but obviously not enough. Can somebody please have a look at this and let me know if it is a bug or if there is some part of the Tcp.Write spec that I failed to grok. Also, confirming if the problem exists on some network other than mine would be a good data point. My corporate environment uses PEAP Best regards, Sam package testing import akka.actor._ import akka.event.LoggingReceive import akka.io.{ IO, Tcp } import akka.util.ByteString import java.net.InetSocketAddress import java.util.UUID import concurrent.duration._ /** * This is a test of Akka IO to see if the WebSocket behaviour * described in Buggy is a TCP problem or limited to the WebSocket * implementation. * * Run a blackhole on the target machine, e.g. * * nc -k -l /dev/null * * For a single session, to confirm transmission of payloads: * * nc -l blackhole * * run-main testing.BuggyTcp * */ object BuggyTcp extends App { implicit val system = ActorSystem() val remote = new InetSocketAddress(remote-hostname-here, ) system.actorOf(Props(classOf[BuggyTcp], remote), client) } class BuggyTcp(remote: InetSocketAddress) extends Actor with ActorLogging { import Tcp._ import context.system override def preStart(): Unit = { IO(Tcp) ! Connect(remote) } var connection: ActorRef = _ object Ack extends Tcp.Event with spray.io.Droppable { override def toString = Ack } def receive = { case CommandFailed(write@Tcp.Write(bytes, ack)) = log.error(sfailed to write ${ack}) // perpetually retry ... does it ever correct itself? import context.dispatcher context.system.scheduler.scheduleOnce(1 second, connection, write) case c: Connected = connection = sender() connection ! Register(self) log.info(sending) // works //connection ! Tcp.Write(ByteString(A * 3), NoAck(Big thing)) //connection ! Tcp.Write(ByteString(UUID.randomUUID.toString), Ack) // never recovers connection ! Tcp.Write(ByteString(A * 30), NoAck(Big thing)) connection ! Tcp.Write(ByteString(UUID.randomUUID.toString), Ack) case Ack = system.shutdown() case _: ConnectionClosed = system.shutdown() case msg = // WORKAROUND https://github.com/akka/akka/issues/17898 // (can't use LoggingReceive) log.info(sgot a ${msg.getClass.getName}) } } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: How can I extract Future[T] value in unmarshaller without blocking ?
Have you looked at onComplete / onSuccess directives that help processing futures? On Sunday, July 5, 2015 at 11:12:29 PM UTC+2, Jack Daniels wrote: I'm using akka-http 1.0.-RC4. How can I extract Future[T] value in unmarshaller without blocking ? Details are here http://stackoverflow.com/questions/31223016/why-akka-http-unmarshaler-returns-futuret-instead-of-t but in short here is what I want to do. I extended ScalaXmlSupport you created to support unmarshalling http requests using Unmarshaller[NodeSeq, T] unmarshallers by adding this implicit def xmlUnmarshallerConverter[T](marshaller: Unmarshaller[NodeSeq, T])(implicit mat: Materializer): FromEntityUnmarshaller[T] = xmlUnmarshaller(marshaller, mat) implicit def xmlUnmarshaller[T](implicit marshaller: Unmarshaller[NodeSeq, T], mat: Materializer): FromEntityUnmarshaller[T] = { implicit val exec = mat.executionContext defaultNodeSeqUnmarshaller.map(v = { val future = marshaller(v)(mat.executionContext) val result = Await.result(future, Duration(10, TimeUnit.SECONDS)) result }) } The only problem is that prototype of Unmarshaller.apply is this def apply[A, B](f: ExecutionContext ⇒ A ⇒ Future[B]): Unmarshaller[A, B] It requires me to returns Future which I do by creating unmarshallers of model classes like implicit val articleBodyUnmarshaller = Unmarshaller[NodeSeq, ArticleBody] { xml = Future(ArticleBody(xml.toString())) } But I can't use it in xmlUnmarshaller : FromEntityUnmarshaller[T] because it requires me to return concrete object which forces me to do ugly blocking How should I write it properly ? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka HTTP Streams vs Pulse Queues
Hi Eric, I'm not sure I can answer your very broad question, would you mind/be able to formulate a couple of more specific ones? On Fri, Jul 3, 2015 at 6:34 PM, Eric Kolotyluk eric.koloty...@gmail.com wrote: I have prototyped an application that basically multiplexes messages, assembles them into blocks, and then segments them. The application tries to do as much concurrently as possible. I have used something I call a 'pulse queue' which is based on a non-blocking concurrent queue. http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html Basically, I create a separate future to handle the processing of each message, and the futures complete their work by adding their results to the non-blocking queue. The pulse part is that periodically (i.e. each second) the queue is drained to create a block or segment. There are other reasons to drain a queue, i.e. size, but primarily it is the pulse that keeps data moving through the system. I have found this to be simple to code, and effective at handling large numbers of Futures concurrently. As I am trying to understand Akka Streams, and Akka HTTP, I am wondering if Streams would be a better or equivalent solution, but I do not really understand how streams work under the hood to answer my curiosity. For example, in the Streams environment is it possible to create a separate future to handle each message, the way the pulse queue does? My sense is that each element in the stream is an Actor under the hood, so that the messages (while non-blocking) would get serialized. Cheers, Eric -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: How to create dispatcher's threads set to low priority?
Please see the answer http://scalaakka.blogspot.com/2015/07/akka-dispatcher-with-low-priority.html . W dniu czwartek, 2 lipca 2015 15:45:10 UTC+2 użytkownik kermitas napisał: Hi group, what is the best way to assign dedicated ThreadFactory to dispatcher? My goal is to have all threads in the given dispatcher set to low priority ( thread.setPriority(Thread.MIN_PRIORITY)). Thanks, Artur. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka Stream Source as ActorPublisher
Hi Maxim, could you please minimize the problematic code into a self-contained, no extraneous pieces, snippet? On Mon, Jul 6, 2015 at 9:55 PM, Maxim Korolyov korolyov.ma...@gmail.com wrote: Hey, i was playing with akka stream and akka http and faced and issue, when source created as ActorPublisher doesn't publish any data to source. i have verified that `onNext` method of ActorPublisher is invoking and that the source is empty by adding a logging flow like this source.via(Flow[String]{s = println(s);s} Have any one faced this? there is a link to the code http://github.com/mkorolyov/akka-streams. In two words it is an websocked endpoint which expects in request message json string {isoCode:USD} and serves realtime currency rates updates + optionally history of rates by currency code. I am creating an ` actualSource` from the ActorPublisher and concatenating it with history source from DB in CurrecnyService.scala. Concatenated source is transferred as chucked message to websocket channel. Appreciate for any advise. Thanks -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Akka Stream Source as ActorPublisher
Hey, i was playing with akka stream and akka http and faced and issue, when source created as ActorPublisher doesn't publish any data to source. i have verified that `onNext` method of ActorPublisher is invoking and that the source is empty by adding a logging flow like this source.via(Flow[String]{s = println(s);s} Have any one faced this? there is a link to the code http://github.com/mkorolyov/akka-streams. In two words it is an websocked endpoint which expects in request message json string {isoCode:USD} and serves realtime currency rates updates + optionally history of rates by currency code. I am creating an `actualSource` from the ActorPublisher and concatenating it with history source from DB in CurrecnyService.scala. Concatenated source is transferred as chucked message to websocket channel. Appreciate for any advise. Thanks -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Lost actor communication
I am running a 24 node cluster that is roughly split into two roles: frontend and backend. There is a streamer actor on the frontend node talking to a tracker actor on the backend node. There can be many streamer actors on several frontend nodes talking to one tracker actor. It would seem that at some point the streamer actor on a frontend node stop being able to communicate with the tracker actor. It would seem that communicate between the frontend node and the backend node has been lost, but the backend node can still receive messages from the frontend. I say this because the streamer was able to send a poison pill to the tracker, which successfully killed the actor, but the streamer wasn't informed about the termination. I see no indication that a node has fallen from the cluster or is having problems communicating (I have logging set to INFO). Is there anything I can do to get a better idea of what is happening? Thanks, Ben -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Production Systems Using DistributedPubSub?
what problem are you trying to solve with DistributedPubSub? if you are not tolerant to losing some messages, then DistributedPubSub might not be appropriate in your scenario. i wouldn't depend on always being able to gracefully leave the cluster in production! node failures and network partitions are a fact of life. -Michael On 07/06/15 06:08, James Carman wrote: We are considering using the DistributedPubSub support in a production system. We would like to be able to upgrade this system on the fly. I am trying to get some advice on that sort of setup. What we've seen is that when a node dies, we definitely have missed messages. However, when I issue the leave command (via JMX), things appear to clean up gracefully. Obviously, we would use the leave command in a production upgrade scenario, but is there anything we can do (other than setting up the at-least-once-delivery stuff) to get the cluster to more gracefully recover from a node failure? I've been playing around with this stuff using this very simple project on github: https://github.com/jwcarman/akka-cluster-sandbox Basically, I've set it up so that there are three seed nodes, which I start up together. Then, I fire up the client node, which basically just sends strings to one of the seed nodes using a DistributedPubSubMediator.Send message. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com mailto:akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com mailto:akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Production Systems Using DistributedPubSub?
I'm using DPS as a routing engine, basically. We will have multiple nodes of a particular type connecting into the system which are basically different applications that have different needs. These different applications should only consume a message one time, but it doesn't matter which of their nodes gets it (load balancing or competing consumers). As far as requiring the cluster to stay in-tact, I'm not really concerned about that so much and that's to be expected that things will happen (let it fail right), but what I do want to be able to do is, in a controlled fashion, upgrade a running system to a new version by doing some sort of rolling restart approach, or the like. That's what I'm trying to understand is how do folks do that? Sure, things will happen when things don't go as planned, but I want to at least come up with the as planned situation first before we worry about the failure scenarios. On Monday, July 6, 2015 at 1:04:49 PM UTC-4, Michael Frank wrote: what problem are you trying to solve with DistributedPubSub? if you are not tolerant to losing some messages, then DistributedPubSub might not be appropriate in your scenario. i wouldn't depend on always being able to gracefully leave the cluster in production! node failures and network partitions are a fact of life. -Michael On 07/06/15 06:08, James Carman wrote: We are considering using the DistributedPubSub support in a production system. We would like to be able to upgrade this system on the fly. I am trying to get some advice on that sort of setup. What we've seen is that when a node dies, we definitely have missed messages. However, when I issue the leave command (via JMX), things appear to clean up gracefully. Obviously, we would use the leave command in a production upgrade scenario, but is there anything we can do (other than setting up the at-least-once-delivery stuff) to get the cluster to more gracefully recover from a node failure? I've been playing around with this stuff using this very simple project on github: https://github.com/jwcarman/akka-cluster-sandbox Basically, I've set it up so that there are three seed nodes, which I start up together. Then, I fire up the client node, which basically just sends strings to one of the seed nodes using a DistributedPubSubMediator.Send message. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka and Node.js
On Sat, Jul 4, 2015 at 2:20 AM, Andi Nugroho Dirgantara andi.n.dirgant...@gmail.com wrote: I want my Node.js apps also works under Akka system maybe using node-java. Is it good practice? If it's not, is there any solution that Node.js can communicate with Akka? If you particularly want to run under Node.js, you may want to look into Scala.js, which outputs JavaScript instead of JVM, and thus runs natively under Node. There are folks in that community who are working on porting Akka to Scala.js https://github.com/unicredit/akka.js. (Akka.js is still under development, but I gather they've made good progress.) -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.