[akka-user] Re: Node quarantined

2016-03-22 Thread Guido Medina
Yeah, sorry, I thought it was related to a rolling restart.

As for Netty, I'm using a *not-yet-published* Netty build with the following 
fixes:
https://github.com/netty/netty/issues?q=milestone%3A3.10.6.Final+is%3Aclosed

You can just get it from Git and build it:

$ git checkout 3.10
$ mvn versions:set -DnewVersion=3.10.6.Final -DgenerateBackupPoms=false
$ mvn clean install

Then see if your problem goes away.

Guido.

On Tuesday, March 22, 2016 at 10:27:26 PM UTC, Benjamin Black wrote:
>
> Hi Guido, yes I'm aware of the leaving cluster conversation as I started 
> it :-) This is separate issue. I am observing this behavior whilst the 
> cluster seems stable with no nodes being added/removed. I suspect that this 
> issue was first observed when I upgraded a different library that brought 
> in a new version of the netty library.
>
> On Tuesday, March 22, 2016 at 6:23:14 PM UTC-4, Guido Medina wrote:
>>
>> Hi Benjamin,
>>
>> You have nodes with predefined ports, one thing I have which eliminates 
>> that problem for these nodes is that
>> only my seed node(s) have the port set, the rest will just get a dynamic 
>> and available port, making it get a different port when you
>> do a rolling restart.
>>
>> I suspect you are doing a rolling restart right? so you need to wait for 
>> that node with that address to completely leave the cluster (I'm also doing 
>> that),
>> basically you terminate your system when you receive the message 
>> *MemberRemoved* for *_self_* address.
>>
>> I think I saw a discussion related to quarantine nodes when they are 
>> re-joining using the same address, not sure if here or if it is an actual 
>> Git ticket.
>>
>> HTH,
>>
>> Guido.
>>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Node quarantined

2016-03-22 Thread Benjamin Black
Hi Guido, yes, I'm aware of the leaving-cluster conversation, as I started it 
:-) This is a separate issue. I am observing this behavior while the cluster 
seems stable, with no nodes being added or removed. I suspect that this issue 
first appeared when I upgraded a different library that brought in a 
new version of the Netty library.

On Tuesday, March 22, 2016 at 6:23:14 PM UTC-4, Guido Medina wrote:
>
> Hi Benjamin,
>
> You have nodes with predefined ports, one thing I have which eliminates 
> that problem for these nodes is that
> only my seed node(s) have the port set, the rest will just get a dynamic 
> and available port, making it get a different port when you
> do a rolling restart.
>
> I suspect you are doing a rolling restart right? so you need to wait for 
> that node with that address to completely leave the cluster (I'm also doing 
> that),
> basically you terminate your system when you receive the message 
> *MemberRemoved* for *_self_* address.
>
> I think I saw a discussion related to quarantine nodes when they are 
> re-joining using the same address, not sure if here or if it is an actual 
> Git ticket.
>
> HTH,
>
> Guido.
>



[akka-user] Re: Node quarantined

2016-03-22 Thread Guido Medina
Hi Benjamin,

You have nodes with predefined ports. One thing I do that eliminates 
that problem is to set the port only on my seed node(s); the rest just get 
a dynamic, available port, so each of them comes back on a different port 
when you do a rolling restart.
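
A minimal sketch of that port setup, assuming the Typesafe Config library that ships with Akka. NodeConfig, forNode, isSeed and the default seed port 2552 are illustrative names and values, not something taken from this thread:

```scala
import com.typesafe.config.{Config, ConfigFactory}

object NodeConfig {
  // Seed nodes keep a fixed, well-known port. Every other node asks for
  // port 0, which tells Akka remoting to bind to any free port at startup.
  def forNode(isSeed: Boolean, seedPort: Int = 2552): Config = {
    val port = if (isSeed) seedPort else 0
    ConfigFactory
      .parseString(s"akka.remote.netty.tcp.port = $port")
      .withFallback(ConfigFactory.load()) // keep the rest of application.conf
  }
}
```

The result would be passed when creating the system, e.g. ActorSystem("geyser", NodeConfig.forNode(isSeed = false)).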

I suspect you are doing a rolling restart, right? If so, you need to wait for 
the node with that address to leave the cluster completely (I'm also doing 
that). Basically, you terminate your system when you receive the 
*MemberRemoved* message for the *_self_* address.
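
A minimal sketch of that shutdown-on-removal idea, assuming Akka 2.3.x with akka-cluster on the classpath. The actor name SelfRemovalWatcher is made up for illustration, and context.system.shutdown() is the 2.3 call (2.4 replaces it with terminate()):

```scala
import akka.actor.{Actor, ActorLogging}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.MemberRemoved

// Hypothetical helper actor: shuts the local ActorSystem down once the
// cluster reports that this node itself has been removed.
class SelfRemovalWatcher extends Actor with ActorLogging {
  private val cluster = Cluster(context.system)

  // Subscribe to MemberRemoved events. The cluster first sends a
  // CurrentClusterState snapshot, which the catch-all case below ignores.
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberRemoved])

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberRemoved(member, _) if member.address == cluster.selfAddress =>
      log.info("Node {} was removed from the cluster, shutting down", member.address)
      context.system.shutdown() // Akka 2.3.x; use terminate() on 2.4+
    case _ => // state snapshot or another member's removal: nothing to do
  }
}
```

It would be started once per node, e.g. system.actorOf(Props[SelfRemovalWatcher]), before the node is asked to leave the cluster.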

I think I saw a discussion about nodes being quarantined when they 
rejoin using the same address; I'm not sure whether it was here or in an actual 
Git ticket.

HTH,

Guido.



[akka-user] Re: Node quarantined

2016-03-22 Thread Guido Medina
Hi Benjamin,

You have nodes with predefined ports and addresses. One thing I do that 
eliminates that problem is to set the port only on my seed node(s); the rest 
just get a dynamic, available port, so each of them comes back on a different 
port when you do a rolling restart.

I suspect you are doing a rolling restart, right? If so, you need to wait for 
the node with that address to leave the cluster completely (I'm also doing 
that). Basically, you terminate your system when you receive the 
*MemberRemoved* message for the *_self_* address.

I think I saw a discussion about nodes being quarantined when they 
rejoin using the same address; I'm not sure whether it was here or in an actual 
Git ticket.

HTH,

Guido.

On Tuesday, March 22, 2016 at 9:22:00 PM UTC, Benjamin Black wrote:
>
> I see the same issue with 2.3.14.
>
> On Tuesday, March 22, 2016 at 2:00:15 PM UTC-4, Guido Medina wrote:
>>
>> To eliminate noise please update to 2.3.14 which from 2.3.11 has some 
>> cluster fixes, there are also several fixes on Scala 2.11.8 (not related)
>>
>> I don't know, I just have the custom of keeping my libs up to date.
>>
>> HTH,
>>
>> Guido.
>>
>> On Tuesday, March 22, 2016 at 5:34:23 PM UTC, Benjamin Black wrote:
>>>
>>> Hello,
>>>
>>> I'm trying to understand the cause of nodes being quarantined and 
>>> possible solutions to fixing it. I'm using akka 2.3.11. On the quarantined 
>>> node I see this logging:
>>>
>>> 12:45:44.204 ERROR [geyser-akka.remote.default-remote-dispatcher-6] 
>>> a.r.EndpointWriter - AssociationError [akka.tcp://
>>> geyser@172.16.120.174:7000] <- [akka.tcp://geyser@172.17.100.105:7000]: 
>>> Error [Invalid address: akka.tcp://geyser@172.17.100.105:7000] [
>>> akka.remote.InvalidAssociation: Invalid address: akka.tcp://
>>> geyser@172.17.100.105:7000
>>> Caused by: akka.remote.transport.Transport$InvalidAssociationException: 
>>> The remote system has quarantined this system. No further associations to 
>>> the remote system are possible until this system is restarted.
>>> ]
>>> 12:45:44.205 WARN  [geyser-akka.remote.default-remote-dispatcher-25] 
>>> Remoting - Tried to associate with unreachable remote address [akka.tcp://
>>> geyser@172.17.100.105:7000]. Address is now gated for 5000 ms, all 
>>> messages to this address will be delivered to dead letters. Reason: [The 
>>> remote system has quarantined this system. No further associations to the 
>>> remote system are possible until this system is restarted.]
>>>
>>> And on the node that caused the box to be quarantined, I see this logging:
>>>
>>> 12:45:44.194 WARN  [geyser-akka.remote.default-remote-dispatcher-6] 
>>> Remoting - Association to [akka.tcp://geyser@172.16.120.174:7000] 
>>> having UID [-450748474] is irrecoverably failed. UID is now quarantined and 
>>> all messages to this UID will be delivered to dead letters. Remote 
>>> actorsystem must be restarted to recover from this situation.
>>> 12:45:44.202 WARN  [geyser-akka.remote.default-remote-dispatcher-7] 
>>> a.r.EndpointWriter - AssociationError [akka.tcp://
>>> geyser@172.17.100.105:7000] -> [akka.tcp://geyser@172.16.120.174:7000]: 
>>> Error [Invalid address: akka.tcp://geyser@172.16.120.174:7000] [
>>> akka.remote.InvalidAssociation: Invalid address: akka.tcp://
>>> geyser@172.16.120.174:7000
>>> Caused by: akka.remote.transport.Transport$InvalidAssociationException: 
>>> The remote system has a UID that has been quarantined. Association aborted.
>>> ]
>>> 12:45:44.203 WARN  [geyser-akka.remote.default-remote-dispatcher-7] 
>>> Remoting - Tried to associate with unreachable remote address [akka.tcp://
>>> geyser@172.16.120.174:7000]. Address is now gated for 5000 ms, all 
>>> messages to this address will be delivered to dead letters. Reason: [The 
>>> remote system has a UID that has been quarantined. Association aborted.]
>>> 12:45:44.221 ERROR [geyser-akka.remote.default-remote-dispatcher-7] 
>>> Remoting - Association to [akka.tcp://geyser@172.16.120.174:7000] with 
>>> UID [-450748474] irrecoverably failed. Quarantining address.
>>> java.lang.IllegalStateException: Error encountered while processing 
>>> system message acknowledgement buffer: [-1 {}] ack: ACK[6, {}]
>>> at 
>>> akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoint.scala:288)
>>>  
>>> ~[geyser.jar:1.1.17-SNAPSHOT]
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) 
>>> ~[geyser.jar:1.1.17-SNAPSHOT]
>>> Caused by: java.lang.IllegalArgumentException: Highest SEQ so far was -1 
>>> but cumulative ACK is 6
>>> at 
>>> akka.remote.AckedSendBuffer.acknowledge(AckedDelivery.scala:103) 
>>> ~[geyser.jar:1.1.17-SNAPSHOT]
>>> at 
>>> akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoint.scala:284)
>>>  
>>> ~[geyser.jar:1.1.17-SNAPSHOT]
>>> ... 11 common frames omitted
>>> 12:45:44.221 WARN  [geyser-akka.remote.default-remote-dispatcher-7] 
>>> Remoting - Association to 

[akka-user] Re: Node quarantined

2016-03-22 Thread Benjamin Black
I see the same issue with 2.3.14.

On Tuesday, March 22, 2016 at 2:00:15 PM UTC-4, Guido Medina wrote:
>
> To eliminate noise please update to 2.3.14 which from 2.3.11 has some 
> cluster fixes, there are also several fixes on Scala 2.11.8 (not related)
>
> I don't know, I just have the custom of keeping my libs up to date.
>
> HTH,
>
> Guido.
>
> On Tuesday, March 22, 2016 at 5:34:23 PM UTC, Benjamin Black wrote:
>>
>> Hello,
>>
>> I'm trying to understand the cause of nodes being quarantined and 
>> possible solutions to fixing it. I'm using akka 2.3.11. On the quarantined 
>> node I see this logging:
>>
>> 12:45:44.204 ERROR [geyser-akka.remote.default-remote-dispatcher-6] 
>> a.r.EndpointWriter - AssociationError [akka.tcp://
>> geyser@172.16.120.174:7000] <- [akka.tcp://geyser@172.17.100.105:7000]: 
>> Error [Invalid address: akka.tcp://geyser@172.17.100.105:7000] [
>> akka.remote.InvalidAssociation: Invalid address: akka.tcp://
>> geyser@172.17.100.105:7000
>> Caused by: akka.remote.transport.Transport$InvalidAssociationException: 
>> The remote system has quarantined this system. No further associations to 
>> the remote system are possible until this system is restarted.
>> ]
>> 12:45:44.205 WARN  [geyser-akka.remote.default-remote-dispatcher-25] 
>> Remoting - Tried to associate with unreachable remote address [akka.tcp://
>> geyser@172.17.100.105:7000]. Address is now gated for 5000 ms, all 
>> messages to this address will be delivered to dead letters. Reason: [The 
>> remote system has quarantined this system. No further associations to the 
>> remote system are possible until this system is restarted.]
>>
>> And on the node that caused the box to be quarantined, I see this logging:
>>
>> 12:45:44.194 WARN  [geyser-akka.remote.default-remote-dispatcher-6] 
>> Remoting - Association to [akka.tcp://geyser@172.16.120.174:7000] having 
>> UID [-450748474] is irrecoverably failed. UID is now quarantined and all 
>> messages to this UID will be delivered to dead letters. Remote actorsystem 
>> must be restarted to recover from this situation.
>> 12:45:44.202 WARN  [geyser-akka.remote.default-remote-dispatcher-7] 
>> a.r.EndpointWriter - AssociationError [akka.tcp://
>> geyser@172.17.100.105:7000] -> [akka.tcp://geyser@172.16.120.174:7000]: 
>> Error [Invalid address: akka.tcp://geyser@172.16.120.174:7000] [
>> akka.remote.InvalidAssociation: Invalid address: akka.tcp://
>> geyser@172.16.120.174:7000
>> Caused by: akka.remote.transport.Transport$InvalidAssociationException: 
>> The remote system has a UID that has been quarantined. Association aborted.
>> ]
>> 12:45:44.203 WARN  [geyser-akka.remote.default-remote-dispatcher-7] 
>> Remoting - Tried to associate with unreachable remote address [akka.tcp://
>> geyser@172.16.120.174:7000]. Address is now gated for 5000 ms, all 
>> messages to this address will be delivered to dead letters. Reason: [The 
>> remote system has a UID that has been quarantined. Association aborted.]
>> 12:45:44.221 ERROR [geyser-akka.remote.default-remote-dispatcher-7] 
>> Remoting - Association to [akka.tcp://geyser@172.16.120.174:7000] with 
>> UID [-450748474] irrecoverably failed. Quarantining address.
>> java.lang.IllegalStateException: Error encountered while processing 
>> system message acknowledgement buffer: [-1 {}] ack: ACK[6, {}]
>> at 
>> akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoint.scala:288)
>>  
>> ~[geyser.jar:1.1.17-SNAPSHOT]
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) 
>> ~[geyser.jar:1.1.17-SNAPSHOT]
>> Caused by: java.lang.IllegalArgumentException: Highest SEQ so far was -1 
>> but cumulative ACK is 6
>> at 
>> akka.remote.AckedSendBuffer.acknowledge(AckedDelivery.scala:103) 
>> ~[geyser.jar:1.1.17-SNAPSHOT]
>> at 
>> akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoint.scala:284)
>>  
>> ~[geyser.jar:1.1.17-SNAPSHOT]
>> ... 11 common frames omitted
>> 12:45:44.221 WARN  [geyser-akka.remote.default-remote-dispatcher-7] 
>> Remoting - Association to [akka.tcp://geyser@172.16.120.174:7000] having 
>> UID [-450748474] is irrecoverably failed. UID is now quarantined and all 
>> messages to this UID will be delivered to dead letters. Remote actorsystem 
>> must be restarted to recover from this situation.
>>
>> Quite a bit of data can be passed between the nodes (~200 Mb/sec), and maybe 
>> the system is hitting a capacity limit, although I don't see any issue with 
>> CPU or memory. I noticed that the default-remote-dispatcher only has two 
>> threads. Are these threads being used to send the data? If so, should I 
>> try increasing the thread count? Are there any other settings I could play 
>> with, or things I can look for in the logs, that might highlight what is 
>> wrong?
>>
>> Thanks,
>> Ben
>>
>


Re: [akka-user] Re: Can this be done with the build in stages?

2016-03-22 Thread Endre Varga
I like the throttle version more ;)

On Sun, Mar 20, 2016 at 4:24 PM, Roland Kuhn  wrote:

> Yes, and for the immediate case you can use Future.successful.
>
> Regards, Roland
>
> Sent from my iPhone
>
> On 20 Mar 2016, at 15:41, john.vie...@gmail.com wrote:
>
> Something very simple that comes to mind is to use mapAsync with the ask
> pattern and then do a "schedule of 10 secs" if the element is an empty list.
>
> On Sunday, 20 March 2016 at 14:31:51 UTC+1, john@gmail.com wrote:
>>
>> val input = Source(List(List(1, 2, 3), List(1, 2, 3), List(),
>>   List(), List(), List(), List(1, 2, 3)))
>>
>> 1) Process the input, for example input.via(throttleFlow).mapConcat(t => t),
>>    where throttleFlow should do the following:
>>
>> 2) If an element of the source has size > 0, pass it downstream immediately.
>>
>> 3) If an element of the source has size == 0, wait 10 secs.
>>
>> Can this be done with the built-in stages?
>>
>>
>



Re: [akka-user] Re: Can't get akka clustering to work in docker

2016-03-22 Thread Eric Swenson
Hi Endre,

I have read that part of the documentation, and after switching to a new 
Cassandra journal keyspace, everything is working as it should. I've 
confirmed that if I bump the "ECS Service" count to 2 (or greater), the ECS 
Task (describing an akka-cluster-sharding application) gets deployed on 
another ECS container instance. With the custom seed enumeration code I 
wrote (which queries the ECS cluster/service for the IP addresses of all 
the ECS container instances) and the custom code to set 
akka.remote.netty.tcp.hostname to the IP address of the container instance 
(rather than the Docker instance's docker0 IP address), the cluster members 
all see each other and all nodes join the cluster.

For the (written) record, since others will undoubtedly run into the same 
issue as I did, here are the important things you have to do to get an 
akka-cluster application working on AWS/ECS.

1) You have to dynamically configure the seed nodes to the IP addresses of 
the ECS container instances in your cluster. The way I did this was as 
follows: I wrote a Scala library using the AWS Java SDK that, given the 
ECS cluster ARN and service name, enumerates the tasks for the service and, 
for those tasks, enumerates the container instances on which the tasks are 
running. Given those container instances, the code determines the EC2 
instance IDs of the EC2 instances hosting them. Using the EC2 
DescribeInstances API, it then determines the IP address (private, in my 
case) of each EC2 instance. Finally, the IP addresses are mapped to the 
akka.tcp URLs required to configure the seed nodes.

2) You have to dynamically configure akka.remote.netty.tcp.hostname to 
be the IP address of the ECS container instance on which your Docker 
container is running. With no customizations, Akka will set this to the IP 
address on the docker0 interface, which is a private IP address not 
accessible from other Akka cluster members. Since there doesn't appear to 
be a way on AWS ECS for a Docker container to determine the IP address of 
the Docker host (the ECS container instance), I "cheated": I used the metadata 
URL that all EC2 instances support to query the IP address 
(http://169.254.169.254/latest/meta-data/local-ipv4).
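
The last part of step 1 and all of step 2 can be sketched roughly as follows. This is not the library described above: the ECS/EC2 enumeration through the AWS SDK is left out, and EcsClusterBootstrap, seedNodeUrls, hostIpFromMetadata and configOverrides are hypothetical names chosen for illustration. Only the metadata URL and the two Akka setting names come from the text:

```scala
import scala.io.Source

object EcsClusterBootstrap {
  // EC2 instance metadata endpoint that returns the host's private IPv4 address.
  val LocalIpv4Url = "http://169.254.169.254/latest/meta-data/local-ipv4"

  // Step 1 (final part): map the private IPs of the EC2 instances to the
  // akka.tcp URLs expected by akka.cluster.seed-nodes.
  def seedNodeUrls(systemName: String, ips: Seq[String], port: Int): Seq[String] =
    ips.map(ip => s"akka.tcp://$systemName@$ip:$port")

  // Step 2: ask the metadata service for the Docker host's IP address.
  // This only works when running on an EC2 instance (or in a container on one).
  def hostIpFromMetadata(): String = {
    val src = Source.fromURL(LocalIpv4Url)
    try src.mkString.trim finally src.close()
  }

  // Render both overrides as a HOCON snippet that can be layered over the
  // application's normal configuration.
  def configOverrides(hostIp: String, seeds: Seq[String]): String = {
    val seedList = seeds.map(s => "\"" + s + "\"").mkString("[", ", ", "]")
    s"""akka.remote.netty.tcp.hostname = "$hostIp"
       |akka.cluster.seed-nodes = $seedList""".stripMargin
  }
}
```

The resulting string could be fed to ConfigFactory.parseString(...).withFallback(ConfigFactory.load()) before the ActorSystem is created.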

-- Eric

On Tuesday, March 22, 2016 at 3:06:59 AM UTC-7, Akka Team wrote:
>
> Hi Eric,
>
> I have no experience with Docker at all, but it does feel wrong (unless 
> very specific use-cases) to have separate journals and snapshot stores 
> *per-node*. I think you might have an issue with Docker NAT. Have you read 
> this part of the documentation: 
> http://doc.akka.io/docs/akka/2.4.2/scala/remoting.html#Akka_behind_NAT_or_in_a_Docker_container
>
> -Endre
>
> On Sat, Mar 19, 2016 at 3:27 AM, Eric Swenson wrote:
>
>> Well, I may be able to answer my own question. It absolutely does matter 
>> that the new remote cluster system is using the same akka-persistence 
>> (cassandra keyspace) store as the old (local) one.  When I changed the 
>> cassandra keyspace to a new one, everything started working.
>>
>> So the question now is this: Do I have to have a separate 
>> akka-persistence journal and snapshot store for every node in the cluster?  
>> This is very inconvenient, as it means I have to make up keyspace names 
>> that are somehow tied to each individual node.  I guess I can add the ip 
>> address of the Docker host to the keyspace name, but this isn’t terribly 
>> resilient.  Why does akka-persistence care? The journal should reflect 
>> events that apply to all nodes. If a node goes down (getting a new address) 
>> and a new one takes its place, it should be able to recover all the events 
>> from the old node.  There must be something else at play here.
>>
>> Help!  
>>
>> — Eric
>>
>> On Mar 18, 2016, at 7:17 PM, Eric Swenson wrote:
>>
>> One more thing to add to this, in case it is relevant.  I see multiple of 
>> these messages in the log:
>>
>> [
>> akka://ClusterSystem/system/sharding/ExperimentInstanceCoordinator/singleton/coordinator]
>>  
>> stopped
>>
>>
>> First, why is it stopping (or why does it stop, in general), and 
>> secondly, is it significant that the url prefix is akka://ClusterSystem/ 
>> rather than akka.tcp://ClusterSystem@10.0.3.170:2552/
>>
>> And second, I assume it makes no difference that I’m using the same 
>> akka-persistence journal/snapshot store as I used when the app was binding 
>> to 127.0.0.1:2552.  I get tons of log messages indicating that 
>> receiveRecover is not happy trying to recover shards associated with 
>> akka.tcp://ClusterSystem@127.0.0.1:2552/system/sharing/ExperimentInstance#-396422686.
>>   
>> I’m assuming this is expected and that akka persistence should be able to 
>> deal with this case. It should fail to recover these and then carry on with 
>> new persistence events that are targeted to the new ClusterSystem on the 
>> new IP address.
>>
>> Examples of the rejected 

Re: [akka-user] Re: Can this be done with the build in stages?

2016-03-22 Thread Roland Kuhn
Yes, and for the immediate case you can use Future.successful.
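
A rough sketch of the mapAsync idea, assuming Akka Streams 2.4.x. It swaps the ask pattern for akka.pattern.after as the timer, and the names DelayEmptyBatches and throttleFlow(pause) are illustrative only:

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

object DelayEmptyBatches {
  // Non-empty lists pass through at once via Future.successful; an empty
  // list is held back for `pause` by a scheduler-backed future.
  // mapAsync(1) handles one element at a time, so pauses do not overlap.
  def throttleFlow(pause: FiniteDuration)(implicit system: ActorSystem) = {
    import system.dispatcher // ExecutionContext needed by `after`
    Flow[List[Int]].mapAsync(1) { batch =>
      if (batch.nonEmpty) Future.successful(batch)
      else after(pause, system.scheduler)(Future.successful(batch))
    }
  }

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("delay-demo")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    val input = Source(List(List(1, 2, 3), List(1, 2, 3), List(), List(), List(1, 2, 3)))

    input
      .via(throttleFlow(10.seconds))
      .mapConcat(t => t)
      .runForeach(println)
      .onComplete(_ => system.terminate()) // stop the system when the stream ends
  }
}
```

Because the delayed future still yields the (empty) list, mapConcat simply emits nothing for it; the only visible effect is the pause.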

Regards, Roland 

Sent from my iPhone

> On 20 Mar 2016, at 15:41, john.vie...@gmail.com wrote:
> 
> Something very simple that comes to mind is to use mapAsync with the ask 
> pattern and then do a "schedule of 10 secs" if the element is an empty list.
> 
> On Sunday, 20 March 2016 at 14:31:51 UTC+1, john@gmail.com wrote:
>> 
>> val input = Source(List(List(1, 2, 3), List(1, 2, 3), List(),
>>   List(), List(), List(), List(1, 2, 3)))
>> 
>> 1) Process the input, for example input.via(throttleFlow).mapConcat(t => t),
>>    where throttleFlow should do the following:
>> 
>> 2) If an element of the source has size > 0, pass it downstream immediately.
>> 3) If an element of the source has size == 0, wait 10 secs.
>> 
>> Can this be done with the built-in stages?
> 



Re: [akka-user] What is the impact of Kafka Streams on reactive-kafka?

2016-03-22 Thread Akka Team
It looks like Kafka Streams is more of a distributed framework: it does
balancing, the app can be moved, etc.

RS-based (Reactive Streams) streams, on the other hand, are more lightweight
and local. So, after reading this article, this looks like an
apples-vs-oranges comparison for now.

-Endre

On Tue, Mar 22, 2016 at 5:38 PM, 'Axel Poigné' via Akka User List <
akka-user@googlegroups.com> wrote:

> For reference, Kafka Streams is this thing:
> http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple
>
> Reactive Kafka is (or will be) a Kafka connector to/from Akka Streams (and
> thereby also Reactive Streams).
>
>
> Hi Patrik,
>
> I am aware of both. My understanding of Kafka Streams, however, is
> minuscule. I hoped for some enlightenment from someone more knowledgeable
> about how it compares to Reactive Kafka. It seems that Kafka Streams has
> some interesting ideas concerning the dichotomy of event streams and tables.
>
>



-- 
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam



Re: [akka-user] What is the impact of Kafka Streams on reactive-kafka?

2016-03-22 Thread 'Axel Poigné' via Akka User List
> For reference, Kafka Streams is this thing: 
> http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple
>  
> 
> 
> Reactive Kafka is (or will be) a Kafka connector to/from Akka Streams (and 
> thereby also Reactive Streams).

Hi Patrik,

I am aware of both. My understanding of Kafka Streams, however, is minuscule. I 
hoped for some enlightenment from someone more knowledgeable about how it 
compares to Reactive Kafka. It seems that Kafka Streams has some interesting 
ideas concerning the dichotomy of event streams and tables.



Re: [akka-user] Rejections in Akka HTTP Java API

2016-03-22 Thread Konrad Malawski
There's a completely new javadsl coming in the next few weeks; it will have
rejections (and docs for them) :)

Thanks for your patience!
On Mar 22, 2016 15:37, "Adam"  wrote:

> Hi,
>
> I see Rejections are only described in the Scala version of the docs.
> Is that on purpose?
> What are my options using the Java API in order to customize rejections?
>
>



Re: [akka-user] Rejections in Akka HTTP Java API

2016-03-22 Thread אדם חונן
Cool.
Thanks for the quick response!

On Tue, Mar 22, 2016 at 4:41 PM, Konrad Malawski <
konrad.malaw...@lightbend.com> wrote:

> There's a completely new javadsl coming in the next weeks, it will have
> rejections (and docs for them) :)
>
> Thanks for your patience!
> On Mar 22, 2016 15:37, "Adam"  wrote:
>
>> Hi,
>>
>> I see Rejections are only described in the Scala version of the docs.
>> Is that on purpose?
>> What are my options using the Java API in order to customize rejections?
>>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Rejections in Akka HTTP Java API

2016-03-22 Thread Adam
Hi,

I see Rejections are only described in the Scala version of the docs.
Is that on purpose?
What are my options using the Java API in order to customize rejections?



[akka-user] stash with big queues

2016-03-22 Thread Tim Pigden
I've got an FSM with Stash, something like this

when (Idle)
  case Event(cmd: ...)
    process() // asynchronous activity
    goto Processing


when(Processing)
  case Event(updated:...)
    // do something with result
    unstashAll()
    goto (Idle)
  case Event(cmd: ..)
    stash()
    stay


There are periods where my process() cannot keep up with the incoming 
queue. This is not an intrinsic problem. But if the queue gets sufficiently 
big, I appear to spend more time doing unstashAll and restashing all the 
messages than I do processing. If my queue consists of n elements then I 
end up with O(n^2) stash and process events.

I guess this means stash should only be used when the queue is unlikely to 
get big. Is there a standard alternative or should I just go and write my 
own BigQueueStash?

I should add I don't think it ever gets so big that I should worry about 
back pressure - it's just queue management.
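
For illustration, here is a rough sketch in plain Java (not Akka code) of where 
the quadratic cost comes from, assuming all n commands are already waiting when 
the first one starts processing. The class and method names are made up for this 
example. It counts buffering operations for the stash()/unstashAll() pattern and 
for an actor-private FIFO queue from which only the next command is taken after 
each result:

```java
import java.util.ArrayDeque;

// Hypothetical model, not Akka code: counts buffering operations for n commands
// that are all waiting when the first one starts processing.
final class StashCost {
    // stash()/unstashAll() pattern: after each result, every command still
    // waiting is unstashed and then stashed again while the next one runs.
    static long restashOps(int n) {
        long ops = 0;
        for (int waiting = n - 1; waiting > 0; waiting--) {
            ops += waiting; // all waiting commands are stashed once more
        }
        return ops;
    }

    // Private FIFO buffer: each waiting command is enqueued once, dequeued once.
    static long queueOps(int n) {
        ArrayDeque<Integer> pending = new ArrayDeque<>();
        long ops = 0;
        for (int cmd = 1; cmd < n; cmd++) { // command 0 is processed immediately
            pending.addLast(cmd);
            ops++;
        }
        while (!pending.isEmpty()) { // on each result, take only the next command
            pending.pollFirst();
            ops++;
        }
        return ops;
    }
}
```

Under these assumptions, 1000 waiting commands give 499500 restash operations 
against 1998 queue operations.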









 




Re: [akka-user] Re: Can this be done with the build in stages?

2016-03-22 Thread Akka Team
Hi John,

I think it is easier to use the built-in throttle:
 - set the capacity to zero
 - set the token rate to 1/second
 - use an explicit cost function, setting the cost of non-empty sequences
to zero, and the cost of empty ones to ten.
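
The cost function from the last point could look roughly like this. It is only 
a plain-Java sketch of the cost calculation, not of the throttle stage itself, 
and the names cost and totalCost are hypothetical. Assuming one token per 
second and no burst capacity, the summed cost is approximately the total delay 
in seconds:

```java
// Sketch of the cost calculation only; the throttle stage is not modelled.
final class BatchCost {
    // Non-empty batches are free, empty batches cost ten tokens.
    static int cost(int batchSize) {
        return batchSize == 0 ? 10 : 0;
    }

    // Total number of tokens consumed by a run of batches with the given sizes.
    static int totalCost(int... batchSizes) {
        int sum = 0;
        for (int size : batchSizes) {
            sum += cost(size);
        }
        return sum;
    }
}
```

For the example input in the question (sizes 3, 3, 0, 0, 0, 0, 3) the total is 
40 tokens, i.e. ten per empty list.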

-Endre

On Sun, Mar 20, 2016 at 6:06 PM,  wrote:

> I came up with this code:
>
> http://pastebin.com/LNTCvebe
>
> But beware: until now I have only been using Akka with Java. This is my
> first try at using Scala!
>
> As a side note: I could not figure out how to match a non-empty list in
> receive?
>
> Many Greetings
> John
>
>
>
> On Sunday, 20 March 2016 14:31:51 UTC+1, john@gmail.com wrote:
>
>> val input = Source(List(List(1, 2, 3), List(1, 2, 3), List(),
>>   List(), List(), List(), List(1, 2, 3)))
>>
>>
>> 1) Process input, for example input.via(throttleFlow).mapConcat(t => t)
>>
>> where throttleFlow should do the *following:*
>>
>>
>> 2) If an element of the source has size > 0, pass it downstream immediately.
>>
>> 3) Or, if an element of the source has size == 0, wait 10 secs.
>>
>>
>> Can this be done with the built-in stages?
>>
>>



-- 
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam



Re: [akka-user] Re: Can't get akka clustering to work in docker

2016-03-22 Thread Akka Team
Hi Eric,

I have no experience with Docker at all, but it does feel wrong (except in
very specific use cases) to have separate journals and snapshot stores
*per-node*. I think you might have an issue with Docker NAT. Have you read
this part of the documentation:
http://doc.akka.io/docs/akka/2.4.2/scala/remoting.html#Akka_behind_NAT_or_in_a_Docker_container

-Endre

On Sat, Mar 19, 2016 at 3:27 AM, Eric Swenson  wrote:

> Well, I may be able to answer my own question. It absolutely does matter
> that the new remote cluster system is using the same akka-persistence
> (cassandra keyspace) store as the old (local) one.  When I changed the
> cassandra keyspace to a new one, everything started working.
>
> So the question now is this: Do I have to have a separate akka-persistence
> journal and snapshot store for every node in the cluster?  This is very
> inconvenient, as it means I have to make up keyspace names that are somehow
> tied to each individual node.  I guess I can add the ip address of the
> Docker host to the keyspace name, but this isn’t terribly resilient.  Why
> does akka-persistence care? The journal should reflect events that apply to
> all nodes. If a node goes down (getting a new address) and a new one takes
> its place, it should be able to recover all the events from the old node.
> There must be something else at play here.
>
> Help!
>
> — Eric
>
> On Mar 18, 2016, at 7:17 PM, Eric Swenson  wrote:
>
> One more thing to add to this, in case it is relevant.  I see multiple of
> these messages in the log:
>
> [
> akka://ClusterSystem/system/sharding/ExperimentInstanceCoordinator/singleton/coordinator]
> stopped
>
>
> First, why is it stopping (or why does it stop, in general), and secondly,
> is it significant that the url prefix is akka://ClusterSystem/ rather
> than akka.tcp://ClusterSystem@10.0.3.170:2552/
>
> And second, I assume it makes no difference that I’m using the same
> akka-persistence journal/snapshot store as I used when the app was binding
> to 127.0.0.1:2552.  I get tons of log messages indicating that
> receiveRecover is not happy trying to recover shards associated with
> akka.tcp://ClusterSystem@127.0.0.1:2552/system/sharding/ExperimentInstance#-396422686.
> I’m assuming this is expected and that akka persistence should be able to
> deal with this case. It should fail to recover these and then carry on with
> new persistence events that are targeted to the new ClusterSystem on the
> new IP address.
>
> Examples of the rejected receiveRecover messages that I’m seeing are:
>
> [akka.tcp://ClusterSystem@10.0.3.170:2552/system/cassandra-journal]
> Starting message scan from 1
> [DEBUG] [03/19/2016 02:09:02.712]
> [ClusterSystem-akka.actor.default-dispatcher-18] [
> akka.tcp://ClusterSystem@10.0.3.170:2552/system/sharding/ExperimentInstanceCoordinator/singleton/coordinator]
> receiveRecover ShardRegionRegistered(Actor[
> akka.tcp://ClusterSystem@127.0.0.1:2552/system/sharding/ExperimentInstance#-396422686
> ])
>
> — Eric
>
> On Mar 18, 2016, at 6:54 PM, Eric Swenson  wrote:
>
> I’ve been unsuccessful in trying to get an akka-cluster application that
> works fine with one instance to work when there are multiple members of the
> cluster.  A bit of background is in order:
>
> 1) the application is an akka-cluster-sharding application
> 2) it runs in a docker container
> 3) the cluster is comprised of multiple docker hosts, each running the
> akka application
> 4) the error I’m getting is this:
>
> [WARN] [03/19/2016 01:39:18.086]
> [ClusterSystem-akka.actor.default-dispatcher-3] [
> akka.tcp://ClusterSystem@10.0.3.170:2552/system/sharding/ExperimentInstance]
> Trying to register to coordinator at [Some(ActorSelection[Anchor(
> akka://ClusterSystem/),
> Path(/system/sharding/ExperimentInstanceCoordinator/singleton/coordinator)])],
> but no acknowledgement. Total [1] buffered messages.
> [WARN] [03/19/2016 01:39:20.086]
> [ClusterSystem-akka.actor.default-dispatcher-3] [
> akka.tcp://ClusterSystem@10.0.3.170:2552/system/sharding/ExperimentInstance]
> Trying to register to coordinator at [Some(ActorSelection[Anchor(
> akka://ClusterSystem/),
> Path(/system/sharding/ExperimentInstanceCoordinator/singleton/coordinator)])],
> but no acknowledgement. Total [1] buffered messages.
>
> 5) the warning message is logged repeatedly and the cluster never
> initializes.
> 6) I’ve set the following config parameters:
> akka.remote.netty.tcp.hostname: to the actual host ip address (the one
> accessible from all the other docker hosts)
> akka.remote.netty.tcp.bind-hostname: to 0.0.0.0 (so that it binds on the
> docker0 interface, on the ip address of the container)
> akka.remote.netty.tcp.port: 2552
> akka.remote.netty.tcp.bind-port:2552
> 7) when I start the container, I map port 2552 on the host to port 2552 on
> the container.
> 8) from the host, I’m able to do a “telnet  2552” so I
> should be talking to the akka-remoting handler.
> 9) I’m 

Re: [akka-user] What is the impact of Kafka Streams on reactive-kafka?

2016-03-22 Thread Patrik Nordwall
For reference, Kafka Streams is this thing:
http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple

Reactive Kafka is (or will be) a Kafka connector to/from Akka Streams (and
thereby also Reactive Streams).

It looks like Kafka Streams provides another streaming DSL, but I don't
know anything about it.

On Tue, Mar 22, 2016 at 11:00 AM, Akka Team  wrote:

> Hi Axel,
>
> I am not sure what the question is about here. Anyway, reactive-kafka is
> not just about reading from Kafka, it also provides a Reactive Streams
> based interface that allows interoperability with anything that implements
> RS.
>
> -Endre
>
> On Sat, Mar 19, 2016 at 1:17 PM, 'Axel Poigné' via Akka User List <
> akka-user@googlegroups.com> wrote:
>
>> Seems to me that Kafka streams support distribution but not back
>> pressure, reactive-kafka vice versa.
>>
>> Anyway, using Kafka streams seems to be very lightweight when using Kafka.
>>
>> Any comments?
>>



-- 

Patrik Nordwall
Akka Tech Lead
Lightbend  -  Reactive apps on the JVM
Twitter: @patriknw



Re: [akka-user] Detecting empty source

2016-03-22 Thread Viktor Klang
…and, an empty source is one which signals onComplete before any data has
passed through. :)
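
That definition can be sketched with the JDK's java.util.concurrent.Flow 
interfaces (Java 9+), which mirror the Reactive Streams ones. This is not Akka 
Streams code, and EmptyProbe and runAndCheckEmpty are hypothetical names: the 
subscriber simply remembers whether onNext was called before onComplete, and 
the stream still has to be run to find out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class EmptyDetector {
    // Completes wasEmpty with true if onComplete arrives before any onNext.
    static final class EmptyProbe<T> implements Flow.Subscriber<T> {
        final CompletableFuture<Boolean> wasEmpty = new CompletableFuture<>();
        private boolean sawElement = false;

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(T item) { sawElement = true; }
        @Override public void onError(Throwable t) { wasEmpty.completeExceptionally(t); }
        @Override public void onComplete() { wasEmpty.complete(!sawElement); }
    }

    // Publishes the given items, completes the stream and reports emptiness.
    static boolean runAndCheckEmpty(int... items) {
        EmptyProbe<Integer> probe = new EmptyProbe<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(probe);
            for (int item : items) {
                publisher.submit(item);
            }
        } // close() signals onComplete after the submitted items
        return probe.wasEmpty.join();
    }
}
```

In an Akka Streams graph the same bookkeeping would have to live in a stage or 
in the monitoring actor; the sketch only shows the signal order being checked.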

On Tue, Mar 22, 2016 at 10:58 AM, Akka Team  wrote:

> Hi Richard,
>
> There is no other way to detect whether a Source is empty than to run it.
> For example if a Source wraps a DB query then of course it is impossible to
> see if the query will be empty or not without actually materializing and
> hence running the query.
>
> -Endre
>
> On Mon, Mar 21, 2016 at 4:59 AM, Richard Rodseth 
> wrote:
>
>> I'm doing a flatMapMerge something like this:
>>
>> val stream = Source(channelMonths)
>>
>>   .flatMapMerge(10, channelMonth => {
>>
>> ..Sources.intervalsForChannelMonth(channelMonth, ...)
>>
>>   })
>>
>> I'm implementing some monitoring using alsoTo to send stream elements to
>> a monitoring actor which can keep counts and so forth.
>>
>> How could I detect that the intervals source is sometimes empty? eg.
>> record the channels for which there are 0 intervals?
>>
>>



-- 
Cheers,
√



Re: [akka-user] What is the impact of Kafka Streams on reactive-kafka?

2016-03-22 Thread Akka Team
Hi Axel,

I am not sure what the question is about here. Anyway, reactive-kafka is
not just about reading from Kafka, it also provides a Reactive Streams
based interface that allows interoperability with anything that implements
RS.

-Endre

On Sat, Mar 19, 2016 at 1:17 PM, 'Axel Poigné' via Akka User List <
akka-user@googlegroups.com> wrote:

> Seems to me that Kafka streams support distribution but not back pressure,
> reactive-kafka vice versa.
>
> Anyway, using Kafka streams seems to be very lightweight when using Kafka.
>
> Any comments?
>



-- 
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam



Re: [akka-user] Detecting empty source

2016-03-22 Thread Akka Team
Hi Richard,

There is no other way to detect whether a Source is empty than to run it.
For example if a Source wraps a DB query then of course it is impossible to
see if the query will be empty or not without actually materializing and
hence running the query.

-Endre

On Mon, Mar 21, 2016 at 4:59 AM, Richard Rodseth  wrote:

> I'm doing a flatMapMerge something like this:
>
> val stream = Source(channelMonths)
>
>   .flatMapMerge(10, channelMonth => {
>
> ..Sources.intervalsForChannelMonth(channelMonth, ...)
>
>   })
>
> I'm implementing some monitoring using alsoTo to send stream elements to a
> monitoring actor which can keep counts and so forth.
>
> How could I detect that the intervals source is sometimes empty? eg.
> record the channels for which there are 0 intervals?
>
>



-- 
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam



[akka-user] Usage of EventFilters in akka.testkit

2016-03-22 Thread Sven Hodapp
Dear reader,

I've seen in the documentation that it is possible to do assertions on Akka 
log messages.

http://doc.akka.io/docs/akka/current/scala/testing.html#Expecting_Log_Messages


But the documentation here is a bit sparse and doesn't explain how to do 
"non-exception" assertions.
Can you give me a hint on how to do it?

My current code looks similar to this:

class TheActorSpec(_system: ActorSystem)
  extends TestKit(_system)
  with ImplicitSender
  with Matchers
  with FlatSpecLike
  with BeforeAndAfterAll {

  def this() = this(ActorSystem("testsystem", ConfigFactory.parseString("""
    akka.loggers = ["akka.testkit.TestEventListener"]
    """)))

  override def afterAll: Unit = {
    system.shutdown()
    system.awaitTermination(10.seconds)
  }

  "An Actor" should "be able to assert log messages" in {
    val ac = TestActorRef(Props[TheActor])

    ac ! Consume(...)

    // Or with pattern="..."?
    EventFilter.info(message = "the log content",
      source = classOf[TheActor].getName,
      occurrences = 1).assertDone(1.second)
  }

}


But I get stack traces like "[...] java.lang.AssertionError: assertion 
failed: 1 messages outstanding on InfoFilter [...]".
When TheActor handles Consume, it calls log.info("the log content").

Does anybody have an idea what's wrong or how to use EventFilter correctly?

Thanks!

Regards,
Sven



Re: [akka-user] How-to restart a RunnableGraph on failure?

2016-03-22 Thread Endre Varga
On Mon, Mar 21, 2016 at 7:27 PM, Viktor Klang 
wrote:

> "The only thing worse than not saying anything, is telling a lie."
>

This does not apply here. The termination Future would correctly signal
that all stages that have been materialized by that materializer have been
stopped (since they are child actors this is fully possible). My point was
that this feature will not double as a "shut down a graph and be notified
when ready" feature. It will not "lie" though: termination of the
ActorMaterializer results in stopping all of the actors materialized by
that materializer.

-Endre



>
> (i.e. I don't think that's a solution)
>
> On Mon, Mar 21, 2016 at 7:22 PM, Endre Varga 
> wrote:
>
>> The issue is that there might be 3rd party RS stages, or stages from a
>> different materializer, so even such future will not guarantee termination
>> of a graph. But maybe it is still enough.
>>
>> On Mon, Mar 21, 2016 at 5:56 PM, Patrik Nordwall <
>> patrik.nordw...@gmail.com> wrote:
>>
>>> Could we have a termination Future, similar to the ActorSystem?
>>>
>>> On Mon, 21 March 2016 at 16:59, Endre Varga wrote:
>>>
>>>> Ah, OK that will work. The only issue is that shutdown() is
>>>> asynchronous so you cannot be fully sure when it has stopped everything.
>>>>
>>>> -Endre
>>>>
>>>> On Mon, Mar 21, 2016 at 4:58 PM,  wrote:
>>>>
>>>>> Hi Endre,
>>>>> forgive my wording. I meant calling shutdown() on the materializer of
>>>>> the current exception-throwing stream and then starting this stream
>>>>> with a new materializer.
>>>>>
>>>>> On Monday, 21 March 2016 16:51:01 UTC+1, Akka Team wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 21, 2016 at 4:45 PM,  wrote:
>>>>>>
>>>>>>> Hi Endre,
>>>>>>> I think I do have a valid use case. I have a stream which
>>>>>>> occasionally fails seriously (maybe 2 times a year).
>>>>>>> Since it has a zip stage I have the option to either handle the
>>>>>>> exception and keep the Zip stage balanced or
>>>>>>> just let the exception propagate to the default decider and just
>>>>>>> restart the stream.
>>>>>>>
>>>>>>
>>>>>> But there is no such thing as "restart the stream". The current
>>>>>> built-in "supervision" only restarts a single stream processing stage but
>>>>>> not the whole graph. There is currently no way to restart a whole graph
>>>>>> and it is not a trivial problem to tackle in general.
>>>>>>
>>>>>> -Endre
>>>>>>
>>>>>>
>>>>>>> The work of the stream is "transactional" so I have no problems
>>>>>>> with the current broken state caused by the exception.
>>>>>>>
>>>>>>> For me restart simplifies my code so I think it is OK?!
>>>>>>>
>>>>>>> On Monday, 21 March 2016 10:35:58 UTC+1, Akka Team wrote:
>>>>>>>>
>>>>>>>> Hi John,
>>>>>>>>
>>>>>>>> I am not sure what you hope to achieve there and if that is even
>>>>>>>> possible. Restarting a graph is more similar to restarting a group of
>>>>>>>> actors than to a single actor, and it is not that easy to do given the
>>>>>>>> backpressure state between the restarted stages must be preserved.
>>>>>>>> Also, while one stage fails, other parts of the graph might not and
>>>>>>>> still run concurrently, etc.
>>>>>>>>
>>>>>>>> -Endre
>>>>>>>>
>>>>>>>> On Fri, Mar 18, 2016 at 1:34 PM,  wrote:
>>>>>>>>
>>>>>>>>> This would be the solution I would use:
>>>>>>>>>
>>>>>>>>> http://pastebin.com/pJVnHqcH
>>>>>>>>>
>>>>>>>>> I am a little unsure about mat.stop() and the Supervision.stop().
>>>>>>>>>
>>>>>>>>> Should the code work? I want to restart on every Exception thrown
>>>>>>>>> from any Flow, Source, or Sink of the runnable graph.
>>>>>>>>>
>>>>>>>>> On Friday, 18 March 2016 11:28:23 UTC+1, drewhk wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> There is no such thing at the moment.
>>>>>>>>>>
>>>>>>>>>> The onComplete on the sink does not guarantee that the whole
>>>>>>>>>> stream has actually stopped, it only guarantees that the Sink itself
>>>>>>>>>> was stopped (which may or may not imply completion of the whole
>>>>>>>>>> stream; think of graphs).
>>>>>>>>>>
>>>>>>>>>> -Endre
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 18, 2016 at 11:25 AM,  wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> If a graph throws an Exception in any flow I want to restart
>>>>>>>>>>> the graph.
>>>>>>>>>>>
>>>>>>>>>>> Actually I would love to have something like Akka Actors
>>>>>>>>>>> OneForOneStrategy(10, Duration.create("1 minute"))
>>>>>>>>>>>
>>>>>>>>>>> I couldn't find the right hints in the docs. What I am trying
>>>>>>>>>>> right now is this pseudo code:
>>>>>>>>>>>
>>>>>>>>>>> Function decider = exc -> 
>>>>>>>>>>> Supervision.stop();
>>>>>>>>>>> Sink sink = ;
>>>>>>>>>>>
>>>>>>>>>>> RunnableGraph runnableGraph = 
>>>>>>>>>>> takeGraph.toMat(sink, Keep.right());
>>>>>>>>>>>
>>>>>>>>>>> ActorMaterializer mat = 

[akka-user] How implement recursion in futures correctly? (Java)

2016-03-22 Thread Piper
I am having a hard time understanding how to use futures specifically when 
it comes to implementing functions within those futures. 

So for example, if I wanted to implement factorial in a future using this 
algorithm: 
int factorial(int n) {
    if (n == 0)
        return 1;
    else
        return (n * factorial(n-1));
}


My current guess is: 


Future f = future(new function () {
    public Integer apply(Integer i) {
        if (i == 0) {
            return 1;
        }
        else {
            return (i * apply(n-1));
        }
    }
}, system.dispatcher());
results = f.onSuccess(new PrintResult(), system.dispatcher());
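
One way to think about it, sketched here with the JDK's CompletableFuture 
rather than Akka's future API (a deliberate swap so the example is 
self-contained): the recursion stays in an ordinary method, and the future only 
wraps the top-level call. FactorialFuture and factorialAsync are hypothetical 
names. As far as I know, Akka's Futures.future(Callable, ExecutionContext) can 
wrap the same call in the same way.

```java
import java.util.concurrent.CompletableFuture;

final class FactorialFuture {
    // Ordinary recursive method; nothing future-specific here.
    static int factorial(int n) {
        return (n == 0) ? 1 : n * factorial(n - 1);
    }

    // Runs the whole recursion inside one asynchronous task.
    static CompletableFuture<Integer> factorialAsync(int n) {
        return CompletableFuture.supplyAsync(() -> factorial(n));
    }
}
```

A caller could then react to the result without blocking, for example with 
FactorialFuture.factorialAsync(5).thenAccept(System.out::println);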

 
