[akka-user] Distributed Publish Subscribe in custom GraphStage

2017-06-02 Thread Yi Wang
Hi all, 

Is there any way to combine custom GraphStage with Distributed Publish 
Subscribe ? 

I'm refactoring a ActorPublisher into a GraphStage.

Thanks a lot

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] beginner question on akka streams flow design.

2017-06-02 Thread Mohnish Kodnani
Hi , 
I have the following use case that I want to model using akka streams and I 
am new to akka streams so would like to know how/what is the best way to 
model it. 

I have a kafka consumer which reads from one topic massages the message and 
sends it to a RESTful external service. 
This external restful service takes a token that needs to be sent with 
every message in the header. This token expires every roughly 24 hours.

I was thinking of creating an FSM actor that will act as a Token Manager. 
It will be in Active state where it can send the current token on receiving 
say a GetToken message. Then every 24 hours it receives a scheduled event 
for FetchToken, then it will go into Fetching mode, where it will retrieve 
a new token from another service and in that state it should keep queuing 
the requests for GetToken.

Now, how do i hook this actor (TokenManager) into my stream so that it can 
provide a token for every message from kafka but block the stream until it 
gets a new token when it expires. 
Or if there is another solution to what I am thinking instead of creating a 
separate actor. 

Thanks
Mohnish.


-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Remoting - no reconnect possible - name or service not known

2017-06-02 Thread Manfred Bergmann
Hello.

I need some advice.

The following situation (Akka 2.5.2):

I have an ActorSystem on one host (host0) and another ActorSystem on 
another host (host1).

host0 is assumed to always be online and available.
host0 does not know the ip address or hostname of host1 (DNS).
host0 cannot make connection to host1 due to firewall restrictions but it 
uses the passive mode functionality.

host1 knows the hostname of host0.
host1 connects to a specific actor on host0 via actorSelection.

host0 watches the connecting actor of host1 and vise versa.
Initially this works all well.

But after host1 goes down (network cable plugged or whatever) and wants to 
reconnect to host0 via actorSelection it's not possible.
host1 logs: "Error resolving remote Actor: Actor not found for: 
ActorSelection[Anchor(akka.tcp://system@host0:port/), Path(somepath)]
host0 logs: "ReliableDeliverySupervisor - Assiciation with remote system 
[akka.tcp://system@host1:port] has failed, address is now gated for [5000] 
ms. Caused by: [No response from remote for outbound association. Associate 
timed out after [15000 ms]]

host1 retries every so often. from the second attempt on the error message 
on host0 is:
"ReliableDeliverySupervisor - Assiciation with remote system 
[akka.tcp://system@host1:port] has failed, address is now gated for [5000] 
ms. Caused by: [host1: Name or service not known]

host1 can only reconnect, if host0 is re-started.

I'd like to understand what happens here.
Why is host1 unable to reconnect. Or actually, why does host0 not 'allow' 
the reconnection.
Now, I've read about the Lifecycle and Failure recovery model. Can it be 
that the link to host1 is quarantined on host0?
Why the error message "Name or service not known", if previously the 
connection worked?


Regards,
Manfred

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: TCP Handshaking custom stage drives me crazy

2017-06-02 Thread Michał Sitko
Hey Larsson - based on debug messages when running live I would guess that 
you miss `pull(sslIn)` somewhere. I have a hypothesis for what's going on. 
Let's take a look at:

```
setHandler(bytesOut, new OutHandler {
  override def onPull() = state match {
case Connecting =>
case Authenticating =>
case Subscribing =>
case Subscribed => pull(sslIn)
  }
})
```

You're pulling just when in `Subscribed` state. That means that if 
`bytesOut.onPull` was called before Stage went into `Subscribed` state then 
`pull(sslIn)` will not get called. Therefore we need to ensure that pull 
will be called even in that case. You can do this by e.g. adding:

```scala
if (subscriptionValid(event)) {
  state = Subscribed
  logger.info(state.toString)
  if (isAvailable(sslIn)) {
pull(sslIn)
  }
}
```

in `onPush` handler for `sslIn`.

To debug it I would add `println`s in all places we do `pull(sslIn)` just 
to be sure they're really are executed. In extreme case you can add println 
before all calls to `pull` and `push` - I know there will be a lot of 
output but will give you insight into what's going on.

BTW, I don't understand this one:

```scala
setHandler(bytesIn, new InHandler {
  override def onPush() = {
  }
}
```

Also, find it strange that your test works. You are not sending anything 
with `toBetfairProbe` (which simulates input port from TCP as far as I 
understand). I have not idea, maybe you pasted wrong code? 

Hope it will help, let us know in case of further troubles.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: TCP Handshaking custom stage drives me crazy

2017-06-02 Thread Michał Sitko
I spotted a mistake in the second snippet from my previous post. What I 
meant is the following (sorry for another post but cannot find "edit" 
option here):


if (subscriptionValid(event)) {
  state = Subscribed
  logger.info(state.toString)
  if (isAvailable(bytesOut)) {
pull(sslIn)
  }
}


On Friday, June 2, 2017 at 1:40:42 PM UTC+2, Michał Sitko wrote:
>
> Hey Larsson - based on debug messages when running live I would guess that 
> you miss `pull(sslIn)` somewhere. I have a hypothesis for what's going on. 
> Let's take a look at:
>
> ```
> setHandler(bytesOut, new OutHandler {
>   override def onPull() = state match {
> case Connecting =>
> case Authenticating =>
> case Subscribing =>
> case Subscribed => pull(sslIn)
>   }
> })
> ```
>
> You're pulling just when in `Subscribed` state. That means that if 
> `bytesOut.onPull` was called before Stage went into `Subscribed` state then 
> `pull(sslIn)` will not get called. Therefore we need to ensure that pull 
> will be called even in that case. You can do this by e.g. adding:
>
> ```scala
> if (subscriptionValid(event)) {
>   state = Subscribed
>   logger.info(state.toString)
>   if (isAvailable(sslIn)) {
> pull(sslIn)
>   }
> }
> ```
>
> in `onPush` handler for `sslIn`.
>
> To debug it I would add `println`s in all places we do `pull(sslIn)` just 
> to be sure they're really are executed. In extreme case you can add println 
> before all calls to `pull` and `push` - I know there will be a lot of 
> output but will give you insight into what's going on.
>
> BTW, I don't understand this one:
>
> ```scala
> setHandler(bytesIn, new InHandler {
>   override def onPush() = {
>   }
> }
> ```
>
> Also, find it strange that your test works. You are not sending anything 
> with `toBetfairProbe` (which simulates input port from TCP as far as I 
> understand). I have not idea, maybe you pasted wrong code? 
>
> Hope it will help, let us know in case of further troubles.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Processor actor terminated abruptly on HTTPs connections (akka 2.4.12, akka-http 2.4.11)

2017-06-02 Thread Nikolay Kushin
Is there anything new about this issue. I just came across and a bit 
disappointed.

On Tuesday, 8 November 2016 18:07:58 UTC+1, Sami Dalouche wrote:
>
> Also, it looks like that avoiding the use of a connection pool doesn't 
> trigger the issue. The following code works fine, without any error
>
> class StreamOmplusEvents {
>
>   implicit val system = ActorSystem()
>   implicit val materializer = ActorMaterializer()
>   implicit val timeout = Timeout(10.seconds)
>   import system.dispatcher
>
>   val request = HttpRequest(
> uri = "/200"
>   )
>
>   val flow = Http(system).outgoingConnectionHttps("httpstatuses.com")
>
>   val source = Source.single(request)
> .via(flow)
> .map(_.discardEntityBytes())
>
>   val sourceProcessed = source.runWith(Sink.ignore)
>
>   val systemTerminatedFuture = sourceProcessed.flatMap { _ =>
> Http().shutdownAllConnectionPools().flatMap { _ =>
>   materializer.shutdown()
>   system.terminate()
> }
>   }
>
>   Await.result(systemTerminatedFuture, Duration.Inf)
> }
>
>
> On Tuesday, 8 November 2016 11:26:15 UTC-5, Sami Dalouche wrote:
>>
>> *Simplest code to reproduce:*
>>
>>
>> class StreamOmplusEvents {
>>
>>   implicit val system = ActorSystem()
>>   implicit val materializer = ActorMaterializer()
>>   implicit val timeout = Timeout(10.seconds)
>>   import system.dispatcher
>>
>>   val request = HttpRequest(
>> uri = "https://httpstatuses.com/200";
>>   )
>>
>>   val future = Http(system)
>> .singleRequest(request)
>> .map(_.discardEntityBytes())
>>
>>   val systemTerminatedFuture = future.flatMap { _ =>
>> Http().shutdownAllConnectionPools().flatMap { _ =>
>>   materializer.shutdown()
>>   Thread.sleep(3000)
>>   system.terminate()
>> }
>>   }
>>
>>   Await.result(systemTerminatedFuture, Duration.Inf)
>> }
>>
>>
>>
>>
>>
>> Error I get :
>>
>> 2016-11-08 16:17:06,386 [default-akka.actor.default-dispatcher-2] ERROR 
>> akka.actor.ActorSystemImpl - Outgoing request stream error
>> akka.stream.AbruptTerminationException: Processor actor 
>> [Actor[akka://default/user/StreamSupervisor-1/flow-0-0-unknown-operation#-901476425]]
>>  terminated abruptly
>>
>>
>> Now, replace "https" by "http", and there is no error message.
>>
>>
>> Is there anything I am doing wrong?
>>
>>
>> Regards,
>>
>> Sami
>>
>>
>>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Remoting - no reconnect possible - name or service not known

2017-06-02 Thread Patrik Nordwall
http://doc.akka.io/docs/akka/current/scala/general/remoting.html#peer-to-peer-vs-client-server

Sorry for short answer

/Patrik
fre 2 juni 2017 kl. 09:35 skrev Manfred Bergmann :

> Hello.
>
> I need some advice.
>
> The following situation (Akka 2.5.2):
>
> I have an ActorSystem on one host (host0) and another ActorSystem on
> another host (host1).
>
> host0 is assumed to always be online and available.
> host0 does not know the ip address or hostname of host1 (DNS).
> host0 cannot make connection to host1 due to firewall restrictions but it
> uses the passive mode functionality.
>
> host1 knows the hostname of host0.
> host1 connects to a specific actor on host0 via actorSelection.
>
> host0 watches the connecting actor of host1 and vise versa.
> Initially this works all well.
>
> But after host1 goes down (network cable plugged or whatever) and wants to
> reconnect to host0 via actorSelection it's not possible.
> host1 logs: "Error resolving remote Actor: Actor not found for:
> ActorSelection[Anchor(akka.tcp://system@host0:port/), Path(somepath)]
> host0 logs: "ReliableDeliverySupervisor - Assiciation with remote system
> [akka.tcp://system@host1:port] has failed, address is now gated for
> [5000] ms. Caused by: [No response from remote for outbound association.
> Associate timed out after [15000 ms]]
>
> host1 retries every so often. from the second attempt on the error message
> on host0 is:
> "ReliableDeliverySupervisor - Assiciation with remote system
> [akka.tcp://system@host1:port] has failed, address is now gated for
> [5000] ms. Caused by: [host1: Name or service not known]
>
> host1 can only reconnect, if host0 is re-started.
>
> I'd like to understand what happens here.
> Why is host1 unable to reconnect. Or actually, why does host0 not 'allow'
> the reconnection.
> Now, I've read about the Lifecycle and Failure recovery model. Can it be
> that the link to host1 is quarantined on host0?
> Why the error message "Name or service not known", if previously the
> connection worked?
>
>
> Regards,
> Manfred
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Remoting - no reconnect possible - name or service not known

2017-06-02 Thread Manfred Bergmann
Well, I was hoping you wouldn't respond with that. :/

I read that Akka remoting is a peer2peer framework.
Nevertheless we use it partly as client-server. Probably also because the 
requirements were not absolutely clear in the beginning of software design.

Also that doesn't explain what exactly is happening.
The symptom for sure is that when host1 does a subsequent actorSelection 
against an actor in host1, host1 for some reason tries to make an outbound 
connection to host1, whereas this doesn't seem to happen at the initial 
actorSelection.

I've also found this thread 
here: https://groups.google.com/forum/#!topic/akka-user/GyfUUENeS0o
Which seems to be a similar setup as ours.
Where the solution is that one ActorSystem per remote is created and 
re-started on node termination. Not a perfect solution, by far.

So, yes. Akka IO or HTTP is probably better suited for this particular use 
case.

However, I'm still wondering if there is a chance to get along with Akka 
remoting.


Manfred


Am Freitag, 2. Juni 2017 17:44:25 UTC+2 schrieb Patrik Nordwall:
>
>
> http://doc.akka.io/docs/akka/current/scala/general/remoting.html#peer-to-peer-vs-client-server
>
> Sorry for short answer
>
> /Patrik
> fre 2 juni 2017 kl. 09:35 skrev Manfred Bergmann  >:
>
>> Hello.
>>
>> I need some advice.
>>
>> The following situation (Akka 2.5.2):
>>
>> I have an ActorSystem on one host (host0) and another ActorSystem on 
>> another host (host1).
>>
>> host0 is assumed to always be online and available.
>> host0 does not know the ip address or hostname of host1 (DNS).
>> host0 cannot make connection to host1 due to firewall restrictions but it 
>> uses the passive mode functionality.
>>
>> host1 knows the hostname of host0.
>> host1 connects to a specific actor on host0 via actorSelection.
>>
>> host0 watches the connecting actor of host1 and vise versa.
>> Initially this works all well.
>>
>> But after host1 goes down (network cable plugged or whatever) and wants 
>> to reconnect to host0 via actorSelection it's not possible.
>> host1 logs: "Error resolving remote Actor: Actor not found for: 
>> ActorSelection[Anchor(akka.tcp://system@host0:port/), Path(somepath)]
>> host0 logs: "ReliableDeliverySupervisor - Assiciation with remote system 
>> [akka.tcp://system@host1:port] has failed, address is now gated for [5000] 
>> ms. Caused by: [No response from remote for outbound association. Associate 
>> timed out after [15000 ms]]
>>
>> host1 retries every so often. from the second attempt on the error 
>> message on host0 is:
>> "ReliableDeliverySupervisor - Assiciation with remote system 
>> [akka.tcp://system@host1:port] has failed, address is now gated for [5000] 
>> ms. Caused by: [host1: Name or service not known]
>>
>> host1 can only reconnect, if host0 is re-started.
>>
>> I'd like to understand what happens here.
>> Why is host1 unable to reconnect. Or actually, why does host0 not 'allow' 
>> the reconnection.
>> Now, I've read about the Lifecycle and Failure recovery model. Can it be 
>> that the link to host1 is quarantined on host0?
>> Why the error message "Name or service not known", if previously the 
>> connection worked?
>>
>>
>> Regards,
>> Manfred
>>
>> -- 
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com .
>> To post to this group, send email to akka...@googlegroups.com 
>> .
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] akka-http client: How to ensure response entity is read

2017-06-02 Thread Daniel Vigovszky
Hi Roland,

I realized that although I jumped into this thread, the thing I'm really 
interested in is not exactly consuming the timed out requests, it can be 
done at one time in the future. The problem is how to cancel an already 
started request after a timeout. 
I put together an example project to show what the issue is:

https://github.com/vigoo/akka-http-client-cancellation

It's of course possible that I'm just missing something, but I could not 
find a way to achieve the behavior I described on the linked project's 
README.

Regards,

vigoo


2017. május 29., hétfő 11:02:44 UTC+2 időpontban rkuhn a következőt írta:
>
> Hi Daniel & Arno,
>
> consuming  response entities needs to be done so that the connection 
> becomes reusable afterwards, i.e. to free up the resource. In this sense, 
> does it really matter how long that takes? If the connection goes idle for 
> too long, it will be closed anyway, so attaching a `Sink.ignore()` should 
> be all that is needed.
>
> Regards,
>
> Roland
>
> 29 maj 2017 kl. 10:50 skrev Daniel Vigovszky  >:
>
> Hi Arno,
>
> did you figure out anything about this?
>
> vigoo
>
> 2017. május 18., csütörtök 7:12:37 UTC+2 időpontban Arno Haase a 
> következőt írta:
>>
>> I am aware that response entities must always be consumed with akka-http 
>> client. 
>>
>> For (very) slow connections I find it difficult to do that reliably. 
>> Consuming the entity is asynchronous and therefore requires timeouts - 
>> but if the timeouts fire, I may be left with an entity that is only 
>> partly consumed. 
>>
>> I am probably missing something here - any help would be greatly 
>> appreciated. 
>>
>> - Arno 
>>
>
> -- 
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: 
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> --- 
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to akka-user+...@googlegroups.com .
> To post to this group, send email to akka...@googlegroups.com 
> .
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka Messages Duplicated

2017-06-02 Thread Joseph Mansigian
I have an application that uses remote addressing exclusively.  The 
application is under development on one local machine; one actor system 
spanning several JVM. When I send a message the message always arrives at 
its intended recipient actor and is processed. Program output verifies that 
the affected actor has received and processed the message.  The program 
runs as expected; no messages are ever lost.

The thing that is mystifying me is that whenever I send a message a 
duplicate  of the message is also sent to the same actor using a local path 
instead of the correct remote path.  As you would expect the message cannot 
be delivered and it goes to the DeadLetter place.   I am monitoring the 
Event Bus and see this on a regular basis.  I have looked very scrupulously 
to see if I could be sending the message twice but cannot find this 
happening.  Also I don't even have any configuration ( application.conf ) 
that is capable of creating a local path to an actor.

Would be interested in two things:

1) Has anybody else seen this behavior.

2) Anyone have ideas about how I can research what is going on.


Software in use: 
akka 2.4.16
scala 2.11.8
java  1.8.0_112
linux  3.13.0-24-generic #47-Ubuntu SMP 

Thanks, Joe


-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] ActorPublisher GraphStage alternative?

2017-06-02 Thread Curt Siffert
I made some progress figuring out how to use getStageActor (not 
getStageActorRef - the docs have an error) - the stage’s preStart() registers 
the stage’s actorRef with an outside actor. Messages forwarded to that actorRef 
(via the outside actor) will get sent to a callback function inside the stage.

What I’m doing is using the callback to add items to a stage’s internal queue, 
and then using onPull to dequeue that queue if there are items in the queue.

The part that is confusing to me is that when applied to a Source, I haven’t 
figured out how to get onPull to trigger from that callback. onPull only seems 
to fire right after materialization, when the queue is still empty, and then it 
doesn’t pull again after receiving messages and queuing, even though I have a 
stream wired up to process what what the Source emits. 

ActorPublisher was originally written to suggest that after creating the 
Source, you could then send messages to the actor in a periodic fashion and the 
stream would be able to process those messages. But so far I haven’t been able 
to duplicate that behavior. It’s as if the only way to use getStageActor is if 
its GraphStage is a FlowShape.

In that case, I suppose I could use Source.queue but it seems like I’m missing 
something. Is it not possible to use getStageActor on a Source? If anyone has 
code samples of getStageActor on a Source, I’d love to see it.

Thanks,
Curt


> On May 28, 2017, at 3:19 AM, Patrik Nordwall  
> wrote:
> 
> Can't you use Source.queue? Backpressure can be maintained by piping the 
> result of the future back to an ordinary actor.
> 
> /Patrik
> lör 27 maj 2017 kl. 16:29 skrev Richard Rodseth  >:
> In case it helps:
> 
> https://groups.google.com/d/topic/akka-user/AgVHHnl9ub4/discussion 
> 
> https://github.com/akka/akka/issues/22742 
> 
> 
> On Fri, May 26, 2017 at 1:34 PM, Curt Siffert  > wrote:
> 
> Hi, I see in the docs for 2.5.2 that ActorPublisher/ActorSubscriber will be 
> deprecated.
> 
> In my (still beginning) experiments with akka streams I used ActorPublisher 
> as a way to help create some back pressure controls while consuming messages 
> from an external queue. This worked just by consuming the queue like normal 
> and then for each message consumed, sending a message to ActorPublisher.
> 
> Without using ActorPublisher, I can use a Source.actorRef, but that doesn't 
> have back pressure controls.
> 
> I know the recommended alternative to ActorPublisher is to use a custom graph 
> stage and I have started experimenting with that but so far I don't see how 
> to meet the ActorPublisher use case with it. So far it doesn't seem like a 
> custom Source has an ActorRef type signature like Source.actorRef does. Once 
> the custom stage is created, can I send a "tell" message to it the way I did 
> to ActorPublisher? Or am I supposed to use Source.actorRef and then funnel it 
> through the custom stage to get the back pressure controls?
> 
> Sorry if my question is muddled, I am still making my way through this. :-) I 
> recognize this is a bit weird since ideally the back pressure controls would 
> be applied to the queueing tech itself.
> 
> Curt
> 
> 
> -- 
> >> Read the docs: http://akka.io/docs/ 
> >> Check the FAQ: 
> >> http://doc.akka.io/docs/akka/current/additional/faq.html 
> >> 
> >> Search the archives: https://groups.google.com/group/akka-user 
> >> 
> --- 
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to akka-user+unsubscr...@googlegroups.com 
> .
> To post to this group, send email to akka-user@googlegroups.com 
> .
> Visit this group at https://groups.google.com/group/akka-user 
> .
> For more options, visit https://groups.google.com/d/optout 
> .
> 
> 
> -- 
> >> Read the docs: http://akka.io/docs/ 
> >> Check the FAQ: 
> >> http://doc.akka.io/docs/akka/current/additional/faq.html 
> >> 
> >> Search the archives: https://groups.google.com/group/akka-user 
> >> 
> --- 
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to akka-user+unsubscr...@googlegroups.com 
> 

[akka-user] Re: TCP Handshaking custom stage drives me crazy

2017-06-02 Thread Henrik Larsson
Thanks for the help, yes that did the trick to add one more pull for sslIn. 
The reason bytesIn is empty is because I really dont need a bidishape. I 
need this logic encapsulated in something with one input and two outputs 
because after this handshaking is done there is no need to send any more 
messages to the TCP socket. However since I found your example the only 
example that we close to what i needed and you use bidishape i stick with 
this solution until i understad more about Akka Stream.

On Friday, June 2, 2017 at 1:40:42 PM UTC+2, Michał Sitko wrote:
>
> Hey Larsson - based on debug messages when running live I would guess that 
> you miss `pull(sslIn)` somewhere. I have a hypothesis for what's going on. 
> Let's take a look at:
>
> ```
> setHandler(bytesOut, new OutHandler {
>   override def onPull() = state match {
> case Connecting =>
> case Authenticating =>
> case Subscribing =>
> case Subscribed => pull(sslIn)
>   }
> })
> ```
>
> You're pulling just when in `Subscribed` state. That means that if 
> `bytesOut.onPull` was called before Stage went into `Subscribed` state then 
> `pull(sslIn)` will not get called. Therefore we need to ensure that pull 
> will be called even in that case. You can do this by e.g. adding:
>
> ```scala
> if (subscriptionValid(event)) {
>   state = Subscribed
>   logger.info(state.toString)
>   if (isAvailable(sslIn)) {
> pull(sslIn)
>   }
> }
> ```
>
> in `onPush` handler for `sslIn`.
>
> To debug it I would add `println`s in all places we do `pull(sslIn)` just 
> to be sure they're really are executed. In extreme case you can add println 
> before all calls to `pull` and `push` - I know there will be a lot of 
> output but will give you insight into what's going on.
>
> BTW, I don't understand this one:
>
> ```scala
> setHandler(bytesIn, new InHandler {
>   override def onPush() = {
>   }
> }
> ```
>
> Also, find it strange that your test works. You are not sending anything 
> with `toBetfairProbe` (which simulates input port from TCP as far as I 
> understand). I have not idea, maybe you pasted wrong code? 
>
> Hope it will help, let us know in case of further troubles.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.