Re: [akka-user] [akka-streams] Resource pool for use within a flow

2016-04-08 Thread Akka Team
You could probably do something like that, but there is nothing that does
it out of the box. It would not really need to be blocking either: it could
be modeled as a Source that only emits resource handles when they are
available.

The problem then becomes signalling that you are done with a resource. One
way would be to design it as a Flow[ResourceHandle, ResourceHandle], where
the in-flow returns used resource handles and the out-flow is the actual
source. Another would be to signal completion through a method on the
resource handle itself; I think that is the easiest and cleanest option
(similar to the commit signalling recently added in reactive-kafka).

Usage could look something like this:

val resourcePool: Source[ResourceHandle, NotUsed] = ???
val indata: Source[Data, NotUsed] = ???

val results: Source[ProcessingResult, NotUsed] =
  indata.zip(resourcePool).map { case (data, resourceHandle) =>
    try {
      process(data, resourceHandle.resource)
    } finally {
      resourceHandle.imDoneWithYou()
    }
  }


Implementing it would probably be a bit harder than regular streams usage,
since you need to ensure thread safety while keeping track of the
resources, but it is nothing super complicated IMO.
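To make that implementation note concrete, here is a minimal sketch of the
thread-safe bookkeeping such a pool needs. It uses a plain
java.util.concurrent.ArrayBlockingQueue instead of a real akka-streams
Source, and the Resource, ResourcePool, and acquire names are illustrative
(only imDoneWithYou mirrors the usage example above):

```scala
import java.util.concurrent.ArrayBlockingQueue

// Illustrative sketch only: the bookkeeping a pool stage would need,
// modeled with a blocking queue rather than a real akka-streams Source.
final class Resource(val id: Int)

final class ResourceHandle(resource: Resource, pool: ArrayBlockingQueue[Resource]) {
  // Apply `f` to the resource and hand it back to the pool afterwards.
  def use[A](f: Resource => A): A =
    try f(resource)
    finally imDoneWithYou()

  // Signal "done", as in the commit-style design; a real handle would
  // guard against returning the same resource twice.
  def imDoneWithYou(): Unit = pool.put(resource)
}

final class ResourcePool(size: Int) {
  private val queue = new ArrayBlockingQueue[Resource](size)
  (1 to size).foreach(i => queue.put(new Resource(i)))

  // A real Source-based pool would emit handles on downstream demand;
  // this sketch simply blocks until a resource is free.
  def acquire(): ResourceHandle = new ResourceHandle(queue.take(), queue)

  def available: Int = queue.size
}
```

With a pool of two resources, `acquire` hands out a handle and `available`
drops to one; once `use` completes, the resource is back in the pool.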

--
Johan Andrén
Akka Team, Lightbend Inc.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Akka HTTP bind exception on Windows

2016-04-08 Thread Endre Varga
Hi Michael,

No, Akka HTTP uses Akka's own TCP stack; we don't use Netty there. I guess
we should just fix this in the next release.

-Endre

On Fri, Apr 8, 2016 at 8:47 AM, Michi 
wrote:

> Hi,
>
> is Akka HTTP still using Netty? To work around the problem I built a small
> HTTP server using Netty that just creates an Akka HttpRequest from a Netty
> HttpRequest. This is a bit of a hack, but I needed a quick solution because
> our customer is getting really impatient and I just needed something that
> works. I did not get bind exceptions starting / stopping Netty on
> Windows. I use the following code (shutdown code from the Netty in Action book):
>
>   private val bossGroup = new NioEventLoopGroup(1)
>   private val workerGroup = new NioEventLoopGroup()
>   private val bootstrap = new ServerBootstrap()
>   bootstrap.group(bossGroup, workerGroup)
>     .channel(classOf[NioServerSocketChannel])
>     .option(ChannelOption.SO_BACKLOG.asInstanceOf[ChannelOption[Any]], 200)
>     .childOption(ChannelOption.ALLOCATOR.asInstanceOf[ChannelOption[Any]],
>       PooledByteBufAllocator.DEFAULT)
>     .childHandler(new ChannelInitializer[SocketChannel] {
>       override def initChannel(ch: SocketChannel): Unit = {
>         val p = ch.pipeline()
>         p.addLast("decoder", new HttpRequestDecoder)
>         p.addLast("encoder", new HttpResponseEncoder)
>         p.addLast("aggregator", new HttpObjectAggregator(1048576))
>         p.addLast("handler", new HttpRequestHandler(modules))
>       }
>     })
>
>   logger.info("Trying to start netty on " + interface + ":" + port)
>   private val serverChannelFuture = bootstrap.bind(interface, port)
>   serverChannelFuture.addListener(new ChannelFutureListener {
>     override def operationComplete(future: ChannelFuture): Unit = {
>       if (serverChannelFuture.isSuccess) {
>         logger.info("HTTP server bound to " + interface + ":" + port)
>       } else {
>         logger.info("Could not bind HTTP server to " + interface + ":" + port,
>           serverChannelFuture.cause())
>       }
>     }
>   })
>
>   override protected def dispose(): Unit = {
>     logger.info("Trying to stop HTTP server")
>     bossGroup.shutdownGracefully().sync()
>     workerGroup.shutdownGracefully().sync()
>     logger.info("Stopped boss and worker groups")
>     bossGroup.terminationFuture().sync()
>     workerGroup.terminationFuture().sync()
>     logger.info("Terminated boss and worker groups")
>   }
>
> Sorry, I didn't have time to figure out what the problem is. I hope I can
> get rid of my hack when the next version of Akka is released. Looking at
> several HTTP frameworks and hacking my own Netty-based solution makes me
> realize even more how nice Akka HTTP actually is.
>
> Thanks for the great work,
> Michael
>
>
> On Thursday, April 7, 2016 at 9:15:58 PM UTC+2, Antti Nevala wrote:
>>
>> No problem, glad to help you.
>>
>> -Antti
>>
>> On Thursday, April 7, 2016 at 3:57:09 PM UTC+3, drewhk wrote:
>>>
>>> Oh, you are our savior! I am 99% sure that that is the solution we are
>>> looking for, thanks!
>>>
>>> -Endre
>>>
>>> On Thu, Apr 7, 2016 at 2:35 PM, Antti Nevala  wrote:
>>>
 Hi,

 I'm not sure if this is related, but I found a similar problem in another
 project, along with a workaround for it:

 https://github.com/kaazing/nuklei/issues/20

 -Antti

 On Thursday, April 7, 2016 at 11:07:03 AM UTC+3, drewhk wrote:
>
> Hi,
>
>
> On Tue, Apr 5, 2016 at 8:40 PM, Michi <
> michael...@physik.tu-muenchen.de> wrote:
>
>> Hi,
>>
>> I think SO_REUSEADDR is enabled by default for Akka HTTP; at least it
>> looks that way from the code. But even if it is disabled, the OS should
>> not need ten minutes to release the socket. Maybe I will write a simple
>> test program tomorrow that demonstrates the problem.
>>
>
> OK, this might be a bug then, please file a ticket. Strange that it works
> on non-Windows platforms though :( (we have tests for this behavior)
>
>
>>
>> My problem is that I am running out of time. Our customer is getting
>> impatient, and the server-side REST interface is just a small part of the
>> application.
>>
>> I think it is probably best if I just use something else for now and
>> go back to Akka HTTP when I have more time. Can anyone suggest a
>> lightweight, easy-to-use HTTP server library for Java to provide a REST
>> interface and serve some HTML, CSS and JavaScript files?
>>
>
> You can use Play for example.
>
> -Endre
>
>
>>
>> Thanks,
>> Michael
>>
>> On Tuesday, April 5, 2016 at 6:34:26 PM UTC+2, drewhk wrote:
>>>
>>> Have you tried adding a configuration (ServerSettings) that enables the
>>> ReuseAddress (
>>> http://doc.akka.io/api/akka/2.4.3/#akka.io.Inet$$SO$$ReuseAddress)
>>> option when you bind the TCP socket?
>>>
>>> -Endre
>>>
>>> On Tue, Apr 5, 2016 at 6:29 PM, Endre 

Re: [akka-user] Akka HTTP bind exception on Windows

2016-04-08 Thread Viktor Klang
And to clarify, by "we", anyone reading this or the Issue is eligible for
fixing it :)

On Fri, Apr 8, 2016 at 10:11 AM, Endre Varga 
wrote:


Re: [akka-user] Akka HTTP bind exception on Windows

2016-04-08 Thread Endre Varga
But first, please file a ticket!

On Fri, Apr 8, 2016 at 11:24 AM, Viktor Klang 
wrote:

> And to clarify, by "we", anyone reading this or the Issue is eligible for
> fixing it :)
Re: [akka-user] Akka HTTP bind exception on Windows

2016-04-08 Thread Viktor Klang
And if you want to do the PR first, then I'll open the ticket for you :)

On Fri, Apr 8, 2016 at 11:58 AM, Endre Varga 
wrote:

> But first, please file a ticket!

[akka-user] Best practice for consuming a Http response entity's data stream

2016-04-08 Thread Chris Baxter
If I want to consume an HTTP service and then do something with the response
body, there are a couple of ways to go about it. The two that I am trying to
decide between are:

val f: Future[ByteString] =
  Source.single(req).
    via(outgoingConn).
    flatMapConcat(_.entity.dataBytes).
    completionTimeout(timeout).
    runWith(Sink.head)


and


val f: Future[ByteString] =
  Source.single(req).
    via(pool).
    mapAsync(1) { resp =>
      resp.entity.toStrict(timeout).map(_.data)
    }.
    completionTimeout(timeout).
    runWith(Sink.head)


I'm thinking the first approach is the better one. Up until now, my common
code for making outbound requests has used the second approach. I'm about to
refactor it to use the first approach, as it seems cleaner and requires less
use of Futures. I just wanted to see what the consensus from the Akka team
and others is on this.





Re: [akka-user] Best practice for consuming a Http response entity's data stream

2016-04-08 Thread Viktor Klang
On Fri, Apr 8, 2016 at 1:06 PM, Chris Baxter  wrote:

> If I want to consume a Http service and then do something with the
> response body, there are a couple of ways to go about doing that.  The two
> ones that I am trying to decide between are:
>
> val f: Future[ByteString] =
>   Source.single(req).
>     via(outgoingConn).
>     flatMapConcat(_.entity.dataBytes).
>     completionTimeout(timeout).
>     runWith(Sink.head)
>

This does not return the entire body as a ByteString.
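To illustrate why: for a chunked response, entity.dataBytes emits one
ByteString per chunk, and Sink.head keeps only the first of them, whereas
accumulating the chunks (roughly what toStrict or a fold over dataBytes
does) yields the whole body. A toy model with plain Scala strings standing
in for ByteString chunks, no Akka involved:

```scala
// Toy model of a chunked entity body: each list element is one chunk.
val chunks = List("{\"name\":", "\"akka\"", "}")

// What Sink.head would materialize after flatMapConcat: the first chunk only.
val firstChunkOnly = chunks.head

// What accumulating the stream (toStrict / folding dataBytes) yields.
val wholeBody = chunks.foldLeft("")(_ + _)

println(firstChunkOnly) // {"name":
println(wholeBody)      // {"name":"akka"}
```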


> and
>
>
> val f: Future[ByteString] =
>   Source.single(req).
>     via(pool).
>     mapAsync(1) { resp =>
>       resp.entity.toStrict(timeout).map(_.data)
>     }.
>     completionTimeout(timeout).
>     runWith(Sink.head)
>
>
> I'm thinking the first approach is the better one.  Up until now, my
> common code for making outbound request has been using the second
> approach.  I'm about to refactor that code into using the first approach as
> it seems cleaner and requires less use of Futures.  Just wanted to see what
> the consensus from the Akka team and others was on this.
>
My question is: why do you need to eagerly read everything into memory to
"do something with the response body"?


>
>



-- 
Cheers,
√



[akka-user] ANNOUNCE: Released akka-stream-kafka 0.11-M1

2016-04-08 Thread David Pennell
Are there any plans with regard to Akka Persistence on Kafka?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] ANNOUNCE: Released akka-stream-kafka 0.11-M1

2016-04-08 Thread Patrik Nordwall
There is https://github.com/krasserm/akka-persistence-kafka, a
community-maintained project.
/Patrik

On Fri, Apr 8, 2016 at 2:32 PM, David Pennell 
wrote:

> Are there plans WRT akka persistence on kafka?
>



-- 

Patrik Nordwall
Akka Tech Lead
Lightbend  -  Reactive apps on the JVM
Twitter: @patriknw



Re: [akka-user] Best practice for consuming a Http response entity's data stream

2016-04-08 Thread Chris Baxter
Thanks for responding, Viktor.

1) I see the flaw in this design (flatMapConcat) now. If the response is
chunked, you only read the first chunk.
2) I want to parse the body as JSON and have the final result of the flow be
a Future for some object that I have mapped the response JSON to. Any
suggestions for doing that without reading the entire byte string into
memory? Are you perhaps suggesting that instead of feeding something
complete into my JSON parser (String, Array[Byte]) I should feed in
something more stream-oriented, such as an InputStream, and then find a way
to plumb that together with the response stream?

Any suggestions for a Flow that can deal with chunks and feed the data into
a parsing stage without reading it all into memory would be greatly
appreciated. I don't need perfect code, just an approach so I can take it
from there.
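One possible shape for that plumbing, sketched with plain java.io and no
particular JSON library: adapt the sequence of received chunks into a single
InputStream that a stream-oriented parser can consume, so bytes are pulled
lazily chunk by chunk instead of being concatenated up front. Everything
here, including the chunksAsStream name, is illustrative rather than Akka
API (akka-streams does ship stream/InputStream converters such as
StreamConverters, which may fit better in a real flow):

```scala
import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}

// Illustrative sketch: view a sequence of body chunks as one InputStream.
// A streaming JSON parser could read from this directly; bytes are pulled
// chunk by chunk only as the parser asks for them.
def chunksAsStream(chunks: Seq[Array[Byte]]): InputStream =
  new SequenceInputStream(new java.util.Enumeration[InputStream] {
    private val it = chunks.iterator
    def hasMoreElements: Boolean = it.hasNext
    def nextElement(): InputStream = new ByteArrayInputStream(it.next())
  })

val chunks = Seq("{\"a\":", "1}").map(_.getBytes("UTF-8"))
val in     = chunksAsStream(chunks)

// Here we just drain the stream to show the chunks line up correctly;
// a real consumer would be the parser itself.
val body = scala.io.Source.fromInputStream(in, "UTF-8").mkString
println(body) // {"a":1}
```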

On Friday, April 8, 2016 at 7:13:51 AM UTC-4, √ wrote:



Re: [akka-user] Best practice for consuming a Http response entity's data stream

2016-04-08 Thread Chris Baxter
None of the out-of-the-box Unmarshallers for JSON (like SprayJsonSupport)
support parsing in a lazy, streaming way. I did find this repo, which looks
promising:

https://github.com/knutwalker/akka-stream-json

Is this the best kind of approach? It's certainly nice in that you don't
have to read all of the data into memory, but large responses are the
exception rather than the norm for us.

On Friday, April 8, 2016 at 8:59:10 AM UTC-4, Chris Baxter wrote:
>
> Thanks for responding Viktor.
>
> 1) I see the flaw in this design (flatMapConcat) now.  If the response is 
> chunked you only read the first chunk
> 2) I want to be able to parse the body as json and have the final result 
> of the flow be a Future for some object that I have mapped the response 
> json to.  Any suggestions for doing that w/o reading the entire byte string 
> into memory?  Are you maybe suggesting that instead of feeding something 
> complete into my json parser (String, Array[Byte]) that I should instead 
> try and feed in something that is more stream oriented, such as an 
> InputStream and then find a way to plumb that together with the response 
> stream?
>
> Any suggestions for a Flow that can deal with chunks and feed the data into 
> a parsing stage w/o having to read it all into memory would be greatly 
> appreciated.  I don't need perfect code, just an approach so I can take it 
> from there.
>
> On Friday, April 8, 2016 at 7:13:51 AM UTC-4, √ wrote:
>>
>>
>>
>> On Fri, Apr 8, 2016 at 1:06 PM, Chris Baxter  wrote:
>>
>>> If I want to consume a Http service and then do something with the 
>>> response body, there are a couple of ways to go about doing that.  The two 
>>> ones that I am trying to decide between are:
>>>
>>> val f:Future[ByteString] =
>>>
>>>   Source.single(req).
>>>
>>> via(outgoingConn).
>>>
>>> flatMapConcat(_.entity.dataBytes).
>>>
>>> completionTimeout(timeout).
>>>
>>> runWith(Sink.head)
>>>
>>
>> This does not return the entire body as a ByteString.
>>  
>>
>>> and
>>>
>>>
>>> val f:Future[ByteString] =
>>>
>>>   Source.single(req).
>>>
>>> via(pool).
>>>
>>> mapAsync(1){ resp =>
>>>
>>>   resp.entity.toStrict(timeout).map(_.data )
>>>
>>> }.
>>>
>>> completionTimeout(timeout).
>>>
>>> runWith(Sink.head)
>>>
>>>
>>> I'm thinking the first approach is the better one.  Up until now, my 
>>> common code for making outbound request has been using the second 
>>> approach.  I'm about to refactor that code into using the first approach as 
>>> it seems cleaner and requires less use of Futures.  Just wanted to see what 
>>> the consensus from the Akka team and others was on this.
>>>
>> My question is: why do you need to eagerly read everything into memory to 
>> "do something with the response body"?
>>  
>>
>>>
>>>
>>>
>>
>>
>>
>> -- 
>> Cheers,
>> √
>>
>
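
As Viktor points out, the first variant with Sink.head only yields the first
emitted chunk. If the whole body really is needed as a single ByteString, a
fold over the chunks does that without toStrict; a sketch, assuming an
implicit Materializer plus the req, outgoingConn and timeout values from the
snippets above:

```scala
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.Future

// Folding concatenates *all* emitted chunks, whereas Sink.head would
// complete with only the first one.
val f: Future[ByteString] =
  Source.single(req)
    .via(outgoingConn)
    .flatMapConcat(_.entity.dataBytes)
    .completionTimeout(timeout)
    .runFold(ByteString.empty)(_ ++ _)
```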



Re: [akka-user] Why should nodes usually not include themselves in seed node list?

2016-04-08 Thread Patrik Nordwall
Only the first node in the seed-nodes list is special: it alone will join
itself to bootstrap the cluster, after it has tried joining the other nodes.

Sorting is fine.

For the AWS part you might get something from this blog post:
http://chrisloy.net/2014/05/11/akka-cluster-ec2-autoscaling.html
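
A static configuration where every node ships the same seed-nodes list
(itself included) could look like the fragment below; only the first entry,
node1, will join itself if none of the others respond. (Hostnames, system
name and port are placeholders.)

```hocon
akka.cluster.seed-nodes = [
  "akka.tcp://MyCluster@node1:2551",
  "akka.tcp://MyCluster@node2:2551",
  "akka.tcp://MyCluster@node3:2551"
]
```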

On Thu, Apr 7, 2016 at 9:21 PM, Curt Siffert  wrote:

> Hi, when discovering seed nodes dynamically, why is it important for every
> node (except for one) to not include itself in the seed node list?  How
> could split brain happen if all nodes have the same list order of all seed
> nodes?
>
> From what I understand, split-brain can only happen if multiple nodes put
> themselves first in the seed node list, is that correct?
>
> I ask because when doing static configuration, it's common for all nodes
> to have the same seed node list, which means all seed nodes would have
> themselves in the list.  And that seems to be common, recommended practice.
>
> If doing dynamic discovery of seed nodes, is it okay for all nodes to have
> the same list of seed nodes, if we can guarantee they are all in the same
> order? From reading the docs, I gather that after sending a message to all
> seed nodes, if none respond, then it will try to join the first node in the
> list, which wouldn't work if the first node in the list isn't itself.
>
> We are considering using the AWS SDK by not querying the ASG for its
> instance list until the cloudformation stack says it is complete, which
> should guarantee that all nodes trying to join the cluster will get the
> same list of instances in the ASG.  Then we can guarantee that all nodes
> would order the nodes the same - any reason this wouldn't work?
>
> Thanks,
> Curt
>
>



-- 

Patrik Nordwall
Akka Tech Lead
Lightbend  -  Reactive apps on the JVM
Twitter: @patriknw



Re: [akka-user] Stream Testkit expectNextType ?

2016-04-08 Thread Patrik Nordwall
Hi Derek,
Sounds like a great addition. Will you take a stab at it?
/Patrik
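
Hypothetical usage of the proposed expectNextType, assuming the
ProbeExtension implicit class from Derek's message is in scope and
flowReturningTry is a Flow[String, Try[Int], NotUsed] that produces a
Failure for bad input (both names are placeholders from the message):

```scala
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import scala.util.{ Failure, Try }

// Assert only on the *type* of the next element, not its exact value.
Source.single("boom")
  .via(flowReturningTry)
  .runWith(TestSink.probe[Try[Int]])
  .request(1)
  .expectNextType[Failure[_]]
```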

On Thu, Apr 7, 2016 at 9:59 PM, Derek Wyatt  wrote:

> Hi guys,
>
> I've been writing some tests for my stream code and have run up against a
> situation where I would like to have expectNextType.  I've implemented
> this myself with a pimp, but I'm wondering if there's a better way?
>
>   implicit class ProbeExtension[T](val p: Probe[T]) {
> def expectNextType[A](implicit t: ClassTag[A]): p.Self = {
>   p.expectNextPF {
> case a: A =>
> case x => fail(s"Expected a $t type but got a
> ${x.getClass.getName}")
>   }
>   p
> }
>   }
>
> This is mostly useful when the 'A' in question is a derivation of Try[_].
> I tend to write expectNextType[Failure[_]] in these cases.  And, just to
> give some context as to why I would want to do that: My flow components
> might return a Try, which I then pass through a special handling / logging
> flow, which transforms them to a List that is composed with a mapConcat.
> So...
>
> val flowReturningTry: Flow[T, Try[U], NotUsed] = ...
> val handlingAndLogging: Flow[Try[U], U, NotUsed] = Flow[Try[U]].mapConcat
> { /* List(u) or List(), depending on the Try */ }
> val composed: Flow[T, U, NotUsed] =
> flowReturningTry.mapConcat(handlingAndLogging)
>
> Now, when testing flowReturningTry in isolation, the expectNextType is
> useful.
>
> Cheers and thanks,
> Derek
>
>



-- 

Patrik Nordwall
Akka Tech Lead
Lightbend  -  Reactive apps on the JVM
Twitter: @patriknw



Re: [akka-user] Akka persistence - errors on the read side

2016-04-08 Thread Patrik Nordwall
On Thu, Apr 7, 2016 at 5:17 PM, Richard Rodseth  wrote:

> Also, seeing the work in reactive-kafka to allow the following, I wonder
> if this is making you rethink akka-persistence-query APIs
>

Why? What is wrong? Persistence query api also has an offset parameter that
can be used as the starting point of the stream.


>
> db.loadOffset().foreach { fromOffset =>
> val settings = consumerSettings
>   .withFromOffset(new TopicPartition("topic1", 1), fromOffset)
> Consumer.plainSource(settings)
>   .mapAsync(1)(db.save)
>   }
>
>
> On Thu, Apr 7, 2016 at 7:26 AM, Richard Rodseth 
> wrote:
>
>> Isn't this section of the docs naïve in the face of possible errors?
>>
>>
>> http://doc.akka.io/docs/akka/current/scala/persistence-query.html#Materialize_view_to_Reactive_Streams_compatible_datastore
>>
>> Later on that page is an incomplete example of a resumable projection.
>> Won't something like that need to be the norm?
>>
>>
>>
>> On Fri, Feb 26, 2016 at 1:55 PM, Richard Rodseth 
>> wrote:
>>
>>> Hmm. I see the fromSequenceNr parameter in the query traits. So the read
>>> side would have to persist a watermark? Where? I was hoping for less
>>> boilerplate.
>>>
>>> On Fri, Feb 26, 2016 at 5:47 AM, Patrik Nordwall <
>>> patrik.nordw...@gmail.com> wrote:
>>>


 On Wed, Feb 24, 2016 at 10:33 PM, Richard Rodseth 
 wrote:

> In Vaughn Vernon's Red Book (Implementing DDD) he talks about storing
> domain events in the same transaction as the one which updates an
> aggregate, and then out of band you read this domain event store (not in
> the event sourcing sense)  in order to put messages on a message queue, 
> for
> example, to notify remote bounded contexts.
>
> In an akka-cassandra-kafka world, with no transactions, I imagine an
> actor with an akka persistence query stream, that is responsible for
> putting a domain event on a Kafka topic. In this scenario, would errors
> writing to the Kafka topic be handled with retries and ultimately human
> intervention? Is the way to add retry semantics to stream computation to
> send the stream contents to an actor ref which does the writing?
>

 Easiest would probably be to use akka-persistence-kafka as the persistent
 actor journal, but I think you are asking for something else. Assume that
 you store the events from the persistent actor to Cassandra, and then have
 a query to read that stream and put them into Kafka. If the Kafka write
 fails, you have to read the events from Cassandra again from the last known
 successful offset, and try again.

 /Patrik


>
> In short: are there any good examples yet of the akka persistence
> approach to DDD, including published domain events (which I think are
> distinct from raw persistence events).
>
>



 --

 Patrik Nordwall
 Lightbend  -  Reactive apps on the JVM
 Twitter: @patriknw

 [image: Lightbend] 


>>>
>>>
>>

[akka-user] In Akka how does cassandra react when it is full and cannot persist more messages

2016-04-08 Thread Amruta
In Akka we are using Cassandra to persist data. Does Akka automatically 
clean up Cassandra based on the timestamp of messages, or do we need to do 
it manually (in code)? If so, any pointers would be helpful.



Re: [akka-user] Akka Streams Remote Materialization

2016-04-08 Thread Patrik Nordwall
No news. We are currently not working on it. You can keep an eye on what is
done in Gearpump.

http://www.gearpump.io/

http://www.marketwired.com/press-release/lightbend-announces-collaboration-bring-low-latency-high-availability-big-data-2099511.htm

On Wed, Apr 6, 2016 at 11:14 PM, César Aguilera 
wrote:

> Hi all,
>
> any news concerning this request?
>
> Cheers
>
>
>
> On Thursday, 28 May 2015 11:16:36 UTC+2, Oliver Winks wrote:
>>
>> Thanks for the info. I'm looking forward to future releases of Akka
>> Streams.
>> Cheers
>>
>> On Wednesday, 27 May 2015 22:24:44 UTC+1, Akka Team wrote:
>>>
>>> Hi Oliver,
>>> we do not (currently) support distributed materialization of streams.
>>> The reason is that it will require implementing redelivery for stream
>>> messages and a number of related issues which need to be fixed, which has
>>> not happened yet.
>>>
>>> Currently we are focusing on getting 1.0 out the door, which means
>>> API stability. We also need to work on in-memory performance, as it has
>>> not yet been a focus
>>> and is a critical point for making Akka HTTP as performant as Spray - at
>>> which point we'll be happy to recommend using streams in production systems.
>>> Please remember that 1.0 still means that streams are experimental.
>>>
>>> The distributed scenario is a very interesting one, but we do not have
>>> enough people/time to throw at that problem currently as other tasks are
>>> more urgent.
>>> Hope this explains things a bit!
>>>
>>> -- konrad
>>>
>>> On Sat, May 23, 2015 at 3:18 AM, Oliver Winks 
>>> wrote:
>>>
 Hi,

 The way I understand materialisation in Akka Streams is that the
 ActorFlowMaterializer will create a number of actors which are used to
 process the flows within a stream. Is it possible to control the number and
 location of actors that get materialised when running a Flow? I'd like to
 be able to create remote actors on several machines for processing my
 FlowGraph.

 Thanks,
 Oli.


>>>
>>>
>>>
>>> --
>>> Akka Team
>>> Typesafe - Reactive apps on the JVM
>>> Blog: letitcrash.com
>>> Twitter: @akkateam
>>>
>



-- 

Patrik Nordwall
Akka Tech Lead
Lightbend  -  Reactive apps on the JVM
Twitter: @patriknw



Re: [akka-user] Dstributing data in akka cluster (guidance needed)

2016-04-08 Thread Patrik Nordwall
Take a look at Akka Distributed Data, and perhaps you can add the durability
as a separate concern.

/Patrik
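
A minimal sketch of what storing such small key-value entries with
Distributed Data could look like on Akka 2.4.x (all names here -
TombstoneStore, the key id, the value type - are placeholders, and
durability would still need to be layered on separately as Patrik notes):

```scala
import akka.actor.{ ActorRef, ActorSystem }
import akka.cluster.Cluster
import akka.cluster.ddata.{ DistributedData, LWWMap, LWWMapKey }
import akka.cluster.ddata.Replicator._

class TombstoneStore(system: ActorSystem) {
  implicit val cluster: Cluster = Cluster(system)
  private val replicator = DistributedData(system).replicator
  // A last-writer-wins map, gossiped eventually-consistently to all nodes.
  private val Key = LWWMapKey[String]("tombstones")

  def put(replyTo: ActorRef, k: String, v: String): Unit =
    replicator.tell(
      Update(Key, LWWMap.empty[String], WriteLocal)(_ + (k -> v)),
      replyTo)

  def get(replyTo: ActorRef): Unit =
    replicator.tell(Get(Key, ReadLocal), replyTo)
}
```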

On Tue, Apr 5, 2016 at 10:37 PM, Val P  wrote:

> HI,
>
> I am looking for guidance, if anyway is willing to share their experience:
>
> I have an akka distributed system where I want to store small amounts of
> data (basically tombstones and other assorted bits). They may be stored on
> one node and consumed on another at a later time if the connection is later
> made to that other node. They aid in resuming some specialized TCP
> connections in a cluster, mostly.
>
> What I'm looking for is a way to distribute this data in such a way that
> it's durable and eventually consistent across all nodes. I do not need it
> in memory. Basically I'd like to implement a key-value store similar to
> CoreOS etcd, but integrated with my Akka Cluster, instead of a separate
> entity with its own gossiping cluster and configuration.
>
> Can anyone provide any guidance / pointers?
>
> Thanks in advance, and apologies if I'm missing something obvious in Akka
> Persistence library.
>
>
>
>



-- 

Patrik Nordwall
Akka Tech Lead
Lightbend  -  Reactive apps on the JVM
Twitter: @patriknw



Re: [akka-user] Re: Akka Persistence with Cassandra

2016-04-08 Thread Patrik Nordwall
Are you using Cassandra 3.x? Read up on the compatibility aspects in the
README: https://github.com/akka/akka-persistence-cassandra

On Wed, Apr 6, 2016 at 5:59 AM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> If I use below application.conf (default settings).
>
> akka {
>   persistence {
> journal.plugin="cassandra-journal"
> snapshot-store.plugin="cassandra-snapshot-store"
> }
> }
>
> I got below exception
>
> Starting 
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
> [INFO] [04/06/2016 09:26:31.933]
> [persistent-actors-akka.actor.default-dispatcher-4]
> [akka://persistent-actors/user/$a] Message
> [com.akka.persistence.Counter$Cmd] from
> Actor[akka://persistent-actors/deadLetters] to
> Actor[akka://persistent-actors/user/$a#1418786537] was not delivered. [1]
> dead letters encountered. This logging can be turned off or adjusted with
> configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
> [INFO] [04/06/2016 09:26:31.933]
> [persistent-actors-akka.actor.default-dispatcher-4]
> [akka://persistent-actors/user/$a] Message
> [com.akka.persistence.Counter$Cmd] from
> Actor[akka://persistent-actors/deadLetters] to
> Actor[akka://persistent-actors/user/$a#1418786537] was not delivered. [2]
> dead letters encountered. This logging can be turned off or adjusted with
> configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
> [INFO] [04/06/2016 09:26:31.933]
> [persistent-actors-akka.actor.default-dispatcher-4]
> [akka://persistent-actors/user/$a] Message
> [com.akka.persistence.Counter$Cmd] from
> Actor[akka://persistent-actors/deadLetters] to
> Actor[akka://persistent-actors/user/$a#1418786537] was not delivered. [3]
> dead letters encountered. This logging can be turned off or adjusted with
> configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
> [INFO] [04/06/2016 09:26:31.933]
> [persistent-actors-akka.actor.default-dispatcher-4]
> [akka://persistent-actors/user/$a] Message [java.lang.String] from
> Actor[akka://persistent-actors/deadLetters] to
> Actor[akka://persistent-actors/user/$a#1418786537] was not delivered. [4]
> dead letters encountered. This logging can be turned off or adjusted with
> configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
> *[WARN] [04/06/2016 09:26:48.321]
> [persistent-actors-cassandra-journal.default-dispatcher-5]
> [akka://persistent-actors/system/cassandra-journal] Failed to connect to
> Cassandra and initialize. It will be retried on demand. Caused by: line
> 2:13 no viable alternative at input 'MATERIALIZED' (  [CREATE]
> MATERIALIZED...)*
>
> Regards,
> Rajesh
>
> On Tuesday, April 5, 2016 at 10:23:02 AM UTC+5:30, Patrik Nordwall wrote:
>>
>> Could it be something with authentication?
>>
>> tis 5 apr. 2016 kl. 05:41 skrev Madabhattula Rajesh Kumar <
>> mraj...@gmail.com>:
>>
>>>
>>>
>>> On Monday, April 4, 2016 at 6:29:58 PM UTC+5:30, Madabhattula Rajesh
>>> Kumar wrote:


 Hi,

 I am not able to connect Cassandra journal from Akka Persistence.
 Please find below my application.conf and exception details.

 Could you please help me to resolve this issue.

 akka {
   persistence {
 journal.plugin="cassandra-journal"
 snapshot-store.plugin="cassandra-snapshot-store"
 }
 }

 cassandra-journal {
 class="akka.persistence.cassandra.journal.CassandraJournal"
 contact-points = ["127.0.0.1"]
 port = 9042
 keyspace = "akkajournal"
 table = "journal"
 keyspace-autocreate = true

 authentication {
 username = "cassandra"
 password = "cassandra"
   }
   }

   cassandra-snapshot-store {
   class = "akka.persistence.cassandra.snapshot.CassandraSnapshotStore"
   contact-points = ["127.0.0.1"]
   port = 9042
 keyspace = "akkasnapshot"
 table = "snapshots"
 keyspace-autocreate = true

 authentication {
 username = "cassandra"
 password = "cassandra"
 }
   }



 *Exception :-*SLF4J: Failed to load class
 "org.slf4j.impl.StaticLoggerBinder".
 SLF4J: Defaulting to no-operation (NOP) logger implementation
 SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
 further details.
 [INFO] [04/04/2016 18:25:18.760]
 [persistent-actors-akka.actor.default-dispatcher-3]
 [akka://persistent-actors/user/$a] Message
 [com.akka.persistence.Counter$Cmd] from
 Actor[akka://persistent-actors/deadLetters] to
 Actor[akka://persistent-actors/user/$a#1288992689] was not delive

[akka-user] Compiler saying "NotUsed" should be "BoxedUnit"

2016-04-08 Thread James P
Hi Folks,

I'm trying out the tutorial on Akka Streams (Java) and the following line 
of code from the tutorial:

final Source<Integer, NotUsed> source =
   Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));


causes a compilation failure. It says that "NotUsed" should be a BoxedUnit. 

Not really a big problem, but from what I understand, BoxedUnit was 
replaced with NotUsed in akka 2.4.2.

Anybody have an idea why this might be happening?

Running: akka 2.4.3 and akka-streams 2.0.4.
Tutorial from here: 
http://doc.akka.io/docs/akka/2.4.3/java/stream/stream-flows-and-basics.html

Any help would be much appreciated. Thanks.

Best regards,
James




Re: [akka-user] Akka persistence - errors on the read side

2016-04-08 Thread Richard Rodseth
Sorry, I'm probably confused. I don't think I've seen a full example of an
Akka Persistence projection, including error handling and saving offsets
persistently. So the infinite stream API seems to be of dubious value
unless you don't care about resilience:

val willNotCompleteTheStreamTagged: Source[EventEnvelope, NotUsed] =
readJournal.eventsByTag("user-added", 0L)

and the one that completes would presumably have to be run in an actor and
re-run if it fails

val willCompleteTheStreamTagged: Source[EventEnvelope, NotUsed] =
readJournal.currentEventsByTag("user-added", 0L)

Anyway, glad all is well from your perspective :)
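
A resumable projection along the lines Richard describes could be sketched
like this, assuming a readJournal supporting eventsByTag and a db with
loadOffset/saveOffset methods analogous to the reactive-kafka snippet (all
of db, handle and the tag are placeholder names, and an implicit
Materializer and ExecutionContext are assumed):

```scala
import akka.Done
import akka.persistence.query.EventEnvelope
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future

// Resume from the last stored offset; persist the new offset only after
// the side effect for that event has succeeded, so a crash replays from
// the last known-good position.
def runProjection(): Future[Done] =
  Source.fromFuture(db.loadOffset())
    .flatMapConcat(from => readJournal.eventsByTag("user-added", from))
    .mapAsync(1) { env: EventEnvelope =>
      handle(env.event).flatMap(_ => db.saveOffset(env.offset))
    }
    .runWith(Sink.ignore)
```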

On Fri, Apr 8, 2016 at 9:19 AM, Patrik Nordwall 
wrote:

>
>
> On Thu, Apr 7, 2016 at 5:17 PM, Richard Rodseth 
> wrote:
>
>> Also, seeing the work in reactive-kafka to allow the following, I wonder
>> if this is making you rethink akka-persistence-query APIs
>>
>
> Why? What is wrong? Persistence query api also has an offset parameter
> that can be used as the starting point of the stream.
>
>
>>
>> db.loadOffset().foreach { fromOffset =>
>> val settings = consumerSettings
>>   .withFromOffset(new TopicPartition("topic1", 1), fromOffset)
>> Consumer.plainSource(settings)
>>   .mapAsync(1)(db.save)
>>   }
>>
>>
>> On Thu, Apr 7, 2016 at 7:26 AM, Richard Rodseth 
>> wrote:
>>
>>> Isn't this section of the docs naïve in the face of possible errors?
>>>
>>>
>>> http://doc.akka.io/docs/akka/current/scala/persistence-query.html#Materialize_view_to_Reactive_Streams_compatible_datastore
>>>
>>> Later on that page is an incomplete example of a resumable projection.
>>> Won't something like that need to be the norm?
>>>
>>>
>>>
>>> On Fri, Feb 26, 2016 at 1:55 PM, Richard Rodseth 
>>> wrote:
>>>
 Hmm. I see the fromSequenceNr parameter in the query traits. So the
 read side would have to persist a watermark? Where? I was hoping for less
 boilerplate.

 On Fri, Feb 26, 2016 at 5:47 AM, Patrik Nordwall <
 patrik.nordw...@gmail.com> wrote:

>
>
> On Wed, Feb 24, 2016 at 10:33 PM, Richard Rodseth 
> wrote:
>
>> In Vaughn Vernon's Red Book (Implementing DDD) he talks about storing
>> domain events in the same transaction as the one which updates an
>> aggregate, and then out of band you read this domain event store (not in
>> the event sourcing sense)  in order to put messages on a message queue, 
>> for
>> example, to notify remote bounded contexts.
>>
>> In an akka-cassandra-kafka world, with no transactions, I imagine an
>> actor with an akka persistence query stream, that is responsible for
>> putting a domain event on a Kafka topic. In this scenario, would errors
>> writing to the Kafka topic be handled with retries and ultimately human
>> intervention? Is the way to add retry semantics to stream computation to
>> send the stream contents to an actor ref which does the writing?
>>
>
> Easiest would probably be to use akka-persistence-kafka as the
> persistent actor journal, but I think you are asking for something else.
> Assume that you store the events from the persistent actor to Cassandra,
> and then have a query to read that stream and put them into Kafka. If the
> Kafka write fails, you have to read the events from Cassandra again from the
> last known successful offset, and try again.
>
> /Patrik
>
>
>>
>> In short: are there any good examples yet of the akka persistence
>> approach to DDD, including published domain events (which I think are
>> distinct from raw persistence events).
>>
>>
>
>
>
> --
>
> Patrik Nordwall
> Lightbend  -  Reactive apps on the JVM
> Twitter: @patriknw
>
> [image: Lightbend] 
>

Re: [akka-user] Compiler saying "NotUsed" should be "BoxedUnit"

2016-04-08 Thread Konrad Malawski
Don't mix these versions.
Akka streams is part of Akka since 2.4.2, among others also for exactly
this reason.

Please use Akka streams 2.4.2 with akka 2.4.2.
Same with 2.4.3, etc. They move together now.
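In build terms, "moving together" just means pinning both artifacts to the same version. A minimal sbt sketch (the `akka-stream` artifact name and the shared version variable are the usual convention, shown here as an illustration, not taken from this thread):

```scala
// build.sbt fragment: keep akka-actor and akka-stream on the same version.
// Since 2.4.2, Akka Streams is released in lockstep with Akka itself.
val akkaVersion = "2.4.3"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor"  % akkaVersion,
  "com.typesafe.akka" %% "akka-stream" % akkaVersion
)
```

Bumping `akkaVersion` in one place then upgrades both modules together, which avoids the mismatch described below.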

Happy talking!
On Apr 8, 2016 18:43, "James P"  wrote:

> Hi Folks,
>
> I'm trying out the tutorial on Akka Streams (Java) and the following line
> of code from the tutorial:
>
> final Source<Integer, NotUsed> source =
>     Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
>
>
> causes a compilation failure. It says that "NotUsed" should be a
> BoxedUnit.
>
> Not really a big problem, but from what I understand, BoxedUnit was
> replaced with NotUsed in akka 2.4.2.
>
> Anybody have an idea why this might be happening?
>
> Running: akka 2.4.3 and akka-streams 2.0.4.
> Tutorial from here:
> http://doc.akka.io/docs/akka/2.4.3/java/stream/stream-flows-and-basics.html
>
> Any help would be much appreciated. Thanks.
>
> Best regards,
> James
>
>



Re: [akka-user] Compiler saying "NotUsed" should be "BoxedUnit"

2016-04-08 Thread James P
Thanks Konrad! 

I initially did not see "akka-streams" on Maven; I was using
"akka-streams-experimental".

Everything is working fine now after importing akka-streams 2.4.3.
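For Maven users hitting the same mismatch, the fix amounts to swapping the old experimental artifact for the one released with Akka itself. An illustrative POM fragment (the `_2.11` Scala binary suffix is an assumption for this Akka generation):

```xml
<!-- pom.xml fragment: replaces the old akka-stream-experimental_2.11 (2.0.x)
     dependency with akka-stream, which is versioned together with Akka. -->
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream_2.11</artifactId>
  <version>2.4.3</version>
</dependency>
```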

By the way, Akka streams is amazing! Makes a lot of things that I need to 
do much much simpler :)

Have a great weekend!

On Friday, April 8, 2016 at 11:25:15 AM UTC-7, Konrad Malawski wrote:
>
> Don't mix these versions.
> Akka streams is part of Akka since 2.4.2, among others also for exactly 
> this reason.
>
> Please use Akka streams 2.4.2 with akka 2.4.2. 
> Same with 2.4.3, etc. They move together now.
>
> Happy talking!
> On Apr 8, 2016 18:43, "James P" wrote:
>
>> Hi Folks,
>>
>> I'm trying out the tutorial on Akka Streams (Java) and the following line 
>> of code from the tutorial:
>>
>> final Source<Integer, NotUsed> source =
>>     Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
>>
>>
>> causes a compilation failure. It says that "NotUsed" should be a 
>> BoxedUnit. 
>>
>> Not really a big problem, but from what I understand, BoxedUnit was 
>> replaced with NotUsed in akka 2.4.2.
>>
>> Anybody have an idea why this might be happening?
>>
>> Running: akka 2.4.3 and akka-streams 2.0.4.
>> Tutorial from here: 
>> http://doc.akka.io/docs/akka/2.4.3/java/stream/stream-flows-and-basics.html
>>
>> Any help would be much appreciated. Thanks.
>>
>> Best regards,
>> James
>>
>>
>
