This is the output generated by the test wich is as expected.
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Authenticating
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Subscribing
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Subscribed
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Heartbeat
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - MarketChange
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Heartbeat
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - MarketChange

However when running this live this is the output:
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Authenticating
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Subscribing
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
20:45:53 [] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Subscribed

On Tuesday, May 30, 2017 at 8:41:44 PM UTC+2, Henrik Larsson wrote:
> *Im trying to construct a GaphStage that will be used when connecting to a 
> TCP socket. The following protocol is used when connecting:
> 1. Materialize TCP flow to create an outgoingConnection
> 2. Socket return Connect message
> 3. If Connect message OK send AuthenticationMessage
> 4. If OK response on AuthenticationMessage send SubscriptionMessage
> 5. If Subscription OK forward all incomming messages that are not Heatbeat
> This is the code for the GraphStage.*
> class SubscriptionGraphStage(token: SessionToken)
>   extends GraphStage[BidiShape[BetfairRequest, BetfairRequest, BetfairEvent, 
> BetfairEvent]]
>     with LazyLogging {
>   import com.bfg.infrastructure.betfair.temp.TcpProxyState._
>   val bytesIn: Inlet[BetfairRequest] = Inlet("")
>   val bytesOut: Outlet[BetfairEvent] = Outlet("OutgoingTCP.out")
>   val sslIn: Inlet[BetfairEvent] = Inlet("")
>   val sslOut: Outlet[BetfairRequest] = Outlet("OutgoingSSL.out")
>   override def shape: BidiShape[BetfairRequest, BetfairRequest, BetfairEvent, 
> BetfairEvent] = BidiShape.apply(bytesIn, sslOut, sslIn, bytesOut)
>   val authMessage = RequestAuthentication(session = token.sessionToken, id = 
> 1, appKey = "tDwhr80fJKsOW725")
>   val marketSubscription: BetfairRequest = RequestMarketSubscription(
>     id=123,
>     marketFilter = MarketSubscriptionMarketFilter(
>       eventTypeIds = List("7"),
>       marketTypes = List("WIN"),
>       countryCodes = List("GB")
>     ),
>     marketDataFilter = MarketSubscriptionMarketDataFilter(
>       fields = List("EX_ALL_OFFERS","EX_MARKET_DEF"),
>       ladderLevels = Some(3)
>     )
>   )
>   def connectionValid(event: BetfairEvent): Boolean = event match {
>     case m: ConnectionMessage => true
>     case _ => false
>   }
>   def authenticationValid(event: BetfairEvent): Boolean = event match {
>     //todo simplifed need to control id
>     case m: StatusMessage if m.statusCode == "SUCCESS" => true
>     case _ => false
>   }
>   def subscriptionValid(event: BetfairEvent): Boolean = event match {
>     //todo simplifed need to control id
>     case m: StatusMessage if m.statusCode == "SUCCESS" => true
>     case _ => false
>   }
>   def heatbeatValid(event: BetfairEvent): Boolean = event match {
>     case m: MarketChangeMessage if m.ct == Some("HEARTBEAT") => true
>     case _ => false
>   }
>   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic 
> = new GraphStageLogic(shape) {
>     private var state: State = Connecting
>     // I1
>     setHandler(sslIn, new InHandler {
>       override def onPush() = {
>         state match {
>           case Connecting =>
>             val event = grab(sslIn)
>             if (connectionValid(event)) {
>               state = Authenticating
>               push(sslOut, authMessage)
>             } else {
>               failStage(new BetfairConnectionFailedException(s"Connection to 
> Betfair failed"))
>             }
>           case Authenticating =>
>             val event = grab(sslIn)
>             if (authenticationValid(event)) {
>               state = Subscribing
>               push(sslOut, marketSubscription)
>             } else {
>               failStage(new 
> BetfairAuthenticationFailedException(s"Authentication with Betfair failed"))
>             }
>           case Subscribing =>
>             val event = grab(sslIn)
>             if (subscriptionValid(event)) {
>               state = Subscribed
>             } else {
>               failStage(new BetfairSubscriptionFailedException(s"Subscription 
> with Betfair failed"))
>             }
>           case Subscribed =>
>             val event = grab(sslIn)
>             if (heatbeatValid(event)) {
>     "Heartbeat")
>               pull(sslIn)
>             } else {
>               push(bytesOut, event)
>     "MarketChange")
>             }
>         }
>       }
>       override def onUpstreamFinish(): Unit = complete(bytesOut)
>     })
>     // I2
>     setHandler(bytesIn, new InHandler {
>       override def onPush() = {
>       }
>       override def onUpstreamFinish(): Unit = complete(sslOut)
>     })
>     // Called when transport pull for data
>     // O1
>     setHandler(bytesOut, new OutHandler {
>       override def onPull() = state match {
>         case Connecting =>
>         case Authenticating =>
>         case Subscribing =>
>         case Subscribed => pull(sslIn)
>       }
>       override def onDownstreamFinish(): Unit = cancel(sslIn)
>     })
>     // O2
>     setHandler(sslOut, new OutHandler {
>       override def onPull() = state match {
>         case Connecting => pull(sslIn)
>         case Authenticating => pull(sslIn)
>         case Subscribing => pull(sslIn)
>         case _ =>
>       }
>       override def onDownstreamFinish(): Unit = cancel(bytesIn)
>     })
>   }
> }
> This code works as expected in the following test:
> class SubscriptionGraphStageTest(_system: ActorSystem)
>   extends TestKit(_system)
>     with FlatSpecLike with Matchers
>     with BeforeAndAfterAll
>     with ScalaFutures
> {
>   implicit val materializer = 
> ActorMaterializer(ActorMaterializerSettings(_system).withFuzzing(true))
>   def this() = this(ActorSystem())
>   "SubscriptionGraphStage" should "handle a successful subscription" in new 
> Context {
>     source.sendNext(connectionOK)
>     val authenticationRequest = sink.requestNext()
>     source.sendNext(authenticationOK)
>     val subscriptionRequest = sink.requestNext()
>     source.sendNext(subscriptionOK)
>     sink.request(2)
>     source.sendNext(heartbeat)
>     source.sendNext(marketChange)
>     source.sendNext(heartbeat)
>     source.sendNext(marketChange)
>     fromBetfairProbe.requestNext(marketChange)
>     fromBetfairProbe.requestNext(marketChange)
>   }
>   it should "handle failures in subscription" in {}
>   override def afterAll(): Unit = {
>     materializer.shutdown()
>     _system.terminate()
>   }
>   trait Context {
>     // After materialization of TLS we get connection message
>     val connectionOK = ConnectionMessage(op = "connection", connectionId = 
> "050-41605-1391626")
>     // After authentication message we get
>     val authenticationOK = StatusMessage(op = "status", id = Some(1), 
> statusCode = "SUCCESS", connectionClosed = false, errorCode = None, 
> errorMessage = None, connectionId = None)
>     // After MarketSubscription
>     val subscriptionOK = StatusMessage(op = "status", id = Some(123), 
> statusCode = "SUCCESS", connectionClosed = false, errorCode = None, 
> errorMessage = None, connectionId = None)
>     // After connection we get heartbeat
>     val heartbeat = MarketChangeMessage(op = "mcm", id = 123, clk = 
> "AAAAAAAA", pt = 1495773941868L, ct = Some("HEARTBEAT"), heartbeatMs = None, 
> initialClk = None, mc = None, conflateMs = None, segmentType = None)
>     // Finally we get MarketChangeMessage
>     val marketChange = MarketChangeMessage(op = "mcm", id = 123, clk = 
> "AAAAAAAA", pt = 1495773941868L, ct = Some("BOOM"), heartbeatMs = None, 
> initialClk = None, mc = None, conflateMs = None, segmentType = None)
>     val proxyStage = new 
> SubscriptionGraphStage(SessionToken("fasdfasdf","SUCCESS"))
>     val proxyFlow: BidiFlow[BetfairRequest, BetfairRequest, BetfairEvent, 
> BetfairEvent, NotUsed] = BidiFlow.fromGraph(proxyStage)
>     val fromBetfairProbe = TestSubscriber.probe[BetfairEvent]()
>     val toBetfairProbe = TestPublisher.probe[BetfairRequest]()
>     val transportFlow = Flow.fromSinkAndSource(
>       Sink.fromSubscriber(fromBetfairProbe),
>       Source.fromPublisher(toBetfairProbe))
>     val flowUnderTest = proxyFlow.reversed.join(transportFlow)
>     val (source, sink) = TestSource.probe[BetfairEvent]
>       .via(flowUnderTest)
>       .toMat(TestSink.probe[BetfairRequest])(Keep.both)
>       .run()
>   }
> }
> However when I run this on an actual TCP connection I get to the Subscribed 
> stage but then newer any Heartbeat or 
> MarketChange arrive. This drives me crazy since I have no idea about how to 
> debug or trouble shoot this? My guess is that
> im missing some pull etc but BidiShape is really messy to understand. Any 
> help on resolving this would help! If you need
> more info or code just ask but I wanted to start with the minimal.
> All this code is heavily inspired by this blog article: BLOG 
> <>

