I have a web service, implemented in akka-http, to which I’ve added web socket 
support.  I have an akka-http client that is able to connect to the service and 
receive a single message, thereupon the web socket is, for some reason, 
disconnected.  I do not understand why it becomes disconnected. There is no 
intentional code on the server side that should disconnect the web socket.  On 
the client, I’m using virtually identical to the code found here:

http://doc.akka.io/docs/akka/2.4.7/scala/http/client-side/websocket-support.html#websocketclientflow
 
<http://doc.akka.io/docs/akka/2.4.7/scala/http/client-side/websocket-support.html#websocketclientflow>

I’m finding that this code:

val connected = upgradeResponse.flatMap { upgrade =>
  if (upgrade.response.status == StatusCodes.OK) {
    Future.successful(Done)
  } else {
    throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
  }
}

upon connection, raises the above RuntimeException.  The value of 
upgrade.response.status is "101 Switching Protocols”. I do receive one message 
from server-to-client and displayed by the client before the “closed” Future is 
completed:

val (upgradeResponse, closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.right) // keep the materialized 
Future[WebSocketUpgradeResponse]
    .toMat(incoming)(Keep.both) // also keep the Future[Done]
    .run()
When the closed Future completes, this code is executed:

closed.foreach(_ => println("closed"))
and I see this “closed” message displayed.  However, before it is displayed, I 
do get a message sent by the service to the client.

My questions:  1) isn’t the “101 Switching Protocols” status expected?  Why is 
that not handled in the if clause of the upgrade response handling logic?  I 
did change the conditional to:

if (upgrade.response.status == StatusCodes.OK || upgrade.response.status == 
StatusCodes.SwitchingProtocols)
But while that prevents the runtime exception, the connection still 
automatically closes after receiving the first message.

On the server side, I have logic that looks like this:

class WebSocketConnection(experimentInstanceId: String, httpActorSystem: 
ActorSystem, clusterActorSystem: ActorSystem) {
  
  private[this] val experimentInstanceRegion = 
ClusterSharding(clusterActorSystem).shardRegion(ExperimentInstance.shardName)

  def websocketFlow: Flow[Message, Message, _] =
    Flow.fromGraph(
      GraphDSL.create(Source.actorRef[WsMessage](bufferSize = 5, 
OverflowStrategy.fail)) { implicit builder =>
        wsMessageSource => //source provided as argument

          // flow used as input from web socket
          val fromWebsocket = builder.add(
            Flow[Message].collect {
              case TextMessage.Strict(txt) =>
                println(s"txt=$txt")
                WsIncomingMessage(experimentInstanceId, txt)
            })

          // flow used as output, it returns Messages
          val backToWebsocket = builder.add(
            Flow[WsMessage].map {
              case WsMessage(author, text) => TextMessage(s"[$author]: $text")
            }
          )

          // send messages to the actor, if send also 
Disconnected(experimentInstanceId) before stream completes.
          val actorSink = 
Sink.actorRef[WebSocketEvent](experimentInstanceRegion, 
WsDisconnected(experimentInstanceId))

          // merges both pipes
          val merge = builder.add(Merge[WebSocketEvent](2))

          val actorAsSource = builder.materializedValue.map(actor => 
WsConnected(experimentInstanceId, actor))

          fromWebsocket ~> merge.in(0)

          actorAsSource ~> merge.in(1)

          merge ~> actorSink

          wsMessageSource ~> backToWebsocket

          // expose ports

          FlowShape(fromWebsocket.in, backToWebsocket.outlet)
      })

  def sendMessage(message: WsMessage): Unit = experimentInstanceRegion ! message
}
In the actor associated with the experimentInstanceRegion (e.g. a cluster 
sharing region), I see log messages indicating a WsConnected message is 
received.  And then shortly after, I see that the WsDisconnected message is 
received.  

I haven’t touched the web socket support logic in quite some time.  However, 
this all used to work — in other words, the client would stay active and 
display messages received from the server, and the server would continue 
sending messages to the client over the web socket connection until the client 
disconnected (on purpose).  This was all working in akka 2.3 and may have 
broken when I upgraded to the various 2.4 releases and is still broken using 
2.4.7.  

Any suggestions on how to debug this?  Any reason for the auto-disconnection?  
And what about the 101 Switching Protocols handling (mentioned above)?

Thanks. — Eric

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to