[akka-user] Re: TCP Handshaking custom stage drives me crazy

2017-06-02 Thread Henrik Larsson
Thanks for the help, yes that did the trick to add one more pull for sslIn. 
The reason bytesIn is empty is because I really dont need a bidishape. I 
need this logic encapsulated in something with one input and two outputs 
because after this handshaking is done there is no need to send any more 
messages to the TCP socket. However since I found your example the only 
example that we close to what i needed and you use bidishape i stick with 
this solution until i understad more about Akka Stream.

On Friday, June 2, 2017 at 1:40:42 PM UTC+2, Michał Sitko wrote:
>
> Hey Larsson - based on debug messages when running live I would guess that 
> you miss `pull(sslIn)` somewhere. I have a hypothesis for what's going on. 
> Let's take a look at:
>
> ```
> setHandler(bytesOut, new OutHandler {
>   override def onPull() = state match {
> case Connecting =>
> case Authenticating =>
> case Subscribing =>
> case Subscribed => pull(sslIn)
>   }
> })
> ```
>
> You're pulling just when in `Subscribed` state. That means that if 
> `bytesOut.onPull` was called before Stage went into `Subscribed` state then 
> `pull(sslIn)` will not get called. Therefore we need to ensure that pull 
> will be called even in that case. You can do this by e.g. adding:
>
> ```scala
> if (subscriptionValid(event)) {
>   state = Subscribed
>   logger.info(state.toString)
>   if (isAvailable(sslIn)) {
> pull(sslIn)
>   }
> }
> ```
>
> in `onPush` handler for `sslIn`.
>
> To debug it I would add `println`s in all places we do `pull(sslIn)` just 
> to be sure they're really are executed. In extreme case you can add println 
> before all calls to `pull` and `push` - I know there will be a lot of 
> output but will give you insight into what's going on.
>
> BTW, I don't understand this one:
>
> ```scala
> setHandler(bytesIn, new InHandler {
>   override def onPush() = {
>   }
> }
> ```
>
> Also, find it strange that your test works. You are not sending anything 
> with `toBetfairProbe` (which simulates input port from TCP as far as I 
> understand). I have not idea, maybe you pasted wrong code? 
>
> Hope it will help, let us know in case of further troubles.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: TCP Handshaking custom stage drives me crazy

2017-06-02 Thread Michał Sitko
I spotted a mistake in the second snippet from my previous post. What I 
meant is the following (sorry for another post but cannot find "edit" 
option here):


if (subscriptionValid(event)) {
  state = Subscribed
  logger.info(state.toString)
  if (isAvailable(bytesOut)) {
pull(sslIn)
  }
}


On Friday, June 2, 2017 at 1:40:42 PM UTC+2, Michał Sitko wrote:
>
> Hey Larsson - based on debug messages when running live I would guess that 
> you miss `pull(sslIn)` somewhere. I have a hypothesis for what's going on. 
> Let's take a look at:
>
> ```
> setHandler(bytesOut, new OutHandler {
>   override def onPull() = state match {
> case Connecting =>
> case Authenticating =>
> case Subscribing =>
> case Subscribed => pull(sslIn)
>   }
> })
> ```
>
> You're pulling just when in `Subscribed` state. That means that if 
> `bytesOut.onPull` was called before Stage went into `Subscribed` state then 
> `pull(sslIn)` will not get called. Therefore we need to ensure that pull 
> will be called even in that case. You can do this by e.g. adding:
>
> ```scala
> if (subscriptionValid(event)) {
>   state = Subscribed
>   logger.info(state.toString)
>   if (isAvailable(sslIn)) {
> pull(sslIn)
>   }
> }
> ```
>
> in `onPush` handler for `sslIn`.
>
> To debug it I would add `println`s in all places we do `pull(sslIn)` just 
> to be sure they're really are executed. In extreme case you can add println 
> before all calls to `pull` and `push` - I know there will be a lot of 
> output but will give you insight into what's going on.
>
> BTW, I don't understand this one:
>
> ```scala
> setHandler(bytesIn, new InHandler {
>   override def onPush() = {
>   }
> }
> ```
>
> Also, find it strange that your test works. You are not sending anything 
> with `toBetfairProbe` (which simulates input port from TCP as far as I 
> understand). I have not idea, maybe you pasted wrong code? 
>
> Hope it will help, let us know in case of further troubles.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: TCP Handshaking custom stage drives me crazy

2017-06-02 Thread Michał Sitko
Hey Larsson - based on debug messages when running live I would guess that 
you miss `pull(sslIn)` somewhere. I have a hypothesis for what's going on. 
Let's take a look at:

```
setHandler(bytesOut, new OutHandler {
  override def onPull() = state match {
case Connecting =>
case Authenticating =>
case Subscribing =>
case Subscribed => pull(sslIn)
  }
})
```

You're pulling just when in `Subscribed` state. That means that if 
`bytesOut.onPull` was called before Stage went into `Subscribed` state then 
`pull(sslIn)` will not get called. Therefore we need to ensure that pull 
will be called even in that case. You can do this by e.g. adding:

```scala
if (subscriptionValid(event)) {
  state = Subscribed
  logger.info(state.toString)
  if (isAvailable(sslIn)) {
pull(sslIn)
  }
}
```

in `onPush` handler for `sslIn`.

To debug it I would add `println`s in all places we do `pull(sslIn)` just 
to be sure they're really are executed. In extreme case you can add println 
before all calls to `pull` and `push` - I know there will be a lot of 
output but will give you insight into what's going on.

BTW, I don't understand this one:

```scala
setHandler(bytesIn, new InHandler {
  override def onPush() = {
  }
}
```

Also, find it strange that your test works. You are not sending anything 
with `toBetfairProbe` (which simulates input port from TCP as far as I 
understand). I have not idea, maybe you pasted wrong code? 

Hope it will help, let us know in case of further troubles.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: TCP Handshaking custom stage drives me crazy

2017-05-30 Thread Henrik Larsson
This is the output generated by the test wich is as expected.
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
ConnectionMessage(connection,050-250517141605-1391626)
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Authenticating
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
StatusMessage(status,Some(1),None,None,None,false,SUCCESS)
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Subscribing
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
StatusMessage(status,Some(123),None,None,None,false,SUCCESS)
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Subscribed
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
MarketChangeMessage(mcm,123,Some(HEARTBEAT),,None,1495773941868,None,None,None,None)
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Heartbeat
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
MarketChangeMessage(mcm,123,Some(BOOM),,None,1495773941868,None,None,None,None)
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - MarketChange
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
MarketChangeMessage(mcm,123,Some(HEARTBEAT),,None,1495773941868,None,None,None,None)
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Heartbeat
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
MarketChangeMessage(mcm,123,Some(BOOM),,None,1495773941868,None,None,None,None)
20:45:53 [default-akka.actor.default-dispatcher-2] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - MarketChange

However when running this live this is the output:
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
ConnectionMessage(connection,050-250517141605-1391626)
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Authenticating
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
StatusMessage(status,Some(1),None,None,None,false,SUCCESS)
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Subscribing
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - 
StatusMessage(status,Some(123),None,None,None,false,SUCCESS)
20:45:53 [default-akka.actor.default-dispatcher-4] INFO 
 c.b.i.b.temp.SubscriptionGraphStage - Subscribed

On Tuesday, May 30, 2017 at 8:41:44 PM UTC+2, Henrik Larsson wrote:
>
>
> *Im trying to construct a GaphStage that will be used when connecting to a 
> TCP socket. The following protocol is used when connecting:
> 1. Materialize TCP flow to create an outgoingConnection
> 2. Socket return Connect message
> 3. If Connect message OK send AuthenticationMessage
> 4. If OK response on AuthenticationMessage send SubscriptionMessage
> 5. If Subscription OK forward all incomming messages that are not Heatbeat
>
> This is the code for the GraphStage.*
>
>
>
> class SubscriptionGraphStage(token: SessionToken)
>   extends GraphStage[BidiShape[BetfairRequest, BetfairRequest, BetfairEvent, 
> BetfairEvent]]
> with LazyLogging {
>
>   import com.bfg.infrastructure.betfair.temp.TcpProxyState._
>
>   val bytesIn: Inlet[BetfairRequest] = Inlet("OutgoingTCP.in")
>
>   val bytesOut: Outlet[BetfairEvent] = Outlet("OutgoingTCP.out")
>   val sslIn: Inlet[BetfairEvent] = Inlet("OutgoingSSL.in")
>   val sslOut: Outlet[BetfairRequest] = Outlet("OutgoingSSL.out")
>
>   override def shape: BidiShape[BetfairRequest, BetfairRequest, BetfairEvent, 
> BetfairEvent] = BidiShape.apply(bytesIn, sslOut, sslIn, bytesOut)
>
>   val authMessage = RequestAuthentication(session = token.sessionToken, id = 
> 1, appKey = "tDwhr80fJKsOW725")
>   val marketSubscription: BetfairRequest = RequestMarketSubscription(
> id=123,
> marketFilter = MarketSubscriptionMarketFilter(
>   eventTypeIds = List("7"),
>   marketTypes = List("WIN"),
>   countryCodes = List("GB")
> ),
> marketDataFilter = MarketSubscriptionMarketDataFilter(
>   fields = List("EX_ALL_OFFERS","EX_MARKET_DEF"),
>   ladderLevels = Some(3)
> )
>   )
>   def connectionValid(event: BetfairEvent): Boolean = event match {
> case m: ConnectionMessage => true
> case _ => false
>   }
>
>   def authenticationValid(event: BetfairEvent): Boolean = event match {
> //todo simplifed need to control id
> case m: StatusMessage if m.statusCode == "SUCCESS" => true
> case _ => false
>   }
>   def subscriptionValid(event: B