Dear all,

I had a few questions for this list last week regarding an unrecoverable 
error condition that I was seeing in Wandoulabs WebSockets.

I have now isolated the problem to standard Scala 2.11.7 / Akka 2.3.11 
Tcp.Write with the minimal example at the bottom of this email.

It seems that sending a ByteString of moderate size (300,000 bytes in the 
example below, whereas 30,000 bytes works) can put the connection into a 
state in which every subsequent write fails.


I am very concerned that such unrecoverable errors are possible 
(reconnecting would potentially allow sending the failed message, but let's 
not consider that a solution).

What is even more concerning is that I have seen related problems in my 
integration tests, where I am using Acking with backpressure everywhere, 
but I have been unable to get a reliable reproduction of the problem. Using 
Acking seems to mitigate the problem somewhat, but obviously not enough.
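
For readers unfamiliar with the term, "Acking with backpressure" here means 
keeping at most one write in flight and holding the rest back until the 
previous write has been acknowledged. The following is only a minimal sketch 
of that idea in plain Scala, not the code from the integration tests. The 
names AckedWriter, offer, onAck and buffered are hypothetical, and `send` 
stands in for something like `connection ! Tcp.Write(data, Ack)`.

```scala
import scala.collection.immutable.Queue

// Hypothetical helper for illustration only; not part of Akka and not taken
// from the test program at the bottom of this email.
final class AckedWriter[A](send: A => Unit) {
  private var pending = Queue.empty[A]
  private var inFlight = false

  // Queue a payload; it is sent right away only if nothing is in flight.
  def offer(data: A): Unit = {
    pending = pending.enqueue(data)
    sendNext()
  }

  // Call this when the ack for the in-flight write arrives.
  def onAck(): Unit = {
    inFlight = false
    sendNext()
  }

  // Number of payloads still waiting to be sent.
  def buffered: Int = pending.size

  private def sendNext(): Unit =
    if (!inFlight && pending.nonEmpty) {
      val (head, rest) = pending.dequeue
      pending = rest
      inFlight = true
      send(head)
    }
}
```

In an actor, offer would be called for each outgoing message and onAck from 
the `case Ack =>` branch, so the connection never sees a second write while 
the first is still pending.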

Can somebody please have a look at this and let me know whether it is a bug 
or whether there is some part of the Tcp.Write spec that I failed to grok? 
Also, confirmation of whether the problem exists on a network other than 
mine would be a good data point. My corporate environment uses PEAP.


Best regards,
Sam


package testing

import akka.actor._
import akka.event.LoggingReceive
import akka.io.{ IO, Tcp }
import akka.util.ByteString
import java.net.InetSocketAddress
import java.util.UUID
import concurrent.duration._

/**
 * This is a test of Akka IO to see if the WebSocket behaviour
 * described in Buggy is a TCP problem or limited to the WebSocket
 * implementation.
 *
 * Run a blackhole on the target machine, e.g.
 *
 *   nc -k -l 2222 >/dev/null
 *
 * For a single session, to confirm transmission of payloads:
 *
 *   nc -l 2222 > blackhole
 *
 * run-main testing.BuggyTcp
 *
 */
object BuggyTcp extends App {
  implicit val system = ActorSystem()

  val remote = new InetSocketAddress("remote-hostname-here", 2222)

  system.actorOf(Props(classOf[BuggyTcp], remote), "client")
}

class BuggyTcp(remote: InetSocketAddress) extends Actor with ActorLogging {

  import Tcp._
  import context.system

  override def preStart(): Unit = {
    IO(Tcp) ! Connect(remote)
  }

  var connection: ActorRef = _

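  // Ack token for the small write. Note that spray.io.Droppable is a spray
  // type, not part of Akka, so spray-io must be on the classpath.
  object Ack extends Tcp.Event with spray.io.Droppable {
    override def toString = "Ack"
  }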

  def receive = {
    case CommandFailed(write @ Tcp.Write(_, ack)) =>
      log.error(s"failed to write ${ack}")

      // perpetually retry ... does it ever correct itself?
      import context.dispatcher
      // `1.second` avoids the postfix-operator feature warning of `1 second`
      context.system.scheduler.scheduleOnce(1.second, connection, write)

    case c: Connected =>
      connection = sender()
      connection ! Register(self)
      log.info("sending")

      // works
      //connection ! Tcp.Write(ByteString("A" * 30000), NoAck("Big thing"))
      //connection ! Tcp.Write(ByteString(UUID.randomUUID.toString), Ack)

      // never recovers
      connection ! Tcp.Write(ByteString("A" * 300000), NoAck("Big thing"))
      connection ! Tcp.Write(ByteString(UUID.randomUUID.toString), Ack)

    case Ack =>
      system.shutdown()
    case _: ConnectionClosed =>
      system.shutdown()

    case msg =>
      // WORKAROUND https://github.com/akka/akka/issues/17898
      // (can't use LoggingReceive)
      log.info(s"got a ${msg.getClass.getName}")

  }

}

