This is the output generated by the test wich is as expected.
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
ConnectionMessage(connection,050-250517141605-1391626)
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Authenticating
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
StatusMessage(status,Some(1),None,None,None,false,SUCCESS)
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Subscribing
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
StatusMessage(status,Some(123),None,None,None,false,SUCCESS)
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Subscribed
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
MarketChangeMessage(mcm,123,Some(HEARTBEAT),AAAAAAAA,None,1495773941868,None,None,None,None)
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Heartbeat
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
MarketChangeMessage(mcm,123,Some(BOOM),AAAAAAAA,None,1495773941868,None,None,None,None)
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - MarketChange
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
MarketChangeMessage(mcm,123,Some(HEARTBEAT),AAAAAAAA,None,1495773941868,None,None,None,None)
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Heartbeat
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
MarketChangeMessage(mcm,123,Some(BOOM),AAAAAAAA,None,1495773941868,None,None,None,None)
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - MarketChange

However when running this live this is the output:
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
ConnectionMessage(connection,050-250517141605-1391626)
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Authenticating
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
StatusMessage(status,Some(1),None,None,None,false,SUCCESS)
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Subscribing
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
StatusMessage(status,Some(123),None,None,None,false,SUCCESS)
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Subscribed

On Tuesday, May 30, 2017 at 8:41:44 PM UTC+2, Henrik Larsson wrote:
>
>
> *Im trying to construct a GaphStage that will be used when connecting to a 
> TCP socket. The following protocol is used when connecting:
> 1. Materialize TCP flow to create an outgoingConnection
> 2. Socket return Connect message
> 3. If Connect message OK send AuthenticationMessage
> 4. If OK response on AuthenticationMessage send SubscriptionMessage
> 5. If Subscription OK forward all incomming messages that are not Heatbeat
>
> This is the code for the GraphStage.*
>
>
>
> class SubscriptionGraphStage(token: SessionToken)
>   extends GraphStage[BidiShape[BetfairRequest, BetfairRequest, BetfairEvent, 
> BetfairEvent]]
>     with LazyLogging {
>
>   import com.bfg.infrastructure.betfair.temp.TcpProxyState._
>
>   val bytesIn: Inlet[BetfairRequest] = Inlet("OutgoingTCP.in")
>
>   val bytesOut: Outlet[BetfairEvent] = Outlet("OutgoingTCP.out")
>   val sslIn: Inlet[BetfairEvent] = Inlet("OutgoingSSL.in")
>   val sslOut: Outlet[BetfairRequest] = Outlet("OutgoingSSL.out")
>
>   override def shape: BidiShape[BetfairRequest, BetfairRequest, BetfairEvent, 
> BetfairEvent] = BidiShape.apply(bytesIn, sslOut, sslIn, bytesOut)
>
>   val authMessage = RequestAuthentication(session = token.sessionToken, id = 
> 1, appKey = "tDwhr80fJKsOW725")
>   val marketSubscription: BetfairRequest = RequestMarketSubscription(
>     id=123,
>     marketFilter = MarketSubscriptionMarketFilter(
>       eventTypeIds = List("7"),
>       marketTypes = List("WIN"),
>       countryCodes = List("GB")
>     ),
>     marketDataFilter = MarketSubscriptionMarketDataFilter(
>       fields = List("EX_ALL_OFFERS","EX_MARKET_DEF"),
>       ladderLevels = Some(3)
>     )
>   )
>   def connectionValid(event: BetfairEvent): Boolean = event match {
>     case m: ConnectionMessage => true
>     case _ => false
>   }
>
>   def authenticationValid(event: BetfairEvent): Boolean = event match {
>     //todo simplifed need to control id
>     case m: StatusMessage if m.statusCode == "SUCCESS" => true
>     case _ => false
>   }
>   def subscriptionValid(event: BetfairEvent): Boolean = event match {
>     //todo simplifed need to control id
>     case m: StatusMessage if m.statusCode == "SUCCESS" => true
>     case _ => false
>   }
>
>   def heatbeatValid(event: BetfairEvent): Boolean = event match {
>     case m: MarketChangeMessage if m.ct == Some("HEARTBEAT") => true
>     case _ => false
>   }
>
>   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic 
> = new GraphStageLogic(shape) {
>     private var state: State = Connecting
>
>     // I1
>     setHandler(sslIn, new InHandler {
>       override def onPush() = {
>         state match {
>           case Connecting =>
>             val event = grab(sslIn)
>             logger.info(event.toString)
>             if (connectionValid(event)) {
>               state = Authenticating
>               logger.info(state.toString)
>               push(sslOut, authMessage)
>             } else {
>               failStage(new BetfairConnectionFailedException(s"Connection to 
> Betfair failed"))
>             }
>           case Authenticating =>
>             val event = grab(sslIn)
>             logger.info(event.toString)
>             if (authenticationValid(event)) {
>               state = Subscribing
>               logger.info(state.toString)
>               push(sslOut, marketSubscription)
>             } else {
>               failStage(new 
> BetfairAuthenticationFailedException(s"Authentication with Betfair failed"))
>             }
>           case Subscribing =>
>             val event = grab(sslIn)
>             logger.info(event.toString)
>             if (subscriptionValid(event)) {
>               state = Subscribed
>               logger.info(state.toString)
>             } else {
>               failStage(new BetfairSubscriptionFailedException(s"Subscription 
> with Betfair failed"))
>             }
>           case Subscribed =>
>             val event = grab(sslIn)
>             logger.info(event.toString)
>             if (heatbeatValid(event)) {
>               logger.info("Heartbeat")
>               pull(sslIn)
>             } else {
>               push(bytesOut, event)
>               logger.info("MarketChange")
>             }
>         }
>       }
>
>       override def onUpstreamFinish(): Unit = complete(bytesOut)
>     })
>
>     // I2
>     setHandler(bytesIn, new InHandler {
>       override def onPush() = {
>       }
>
>       override def onUpstreamFinish(): Unit = complete(sslOut)
>     })
>
>     // Called when transport pull for data
>     // O1
>     setHandler(bytesOut, new OutHandler {
>       override def onPull() = state match {
>         case Connecting =>
>         case Authenticating =>
>         case Subscribing =>
>         case Subscribed => pull(sslIn)
>       }
>
>       override def onDownstreamFinish(): Unit = cancel(sslIn)
>     })
>
>     // O2
>     setHandler(sslOut, new OutHandler {
>       override def onPull() = state match {
>         case Connecting => pull(sslIn)
>         case Authenticating => pull(sslIn)
>         case Subscribing => pull(sslIn)
>         case _ =>
>       }
>
>       override def onDownstreamFinish(): Unit = cancel(bytesIn)
>     })
>
>   }
> }
>
>
> This code works as expected in the following test:
>
>
> class SubscriptionGraphStageTest(_system: ActorSystem)
>   extends TestKit(_system)
>     with FlatSpecLike with Matchers
>     with BeforeAndAfterAll
>     with ScalaFutures
> {
>   implicit val materializer = 
> ActorMaterializer(ActorMaterializerSettings(_system).withFuzzing(true))
>
>   def this() = this(ActorSystem())
>
>   "SubscriptionGraphStage" should "handle a successful subscription" in new 
> Context {
>     source.sendNext(connectionOK)
>     val authenticationRequest = sink.requestNext()
>
>     source.sendNext(authenticationOK)
>     val subscriptionRequest = sink.requestNext()
>
>     source.sendNext(subscriptionOK)
>     sink.request(2)
>
>     source.sendNext(heartbeat)
>     source.sendNext(marketChange)
>     source.sendNext(heartbeat)
>     source.sendNext(marketChange)
>     fromBetfairProbe.requestNext(marketChange)
>     fromBetfairProbe.requestNext(marketChange)
>
>   }
>   it should "handle failures in subscription" in {}
>
>   override def afterAll(): Unit = {
>     materializer.shutdown()
>     _system.terminate()
>   }
>
>   trait Context {
>     // After materialization of TLS we get connection message
>     val connectionOK = ConnectionMessage(op = "connection", connectionId = 
> "050-41605-1391626")
>     // After authentication message we get
>     val authenticationOK = StatusMessage(op = "status", id = Some(1), 
> statusCode = "SUCCESS", connectionClosed = false, errorCode = None, 
> errorMessage = None, connectionId = None)
>     // After MarketSubscription
>     val subscriptionOK = StatusMessage(op = "status", id = Some(123), 
> statusCode = "SUCCESS", connectionClosed = false, errorCode = None, 
> errorMessage = None, connectionId = None)
>     // After connection we get heartbeat
>     val heartbeat = MarketChangeMessage(op = "mcm", id = 123, clk = 
> "AAAAAAAA", pt = 1495773941868L, ct = Some("HEARTBEAT"), heartbeatMs = None, 
> initialClk = None, mc = None, conflateMs = None, segmentType = None)
>     // Finally we get MarketChangeMessage
>     val marketChange = MarketChangeMessage(op = "mcm", id = 123, clk = 
> "AAAAAAAA", pt = 1495773941868L, ct = Some("BOOM"), heartbeatMs = None, 
> initialClk = None, mc = None, conflateMs = None, segmentType = None)
>
>     val proxyStage = new 
> SubscriptionGraphStage(SessionToken("fasdfasdf","SUCCESS"))
>     val proxyFlow: BidiFlow[BetfairRequest, BetfairRequest, BetfairEvent, 
> BetfairEvent, NotUsed] = BidiFlow.fromGraph(proxyStage)
>
>     val fromBetfairProbe = TestSubscriber.probe[BetfairEvent]()
>     val toBetfairProbe = TestPublisher.probe[BetfairRequest]()
>
>     val transportFlow = Flow.fromSinkAndSource(
>       Sink.fromSubscriber(fromBetfairProbe),
>       Source.fromPublisher(toBetfairProbe))
>
>     val flowUnderTest = proxyFlow.reversed.join(transportFlow)
>
>     val (source, sink) = TestSource.probe[BetfairEvent]
>       .via(flowUnderTest)
>       .toMat(TestSink.probe[BetfairRequest])(Keep.both)
>       .run()
>   }
> }
>
>
> However when I run this on an actual TCP connection I get to the Subscribed 
> stage but then newer any Heartbeat or 
>
> MarketChange arrive. This drives me crazy since I have no idea about how to 
> debug or trouble shoot this? My guess is that
>
> im missing some pull etc but BidiShape is really messy to understand. Any 
> help on resolving this would help! If you need
>
> more info or code just ask but I wanted to start with the minimal.
>
>
> All this code is heavily inspired by this blog article: BLOG 
> <http://blog.scalac.io/2017/04/25/akka-streams-graph-stage.html>
>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to