Re: [akka-user] Akka-Streams and Android

2018-03-07 Thread Konrad “ktoso” Malawski
Sorry, it’s been years since I’ve done Android development…
Maybe someone else on the list will have insights or ideas about where better to ask.

-- 
Cheers,
Konrad 'ktoso' Malawski
Akka @ Lightbend

On March 8, 2018 at 1:58:29, 'Matt' via Akka User List (
akka-user@googlegroups.com) wrote:

Ah, thanks. That's good to know, too bad.

I was thinking about playing around with reactive streams and MVI
(Model-View-Intent). I tried RxScala with RxJava2 and RxAndroid before, but
I wasn't very successful. So, I thought I'd give akka-streams a try. Any
ideas on how to do MVI with Scala on Android?



On Wednesday, March 7, 2018 at 17:48:02 UTC+1, Konrad Malawski wrote:
>
> Akka requires JDK8, which Android is not AFAIR… they support Java8 syntax
> nowadays but not JDK8 bytecode right?
>
> You could use ancient versions of Akka which would run on JDK6 bytecode,
> but that’s strongly discouraged.
>
> --
> Cheers,
> Konrad 'ktoso ' Malawski
> Akka  @ Lightbend 
>
> On March 8, 2018 at 1:29:05, 'Matt' via Akka User List (
> akka...@googlegroups.com) wrote:
>
> Hi there,
>
> I wonder if akka-streams are working on Android. I tried to get it working
> but there are tons of warnings and notes on the proguard step. I also did
> some proguard rules but still the estimated method count is >64K (81864);
> Akka-Streams 23684 alone. Is there a chance to make it work or do I just
> wasting my time? Does anyone tried it?
>
> Cheers,
> Matt


Re: [akka-user] Akka-Streams and Android

2018-03-07 Thread 'Matt' via Akka User List
Ah, thanks. That's good to know, too bad. 

I was thinking about playing around with reactive streams and MVI 
(Model-View-Intent). I tried RxScala with RxJava2 and RxAndroid before, but 
I wasn't very successful. So, I thought I'd give akka-streams a try. Any 
ideas on how to do MVI with Scala on Android?



On Wednesday, March 7, 2018 at 17:48:02 UTC+1, Konrad Malawski wrote:
>
> Akka requires JDK8, which Android is not AFAIR… they support Java8 syntax 
> nowadays but not JDK8 bytecode right?
>
> You could use ancient versions of Akka which would run on JDK6 bytecode, 
> but that’s strongly discouraged.
>
> -- 
> Cheers,
> Konrad 'ktoso ' Malawski
> Akka  @ Lightbend 
>
> On March 8, 2018 at 1:29:05, 'Matt' via Akka User List (
> akka...@googlegroups.com ) wrote:
>
> Hi there,
>
> I wonder if akka-streams are working on Android. I tried to get it working 
> but there are tons of warnings and notes on the proguard step. I also did 
> some proguard rules but still the estimated method count is >64K (81864); 
> Akka-Streams 23684 alone. Is there a chance to make it work or do I just 
> wasting my time? Does anyone tried it?
>
> Cheers,
> Matt


Re: [akka-user] Akka-Streams and Android

2018-03-07 Thread Konrad “ktoso” Malawski
Akka requires JDK8, which Android does not provide, AFAIR… they support Java 8 syntax
nowadays but not JDK8 bytecode, right?

You could use ancient versions of Akka which would run on JDK6 bytecode,
but that’s strongly discouraged.

-- 
Cheers,
Konrad 'ktoso' Malawski
Akka @ Lightbend

On March 8, 2018 at 1:29:05, 'Matt' via Akka User List (
akka-user@googlegroups.com) wrote:

Hi there,

I wonder whether akka-streams works on Android. I tried to get it working,
but there are tons of warnings and notes in the ProGuard step. I also added
some ProGuard rules, but the estimated method count is still >64K (81864);
Akka-Streams accounts for 23684 alone. Is there a chance to make it work, or am I just
wasting my time? Has anyone tried it?

Cheers,
Matt


[akka-user] Akka-Streams and Android

2018-03-07 Thread 'Matt' via Akka User List
Hi there,

I wonder whether akka-streams works on Android. I tried to get it working, 
but there are tons of warnings and notes in the ProGuard step. I also added 
some ProGuard rules, but the estimated method count is still >64K (81864); 
Akka-Streams accounts for 23684 alone. Is there a chance to make it work, or am I just 
wasting my time? Has anyone tried it?

Cheers,
Matt



Re: [akka-user] [Akka Streams] Difference between KillSwitch and Materializer.shutdown()

2018-03-01 Thread dollyg
This was helpful. Thank you!

On Monday, February 19, 2018 at 6:04:45 PM UTC+5:30, Konrad Malawski wrote:
>
> First warning sign: Why would you have one stream per materializer?
>
> shutting down the materializer while things are running is very brutal. 
> It’s like forcefully pulling the carpet from under someone’s feet, 
> and the laughing as they spill their coffee upon themselves — don’t do 
> this as the go-to solution. 
> It’s better than leaving resource leaks, but don’t do this as the “clean” 
> shutdown.
>
>
> KillSwitches actually signal termination properly using streams signals — 
> cancelation and completion/error.
> Use them when you want to externally stop things.
>
> -- 
> Cheers,
> Konrad 'ktoso ' Malawski
> Akka  @ Lightbend 
>
> On February 19, 2018 at 21:31:58, dol...@thoughtworks.com  (
> dol...@thoughtworks.com ) wrote:
>
> I came across 2 ways to terminate a stream
>
>1. KillSwitch 
>2. Materializer.shutdown() 
>
> I see one difference which is 
>
>- Materializer.shutdown() is used to kill all streams materialized by 
>that materializer whereas KillSwitch can be used to terminate one 
>particular stream.
>
>
> In a scenario where I have one materializer per stream, is there any 
> difference between KillSwitch and Materializer.shutdow()? Which one should 
> be used? And When?


[akka-user] [Akka Streams] Feedback on custom throttle stage

2018-03-01 Thread dollyg


Hi all, 

We are trying to create a stage which caters to two requirements:

1) If the producer is faster than the consumer, then the consumer should always get the 
latest element (dropping the intermediate elements), similar to the conflate API. This 
was discussed here.

2) If the producer is slower than the consumer, then the consumer (who is pulling 
at a faster frequency) should get the latest element on the stream on each 
pull (which means elements will be duplicated), similar to the expand API.

Since the conflate and expand stages both buffer elements, which increases the 
latency, we are creating our own stage.

Looking for feedback on the code below.

import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

import scala.concurrent.duration.FiniteDuration

class CustomThrottleStage[A](delay: FiniteDuration) extends GraphStage[FlowShape[A, A]] {
  final val in    = Inlet.create[A]("Throttle.in")
  final val out   = Outlet.create[A]("Throttle.out")
  final val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) {
      private var isPulled = false
      private var maybeElem: Option[A] = None

      override def preStart(): Unit = {
        schedulePeriodically(None, delay)
        pull(in)
      }

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            // Whenever upstream pushes an element, store it and push it only on the timer.
            maybeElem = Some(grab(in))
            pull(in) // drop elements - required when the producer is faster
          }
        }
      )

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit = {
            isPulled = true
          }
        }
      )

      override def onTimer(key: Any): Unit = {
        // On timer, push only if there is demand from downstream.
        if (isPulled) {
          maybeElem.foreach { x =>
            isPulled = false
            push(out, x)
          }
        }
      }
    }
}
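
For context, a minimal, hypothetical usage sketch of the stage above (the tick source, rates, and object name are made up for illustration and are not part of the original post):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

import scala.concurrent.duration._

object CustomThrottleStageExample extends App {
  implicit val system = ActorSystem("throttle-demo")
  implicit val materializer = ActorMaterializer()

  // A fast producer (one element every 10 ms) run through the custom stage,
  // which emits at most one element per 100 ms and repeats the latest element
  // whenever the producer falls behind the timer.
  Source.tick(0.millis, 10.millis, ())
    .zipWithIndex
    .map(_._2)
    .via(new CustomThrottleStage[Long](100.millis))
    .runForeach(n => println(s"got $n"))
}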



Regards,

Dolly






Re: [akka-user] [Akka-Streams] Want to always receive latest element in Sink

2018-02-23 Thread saloniv
Hello Arnout,

We are providing an API for developers to use. So, there is a callback written 
by other developers which will be executed instead of the Thread.sleep in the 
example.

Hence, we cannot say whether they will write CPU-intensive code or some 
kind of IO.

Hope this helps.

On Wednesday, February 7, 2018 at 6:26:11 PM UTC+5:30, Arnout Engelen wrote:
>
> Hello Saloni,
>
> I think it would be helpful to have a more realistic example than doing 
> "Thread.sleep(1000)" in the sink.
>
> Could we unpack what this sleep is intended to mimic in your 'real' 
> application? Is it for example doing CPU-intensive data parsing or perhaps 
> some kind of IO?
>
>
> Kind regards,
>
> Arnout
>
> On Thu, Jan 25, 2018 at 7:27 AM, > 
> wrote:
>
>> Hello,
>>
>> We are having a requirement that if a consumer is slower than producer 
>> then discard all the elements that cannot be consumed and whenever the 
>> consumer gets ready, feed the latest element from producer.
>>
>> We tried an approach as follows:
>>
>> Source.actorRef(0, OverflowStrategy.dropHead)   // actor receives 
>>> data at every 10 milliseconds
>>
>> .runWith {
>>>println("data received")
>>>Thread.sleep(1000)   // mimic consumer processing data in 
>>> every 1 second
>>> }
>>
>>
>> We shrank the buffer size to 1 (minimal possible) with following settings
>>
>> private val actorMaterializerSettings = ActorMaterializerSettings(
>>> actorSystem).withInputBuffer(1, 1)
>>
>>
>> With this buffer size, Sink pulls data 1 to consume and data 2 to put in 
>> buffer at initialization.
>>
>> While data 1 is getting processed we are dropping data from producer.
>>
>> When data 1 gets processed after 1000 milliseconds (1 second) ideally I 
>> should receive data 10 (and drop 2 - 9 as consumer is slow) but instead I 
>> receive data 2 from the buffer. data 2 in our domain is extremely useless 
>> as it is stale.
>>
>> Is there a way to disable buffer at Sink totally and always pull latest 
>> data from Source ?
>>
>>
>
>
>
> -- 
> Arnout Engelen
> *Senior Software Engineer*
> E: arnout@lightbend.com 
> T: https://twitter.com/raboofje
>
>
>



Re: [akka-user] [Akka Streams] Difference between KillSwitch and Materializer.shutdown()

2018-02-19 Thread Konrad “ktoso” Malawski
First warning sign: Why would you have one stream per materializer?

Shutting down the materializer while things are running is very brutal.
It’s like forcefully pulling the carpet from under someone’s feet,
and then laughing as they spill their coffee upon themselves — don’t do this
as the go-to solution.
It’s better than leaving resource leaks, but don’t do this as the “clean”
shutdown.


KillSwitches actually signal termination properly using stream signals —
cancellation and completion/error.
Use them when you want to externally stop things.
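
To illustrate, a minimal sketch only (not from the original thread; the source, rate, and names are invented):

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.duration._

object KillSwitchSketch extends App {
  implicit val system = ActorSystem("killswitch-demo")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // Materialize the stream together with a KillSwitch placed inside it.
  val (killSwitch, done) =
    Source.tick(0.seconds, 100.millis, "tick")
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  // Later, stop only this stream: shutdown() completes downstream and cancels upstream.
  system.scheduler.scheduleOnce(2.seconds) {
    killSwitch.shutdown() // or killSwitch.abort(new RuntimeException("stop")) to fail it
  }

  done.onComplete(_ => system.terminate())
}

Unlike shutting down the materializer, this only affects the stream the switch was materialized into.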

-- 
Cheers,
Konrad 'ktoso' Malawski
Akka @ Lightbend

On February 19, 2018 at 21:31:58, dol...@thoughtworks.com (
dol...@thoughtworks.com) wrote:

I came across 2 ways to terminate a stream

   1. KillSwitch
   2. Materializer.shutdown()

I see one difference which is

   - Materializer.shutdown() is used to kill all streams materialized by
   that materializer whereas KillSwitch can be used to terminate one
   particular stream.


In a scenario where I have one materializer per stream, is there any
difference between KillSwitch and Materializer.shutdown()? Which one should
be used? And when?


[akka-user] [Akka Streams] Difference between KillSwitch and Materializer.shutdown()

2018-02-19 Thread dollyg
I came across 2 ways to terminate a stream

   1. KillSwitch
   2. Materializer.shutdown()

I see one difference which is 

   - Materializer.shutdown() is used to kill all streams materialized by 
   that materializer whereas KillSwitch can be used to terminate one 
   particular stream.
   

In a scenario where I have one materializer per stream, is there any 
difference between KillSwitch and Materializer.shutdown()? Which one should 
be used? And when?



Re: [akka-user] [Akka-Streams] Want to always receive latest element in Sink

2018-02-08 Thread Johan Andrén
You should be able to allow a faster upstream to continue, while emitting 
the latest value whenever downstream is ready, with conflate like so:

Source(0 to 1000) 
  .throttle(10, 1.second, 1, ThrottleMode.shaping) // fast upstream
  .conflate((acc, latest) => latest) // keep the latest value
  .throttle(2, 1.second, 1, ThrottleMode.shaping) // slow downstream 
  .runForeach(println)


--

Johan

Akka Team


On Wednesday, February 7, 2018 at 1:56:11 PM UTC+1, Arnout Engelen wrote:
>
> Hello Saloni,
>
> I think it would be helpful to have a more realistic example than doing 
> "Thread.sleep(1000)" in the sink.
>
> Could we unpack what this sleep is intended to mimic in your 'real' 
> application? Is it for example doing CPU-intensive data parsing or perhaps 
> some kind of IO?
>
>
> Kind regards,
>
> Arnout
>
> On Thu, Jan 25, 2018 at 7:27 AM,  wrote:
>
>> Hello,
>>
>> We are having a requirement that if a consumer is slower than producer 
>> then discard all the elements that cannot be consumed and whenever the 
>> consumer gets ready, feed the latest element from producer.
>>
>> We tried an approach as follows:
>>
>> Source.actorRef(0, OverflowStrategy.dropHead)   // actor receives 
>>> data at every 10 milliseconds
>>
>> .runWith {
>>>println("data received")
>>>Thread.sleep(1000)   // mimic consumer processing data in 
>>> every 1 second
>>> }
>>
>>
>> We shrank the buffer size to 1 (minimal possible) with following settings
>>
>> private val actorMaterializerSettings = ActorMaterializerSettings(
>>> actorSystem).withInputBuffer(1, 1)
>>
>>
>> With this buffer size, Sink pulls data 1 to consume and data 2 to put in 
>> buffer at initialization.
>>
>> While data 1 is getting processed we are dropping data from producer.
>>
>> When data 1 gets processed after 1000 milliseconds (1 second) ideally I 
>> should receive data 10 (and drop 2 - 9 as consumer is slow) but instead I 
>> receive data 2 from the buffer. data 2 in our domain is extremely useless 
>> as it is stale.
>>
>> Is there a way to disable buffer at Sink totally and always pull latest 
>> data from Source ?
>>
>>
>
>
>
> -- 
> Arnout Engelen
> *Senior Software Engineer*
> E: arnout.enge...@lightbend.com
> T: https://twitter.com/raboofje
>
>
>



Re: [akka-user] [Akka-Streams] Want to always receive latest element in Sink

2018-02-07 Thread Arnout Engelen
Hello Saloni,

I think it would be helpful to have a more realistic example than doing
"Thread.sleep(1000)" in the sink.

Could we unpack what this sleep is intended to mimic in your 'real'
application? Is it for example doing CPU-intensive data parsing or perhaps
some kind of IO?


Kind regards,

Arnout

On Thu, Jan 25, 2018 at 7:27 AM,  wrote:

> Hello,
>
> We are having a requirement that if a consumer is slower than producer
> then discard all the elements that cannot be consumed and whenever the
> consumer gets ready, feed the latest element from producer.
>
> We tried an approach as follows:
>
> Source.actorRef(0, OverflowStrategy.dropHead)   // actor receives data
>> at every 10 milliseconds
>
> .runWith {
>>println("data received")
>>Thread.sleep(1000)   // mimic consumer processing data in
>> every 1 second
>> }
>
>
> We shrank the buffer size to 1 (minimal possible) with following settings
>
> private val actorMaterializerSettings = ActorMaterializerSettings(acto
>> rSystem).withInputBuffer(1, 1)
>
>
> With this buffer size, Sink pulls data 1 to consume and data 2 to put in
> buffer at initialization.
>
> While data 1 is getting processed we are dropping data from producer.
>
> When data 1 gets processed after 1000 milliseconds (1 second) ideally I
> should receive data 10 (and drop 2 - 9 as consumer is slow) but instead I
> receive data 2 from the buffer. data 2 in our domain is extremely useless
> as it is stale.
>
> Is there a way to disable buffer at Sink totally and always pull latest
> data from Source ?
>
>



-- 
Arnout Engelen
*Senior Software Engineer*
E: arnout.enge...@lightbend.com
T: https://twitter.com/raboofje



[akka-user] [akka-streams] How to test FanOut shapes?

2018-02-04 Thread Sergii Sopin
Hi,

I am writing tests for my application and I want to test a function that 
returns a Partition shape with two outlets. Are there any best practices on 
how to test Partition shapes?
Thanks.

Regards,
Sergii



[akka-user] [Akka-Streams-Kafka] Filter a committable stream

2018-01-31 Thread Nicolae Namolovan
Hello there,

I was wondering if anybody has a suggestion for how to filter messages from a 
committable Kafka source without losing the committable offsets that need to 
be committed after the messages have been sent to a sink.

Currently I'm reading from Kafka, filtering, and writing the result to another 
Kafka topic using batches.

A simplified version of my code:

val done =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .filter(_.record.value == "something")
    .via(Producer.flow(producerSettings))
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
      batch.updated(elem)
    }
    .mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)

When I filter elements from the stream, I'm losing the committable 
information.

Does anybody have a suggestion for how to avoid that and still commit the 
offsets of the filtered messages in the batches after they have been written?

Kind Regards,
Nicolae N.



Re: [akka-user] Akka-streams: Increase processing rate:

2018-01-29 Thread Akka Team
It could be a good idea to take stages out, measure the throughput while
adding a stage at a time to see what actually is limiting your throughput.

If it in fact is Akka HTTP, then the default configuration for the
connection pool only allows 4 connections per host; maybe increasing this
(akka.http.host-connection-pool.max-connections) would help. It could
actually be easier to reason about the flow by using singleRequest inside a
mapAsync or a mapAsyncUnordered with a parallelism matching the
max-connections-per-host setting. It could also make sense to insert async
boundaries, to let the logic before the request execute in parallel with
the request being executed.
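
As an illustration only, a sketch of that suggested shape (the parallelism value, the URI, and the request-building logic are placeholders, not the poster's actual code):

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

object MapAsyncHttpSketch extends App {
  implicit val system = ActorSystem("http-client-demo")
  implicit val materializer = ActorMaterializer()

  // Should match akka.http.host-connection-pool.max-connections in application.conf.
  val parallelism = 32

  Source(1 to 1000)
    .map(i => HttpRequest(HttpMethods.POST, uri = s"http://example.com/items/$i"))
    .async // async boundary: build requests in parallel with executing them
    .mapAsyncUnordered(parallelism)(request => Http().singleRequest(request))
    .runWith(Sink.foreach(_.discardEntityBytes()))
}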

--
Johan
Akka Team

On Tue, Jan 9, 2018 at 4:23 AM, kor  wrote:

> Hello all. I have a requirement where I need to stream records from a
> couchbase database, do some json transformations and post the result to a
> rest endpoint (implemented with Akka-Http). I've tested that endpoint in
> isolation using gatling, and it can handle at least 200 requests/sec. The
> problem I am having when I run it, I cannot get the speed to go past 10
> requests/sec. I've tested the speed that records are being pulled out of
> couchbase and got a rate of about 2700 records/sec so something is causing
> backpressure to be applied to the source. I've also tried posting to a
> different public endpoint (probably not implemented with akka-http) and the
> speed increases to about 80 requests/sec. I've posted the code below. How
> can I make things go faster? Disclaimer: I am new to akka-streams.
>
> code: https://gist.github.com/khalidr/1a392680c0ed744b2d3e5cec1e1f193a
>
> Any help would be appreciated.
>


[akka-user] [Akka-Streams] Want to always receive latest element in Sink

2018-01-24 Thread saloniv
Hello,

We have a requirement that if the consumer is slower than the producer, we 
discard all the elements that cannot be consumed, and whenever the consumer 
becomes ready, we feed it the latest element from the producer.

We tried an approach as follows:

Source.actorRef(0, OverflowStrategy.dropHead)   // actor receives data every 10 milliseconds
  .runWith(Sink.foreach { _ =>
    println("data received")
    Thread.sleep(1000)   // mimic consumer processing data every 1 second
  })


We shrank the buffer size to 1 (the minimal possible) with the following settings:

private val actorMaterializerSettings =
  ActorMaterializerSettings(actorSystem).withInputBuffer(1, 1)


With this buffer size, at initialization the Sink pulls data 1 to consume and 
data 2 to put in the buffer.

While data 1 is being processed, we are dropping data from the producer.

When data 1 gets processed after 1000 milliseconds (1 second), ideally I 
should receive data 10 (and drop 2-9, as the consumer is slow), but instead I 
receive data 2 from the buffer. Data 2 in our domain is useless as it is stale.

Is there a way to disable the buffer at the Sink entirely and always pull the 
latest data from the Source?




[akka-user] Akka Streams 2.5 sudden throughput drop

2018-01-15 Thread Dmitry Golubets
Hi,

We have a custom data-processing pipeline in the company built on Akka 
Streams that has been happily working for a year on version 2.4.
Recently I've started migrating to 2.5 and it went very smoothly, but… there 
is a huge throughput drop at some random point in time.

So, we measure it in messages per second.
With 2.4 we always had 21-22k with some small fluctuations.
With 2.5 it starts well, even faster, but then at some point (6 hours or 24 
hours, kind of random) it drops to ~15k, which is quite a significant 25% drop.

User code is the same between versions.
The only thing that had to be added is .async boundaries in many places, 
because in 2.4 we had fusing disabled.
I tried both the default ForkJoinPool and the new AffinityPool, and both have 
the same issue.

Profiling with JVisualVM gives me pretty much nothing.
I compared snapshots between the application running fast and running slow. 
The only difference I see is CPU load (the faster app has higher load).
Memory consumption is the same.
Sampling shows nothing suspicious.
Thread usage looks the same.

So, the only assumption I can make is that it has something to do with 
internal Akka dispatching/scheduling of tasks or some deeper internal 
change.

So far I have failed to reproduce it in a simple isolated example.
I can't even reliably do it in the application itself.
However, I noticed that sometimes I can trigger it by forcing a full GC, but 
again, not every time; it's still random.

I hope that someone has either experienced similar issues or has an idea 
of what has changed internally in 2.5 that could lead to this problem.
I will appreciate any ideas! :)



[akka-user] Akka-streams: Increase processing rate:

2018-01-08 Thread kor
Hello all. I have a requirement where I need to stream records from a 
Couchbase database, do some JSON transformations, and post the result to a 
REST endpoint (implemented with Akka HTTP). I've tested that endpoint in 
isolation using Gatling, and it can handle at least 200 requests/sec. The 
problem I am having is that when I run it, I cannot get the speed to go past 
10 requests/sec. I've tested the speed at which records are being pulled out 
of Couchbase and got a rate of about 2700 records/sec, so something is causing 
backpressure to be applied to the source. I've also tried posting to a 
different public endpoint (probably not implemented with akka-http) and the 
speed increases to about 80 requests/sec. I've posted the code below. How 
can I make things go faster? Disclaimer: I am new to akka-streams.

code: https://gist.github.com/khalidr/1a392680c0ed744b2d3e5cec1e1f193a

Any help would be appreciated.



[akka-user] [Akka-Streams] MergePrioritized vs MergePreferred

2017-12-28 Thread Sergii Sopin
Hi all,

I am having some difficulties with merging different branches of a flow. 
My flow looks like the following:

  
[ASCII flow diagram, garbled in the archive: the input goes through a splitter whose branches pass through the stages doByI1, doByI2, doByI3, and doByI4, and the branches are recombined by the merge shapes merge1 and merge2 before the output.]

Both merge shapes have type MergePrioritized with eagerComplete = true. 
The problem is that when I run the flow with some input which is correct, I 
sometimes receive the following error:

[error] a.a.RepointableActorRef - Error in stage [MergePrioritized]: Cannot 
get element from already empty input port (MergePrioritized.in1(1037027355))
java.lang.IllegalArgumentException: Cannot get element from already empty 
input port (MergePrioritized.in1(1037027355))

If I change the eagerComplete value for one of the merge shapes, I still see an 
error and the flow never ends. If I change it for both shapes, then the error 
disappears, but the flow still never ends. 
I am wondering whether it is a design problem or whether I should look more 
closely at the code itself?

PS: If I replace MergePrioritized with MergePreferred, everything works 
correctly without any problems. 

Thank you in advance!

Regards,
Sergii






[akka-user] [Akka-streams] MergePrioritized with Java

2017-12-28 Thread Sergii Sopin
Hi all,

I am trying to implement a flow with a MergePrioritized shape using Java. 
However, there are no examples of how to connect the ports. 
I tried something like this:

int[] priorities = {1, 2, 3};
UniformFanInShape inputMergePrioritized =
    MergePrioritized.create(MyClass.class, priorities, true).shape();
builder.from(myShape1.out0()).toInlet(inputMergePrioritized.in(0));
builder.from(myShape2.out0()).toInlet(inputMergePrioritized.in(1));
builder.from(myShape3.out0()).toInlet(inputMergePrioritized.in(2));

builder.from(inputMergePrioritized.out()).toInlet(myShape4.in());

Here is the result:
[error] a.a.OneForOneStrategy - [MergePrioritized.in0] is already connected

I checked and realized that it was not previously connected. 

Could you please help me to figure out how to deal with it? 
Thanks.

Regards,
Sergii



Re: [akka-user] Akka Streams for synchronous request processing

2017-12-20 Thread Konrad “ktoso” Malawski
In short, I want to execute an Akka graph from my API without materializing it
every time.

What’s your reason for this? Just so we’re on the same page and not
imagining reasons for doing this :)

The problems I am facing:
How to pass the request object to the source node of the graph.

That’s as simple as using a Source that allows inserting elements from
outside the stream, like Source.queue or Source.actorRef.

Read the docs about those.


How to get the corresponding response from the sink node.

Why “corresponding”? That sounds like you’re not really doing stream processing
but rather request/response.

You can pass around the “which request this originated from” information to
correlate once the sink gets it.
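
For illustration only, a minimal sketch of that idea: a graph materialized once, fed via Source.queue, with each element carrying the promise used to answer its caller (the names and the toy transformation are invented, not from the thread):

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Future, Promise}

object PreMaterializedGraphSketch {
  implicit val system = ActorSystem("queue-demo")
  implicit val materializer = ActorMaterializer()

  final case class Request(payload: String)
  final case class Response(payload: String)

  // Materialized exactly once; every element carries its own correlation promise.
  private val queue =
    Source.queue[(Request, Promise[Response])](bufferSize = 128, OverflowStrategy.backpressure)
      .map { case (req, promise) => (Response(req.payload.toUpperCase), promise) } // the "graph"
      .to(Sink.foreach { case (resp, promise) => promise.success(resp) })
      .run()

  // Called for each incoming REST request; no new materialization happens here.
  def handle(req: Request): Future[Response] = {
    val promise = Promise[Response]()
    queue.offer((req, promise)) // real code should inspect the returned Future[QueueOfferResult]
    promise.future
  }
}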


— Konrad



Re: [akka-user] Akka Streams for synchronous request processing

2017-12-20 Thread Harit Chandra
In my use case:
I have multiple Akka graphs, each with a source, sink, and flow. In my API, on
the basis of some request parameter, I have to execute one of the graphs. Into
the source node of the graph I have to pass the request which is received by
the API. I do not want to materialize my graph on each request. I have to send
the result of the sink node as the response of my API.

In short, I want to execute an Akka graph from my API without materializing it
every time.

The problems I am facing:
How to pass the request object to the source node of the graph.
How to get the corresponding response from the sink node.

Thanks,
Harit

Regards,
Harit

On Wed, Dec 20, 2017 at 9:11 PM, Konrad “ktoso” Malawski <
konrad.malaw...@lightbend.com> wrote:

> Sure it’d just work… What specifically do you have in mind?
> What do you mean by “We should be able to plug-in out partial graphs to
> the flow.” (specifically, what’s the use case).
>
> --
> Cheers,
> Konrad 'ktoso ' Malawski
> Akka  @ Lightbend 
>
> On December 20, 2017 at 15:12:30, Harit Chandra (chandra.ha...@gmail.com)
> wrote:
>
> Hi All,
>
> We are evaluating Akka as simple graph executor. For our use case we need
> to create REST API's and we wish to use Akka streams at core to process the
> requests. We cannot use Akka HTTP due to some constraints. The requirement
> is to materialize the graph once and synchronously process REST requests.
> Please provide some pointers/snippets for this use case. We should be able
> to plug-in out partial graphs to the flow.
>
> Thanks,
> Harit


Re: [akka-user] Akka Streams for synchronous request processing

2017-12-20 Thread Konrad “ktoso” Malawski
Sure, it’d just work… What specifically do you have in mind?
What do you mean by “We should be able to plug in our partial graphs into the
flow”? (Specifically, what’s the use case?)

-- 
Cheers,
Konrad 'ktoso' Malawski
Akka @ Lightbend

On December 20, 2017 at 15:12:30, Harit Chandra (chandra.ha...@gmail.com)
wrote:

Hi All,

We are evaluating Akka as simple graph executor. For our use case we need
to create REST API's and we wish to use Akka streams at core to process the
requests. We cannot use Akka HTTP due to some constraints. The requirement
is to materialize the graph once and synchronously process REST requests.
Please provide some pointers/snippets for this use case. We should be able
to plug-in out partial graphs to the flow.

Thanks,
Harit


[akka-user] Akka Streams for synchronous request processing

2017-12-20 Thread Harit Chandra
Hi All,

We are evaluating Akka as a simple graph executor. For our use case we need 
to create REST APIs, and we wish to use Akka Streams at the core to process the 
requests. We cannot use Akka HTTP due to some constraints. The requirement 
is to materialize the graph once and synchronously process REST requests. 
Please provide some pointers/snippets for this use case. We should be able 
to plug in our partial graphs into the flow.

Thanks,
Harit



Re: [akka-user] [akka-streams] Shared Materializer in akka extensions

2017-09-29 Thread Konrad “ktoso” Malawski
So it's rather unlikely people will want access to your materializer there.
The sharing of a materializer matters most for lifecycle - killing a
materializer kills all streams it hosts - so in your case it's unlikely you
want to bind your streams to random other people's stream lifecycles.

I suggest you have a separate materializer in the extension.
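
For illustration only, a rough sketch of an extension that owns its own materializer (the extension name, the lookup method, and the placeholder logic are invented for the example):

import akka.actor.{ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

class ServiceDiscoveryExt(system: ExtendedActorSystem) extends Extension {
  // Owned by the extension: its lifecycle is tied to the actor system,
  // not to whatever streams the extension's users happen to run.
  private implicit val materializer: ActorMaterializer = ActorMaterializer()(system)

  def addresses(serviceName: String): Future[Seq[String]] =
    Source.single(serviceName)
      .map(name => Seq(s"$name.local:8080")) // placeholder for the real akka-http lookup
      .runWith(Sink.head)
}

object ServiceDiscoveryExt extends ExtensionId[ServiceDiscoveryExt] with ExtensionIdProvider {
  override def lookup = ServiceDiscoveryExt
  override def createExtension(system: ExtendedActorSystem) = new ServiceDiscoveryExt(system)
}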

—
Konrad `kto.so` Malawski
Akka @ Lightbend

On 30 September 2017 at 11:29:15, Jeff (jknight12...@gmail.com) wrote:

A Consul like service discovery client built on top of akka-http



Re: [akka-user] [akka-streams] Shared Materializer in akka extensions

2017-09-29 Thread Jeff
A Consul-like service discovery client built on top of akka-http.



Re: [akka-user] [akka-streams] Shared Materializer in akka extensions

2017-09-29 Thread Konrad “ktoso” Malawski
Again: what are you building? ;-)
It’s very hard to provide answers if you don’t provide context.

—
Konrad `kto.so` Malawski
Akka @ Lightbend

On 30 September 2017 at 11:23:19, Jeff (jknight12...@gmail.com) wrote:

That's what I was leaning towards as well, since materialzers are fairly
lightweight. However, I've seen that statement prefaced with, "...right
now" so I wanted to make sure there wasn't some other way to share.



Re: [akka-user] [akka-streams] Shared Materializer in akka extensions

2017-09-29 Thread Jeff
That's what I was leaning towards as well, since materializers are fairly 
lightweight. However, I've seen that statement prefaced with "...right now", so 
I wanted to make sure there wasn't some other way to share.



Re: [akka-user] [akka-streams] Shared Materializer in akka extensions

2017-09-29 Thread Konrad Malawski
Though I'm not sure it actually is as important to have the users and your
extension share a materializer by the way.
What is it you're building exactly?

Keeping one materializer in the extension could be totally fine as well.

On Sat, Sep 30, 2017 at 11:18 AM, Konrad “ktoso” Malawski <
konrad.malaw...@lightbend.com> wrote:

> When they use it;
>
> def use(...)(implicit m: Materializer)
>
> —
> Konrad `kto.so` Malawski
> Akka  @ Lightbend 
>
> On 30 September 2017 at 11:18:16, Jeff (jknight12...@gmail.com) wrote:
>
> How, exactly. You only have access to an ExtendedActorSystem inside of
> createExtension()
>


-- 
Cheers,
Konrad 'ktoso' Malawski
Akka  @ Lightbend 

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-streams] Shared Materializer in akka extensions

2017-09-29 Thread Konrad “ktoso” Malawski
When they use it;

def use(...)(implicit m: Materializer)
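
Spelled out slightly more (a sketch only; MyExtApi and the fold are just
illustrations), the extension's methods pick up whatever materializer the caller
has in scope:

```
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

// Hypothetical extension API: the caller supplies the materializer implicitly.
class MyExtApi {
  def use(data: Source[Int, _])(implicit m: Materializer): Future[Int] =
    data.runWith(Sink.fold(0)(_ + _))
}
```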

—
Konrad `kto.so` Malawski
Akka  @ Lightbend 

On 30 September 2017 at 11:18:16, Jeff (jknight12...@gmail.com) wrote:

How, exactly. You only have access to an ExtendedActorSystem inside of
createExtension()

-- 
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups
"Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-streams] Shared Materializer in akka extensions

2017-09-29 Thread Jeff
How, exactly. You only have access to an ExtendedActorSystem inside of 
createExtension()

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-streams] Shared Materializer in akka extensions

2017-09-29 Thread Konrad “ktoso” Malawski
Have users pass it in ;)

—
Konrad `kto.so` Malawski
Akka  @ Lightbend 

On 30 September 2017 at 10:04:48, Jeff (jknight12...@gmail.com) wrote:

I am building an akka extension, but there doesn't seem to be an elegant
way to pass in a shared Materializer. What would be the suggested way to
use a shared materializer between akka extensions and the main application?
--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups
"Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] [akka-streams] Shared Materializer in akka extensions

2017-09-29 Thread Jeff
I am building an akka extension, but there doesn't seem to be an elegant 
way to pass in a shared Materializer. What would be the suggested way to 
use a shared materializer between akka extensions and the main application?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka streams question

2017-09-25 Thread David Cromberge
I have a Kafka topic where messages have an account field, as well as a payload
of decimal values. I would like to save the decimal values to an S3 file for
each account. I would like to preserve at-least-once message semantics and only
commit the offset once I'm sure the contents of the file were accepted by the
S3 file sink.

I would like to know if akka streams are suitable for such a use case.
Firstly, if I were to use the Alpakka S3 client connector, I would need a way to
dynamically create this sink when encountering a message with an account that
has not been seen before.
Secondly, when using a Kafka connector like Reactive Kafka I would need to pass
through the commit offset and somehow commit after passing through the sink.

I have tried to create a custom graph sink with a concurrent map of sinks, but
this did not work out very well. I have also looked at groupBy but have not
figured out how to feed each subflow to an S3 bucket sink based on its
discriminator account.

I would appreciate any advice on how to progress. I am a relative newcomer to
akka streams and am tempted to fall back to an actor-based solution, albeit with
the lack of backpressure etc.
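
As a sketch of the groupBy direction (not a full solution: offset committing is
left out, and Record and s3SinkFor are placeholders for the real message type and
whatever Alpakka sink gets built per account), one way to learn the account of
each substream is to peek at its first element:

```
import akka.{Done, NotUsed}
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

final case class Record(account: String, payload: ByteString)

// Stand-in for an Alpakka S3 sink (e.g. a multipart upload per account).
def s3SinkFor(account: String): Sink[ByteString, NotUsed] = ???

def writePerAccount(records: Source[Record, NotUsed], maxAccounts: Int)
                   (implicit mat: Materializer): Future[Done] =
  records
    .groupBy(maxAccounts, _.account)   // one substream per account
    .prefixAndTail(1)                  // peek at the first element to learn the account
    .mergeSubstreams
    .runForeach { case (first, rest) =>
      val account = first.head.account // a substream is created by an element, so head exists
      Source(first).concat(rest).map(_.payload).runWith(s3SinkFor(account))
    }
```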

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka streams "Working with streaming IO" example: Problem when no initial message from server

2017-09-16 Thread Allan Brighton
In the Akka docs, in the section titled "Working with streaming IO", there is an
example TCP server:

connections.runForeach { connection =>
  // server logic, parses incoming commands
  val commandParser = Flow[String].takeWhile(_ != "BYE").map(_ + "!")

  import connection._
  val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!"
  val welcome = Source.single(welcomeMsg)

  val serverLogic = Flow[ByteString]
    .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
    .map(_.utf8String)
    .via(commandParser)
    // merge in the initial banner after parser
    .merge(welcome)
    .map(_ + "\n")
    .map(ByteString(_))

  connection.handleWith(serverLogic)
}


and REPL client:

val connection = Tcp().outgoingConnection("127.0.0.1", )

val replParser =
  Flow[String].takeWhile(_ != "q")
    .concat(Source.single("BYE"))
    .map(elem => ByteString(s"$elem\n"))

val repl = Flow[ByteString]
  .via(Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true))
  .map(_.utf8String)
  .map(text => println("Server: " + text))
  .map(_ => readLine("> "))
  .via(replParser)

connection.join(repl).run()


In my case, the server does not send the welcome message at the start, 
which is needed to get the flow started.
I thought I could work around this in the client by inserting this as the 
first line in the flow:

.merge(Source.single(ByteString("Welcome\r\n:")))

This sort of worked, but now it is not possible to have two clients running 
at once, unless they each send a message to the server right away on 
starting.
(Otherwise I get the error: 

[INFO] [09/17/2017 00:25:07.490] [default-akka.actor.default-dispatcher-2] [
akka://default/system/IO-TCP/selectors/$a/0] Message [akka.io.Tcp$Register] 
from Actor[akka://default/user/StreamSupervisor-0/$$a#-1654072721] to 
Actor[akka://default/system/IO-TCP/selectors/$a/0#1152954885] was not 
delivered. [1] dead letters encountered. This logging can be turned off or 
adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [09/17/2017 00:25:07.493] [default-akka.actor.default-dispatcher-2] [
akka://default/system/IO-TCP/selectors/$a/0] Message [akka.io.Tcp$Write] 
from Actor[akka://default/user/StreamSupervisor-0/$$a#-1654072721] to 
Actor[akka://default/system/IO-TCP/selectors/$a/0#1152954885] was not 
delivered. [2] dead letters encountered. This logging can be turned off or 
adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.


I assume this is because there was no initial message from the server to 
the client, which causes the connection to be initialized.

What is the correct way to handle this situation? 

I also noticed that when you type 'q' in the client REPL, the process does 
not exit. What is missing there?
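
On the last point, nothing in the snippet stops the ActorSystem when the stream
finishes. One possible sketch (assuming the ActorSystem is in scope as system and
a materializer is implicitly available, as in the example) is to watch the REPL
flow's termination and shut down then:

```
import akka.Done
import akka.stream.scaladsl.Keep
import scala.concurrent.Future

// Keep the termination signal of the repl flow and terminate the system afterwards.
val done: Future[Done] =
  connection
    .joinMat(repl.watchTermination()(Keep.right))(Keep.right)
    .run()

done.onComplete(_ => system.terminate())(system.dispatcher)
```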

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka Streams: Reusable graphs and dependency injection

2017-09-01 Thread Bwmat
I'm talking about DI in the constructor parameter sense here.

I've gone through some of the documentation, but one thing that's not clear to 
me is the expected model of how to efficiently execute many instances of 
'similar' graphs.

To elaborate on what I mean, I'm working on a PoC where I'm creating a main 
flow graph to manage data fetching, with the 'shape' of the graphs being mainly 
the same, except for an inlet which can be either capped with an empty source, 
or with a source which is essentially a recursive copy of the main one (with 
potentially many levels).

While the shape is regular, many nodes are parameterized with various 
interfaces which control how to generate data requests, how to interpret 
responses, how to handle errors, etc.

Right now my experimental code is just creating the graphs and passing the 
interface instances into their constructors, so each graph is essentially 
single use.

This feels possibly inefficient, to create the entire graph, bottom up, for 
each query. Code-wise I can refactor everything into nice functions, but maybe 
I'd be making the framework waste time re-optimizing identical graphs 
repeatedly. I might be overthinking, but I'm curious about any suggestions on 
how to organize things here.

Cheers,
Matthew Wozniczka
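
On the cost question: the blueprint itself is immutable and cheap to build, and the
same parameterized instance can be materialized many times, so the per-query work is
mostly the materialization rather than re-describing the graph. A tiny sketch (names
are made up, and an implicit materializer is assumed):

```
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink, Source}

// Build the parameterized blueprint once...
def fetchFlow(parse: String => Int): Flow[String, Int, NotUsed] =
  Flow[String].map(parse)

val shared = fetchFlow(_.trim.length)

// ...and materialize it as many times as needed.
Source(List("a", "bb")).via(shared).runWith(Sink.seq)
Source(List("ccc")).via(shared).runWith(Sink.seq)
```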

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka Streams: Converting a Source> into a Source>, where constructing the 'S' requires 'peeking' at a variably-sized prefix of the original Source

2017-08-31 Thread Bwmat
I'm just getting started with Akka & Akka Streams, and trying to create a 
little PoC for my project.

I want to create a flow graph like

https://drive.google.com/file/d/0B8Cu6-NkpXpCX1BuUEVtY3VPVzg/view?usp=sharing

I'm not sure how to implement the 'Data Extractor' stage. My problem is
that I want to start consuming the HttpResponse and start generating events
from it, but 'buffer' the events until I can tell whether the request was
successful or not (by looking at the buffered events). If there was an
authentication error (or rate-limit error, not shown in the diagram), I want to
return an object signifying that; otherwise I want to return a source that emits
the buffered events, followed by any subsequent events not yet buffered (so I
don't want to consume the entire response before returning the source).

Is there anything built-in that would help me achieve this, or do I need to 
create a custom graphstage + custom source implementation?

Thanks,
Matthew Wozniczka
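
For what it's worth, prefixAndTail(n) is the closest built-in: it emits the first n
elements together with a Source of the remainder, so the prefix can be inspected
before deciding what to hand back. A minimal sketch (Event and isAuthError are
placeholders for the real parsed types):

```
import akka.NotUsed
import akka.stream.scaladsl.Source

// Placeholder element type and classifier; replace with the real parsed events.
final case class Event(kind: String)
def isAuthError(e: Event): Boolean = e.kind == "auth-error"

def classify(events: Source[Event, NotUsed]): Source[Either[String, Source[Event, NotUsed]], NotUsed] =
  events
    .prefixAndTail(16) // buffer up to 16 events before deciding
    .map { case (prefix, rest) =>
      if (prefix.exists(isAuthError)) Left("authentication failed")
      else Right(Source(prefix).concat(rest)) // buffered prefix, then the not-yet-consumed rest
    }
```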

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-streams] How to guarantee parallel execution of subflows?

2017-08-13 Thread Patrik Nordwall
MergeHub, BroadcastHub and PartitionHub always add an asynchronous boundary
so you shouldn't use .async with them.

/Patrik

On Sun, Aug 13, 2017 at 8:35 PM, Alexey Shuksto  wrote:

> Thanks, Patrick!
>
> One more question regarding'.async' -- is there any point in adding it to
> Merge/BroadcastHub stages or their materialized sinks/sources?
>
> --
> >>  Read the docs: http://akka.io/docs/
> >>  Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>  Search the archives: https://groups.google.com/
> group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 

Patrik Nordwall
Akka Tech Lead
Lightbend  -  Reactive apps on the JVM
Twitter: @patriknw

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-streams] How to guarantee parallel execution of subflows?

2017-08-13 Thread Alexey Shuksto
Thanks, Patrick!

One more question regarding '.async' -- is there any point in adding it to 
Merge/BroadcastHub stages or their materialized sinks/sources?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-streams] How to guarantee parallel execution of subflows?

2017-08-13 Thread Patrik Nordwall
It should work with .async after groupBy. It can still be the same thread. To
see it (different threads) you might have to add a sleep in the map.

/Patrik
On Sun, 13 Aug 2017 at 12:35, Alexey Shuksto  wrote:

> Hello there,
>
> Is there a way to guarantee that subflows after '.groupBy(..)' operation
> would be performed in parallel (I.e. by separate actors) until merged back
> into Flow?
>
> For example, in code like 'Flow[Int].groupBy(3,_ %
> 3).map(_.toString).mergeSubstreams()' where should I put something like
> async barrier?
>
> I've tried it after groupBy and after map, but according to logs it is
> still executed in the same dispatcher thread.
>
> Scala 2.12.3, Akka 2.5.3.
>
> --
> >>  Read the docs: http://akka.io/docs/
> >>  Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>  Search the archives:
> https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] [akka-streams] How to guarantee parallel execution of subflows?

2017-08-13 Thread Alexey Shuksto
Hello there,

Is there a way to guarantee that the subflows after a '.groupBy(..)' operation would 
be performed in parallel (i.e. by separate actors) until merged back into the Flow?

For example, in code like 'Flow[Int].groupBy(3, _ % 3).map(_.toString).mergeSubstreams' 
where should I put something like an async barrier?

I've tried it after groupBy and after map, but according to logs it is still 
executed in the same dispatcher thread. 

Scala 2.12.3, Akka 2.5.3.
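
A sketch of the placement described in the replies above; the Thread.sleep is only
there to make any thread hand-off visible in the logs, and the modulo-3 grouping is
just the example from the question:

```
import akka.stream.scaladsl.Flow

// Async boundary directly after groupBy, so each substream may run on its own actor.
val parallelSubflows =
  Flow[Int]
    .groupBy(3, _ % 3)
    .async
    .map { i => Thread.sleep(10); i.toString }
    .mergeSubstreams
```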

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Akka Streams Remote Materialization

2017-07-17 Thread James Roper
On Friday, July 14, 2017 at 11:55:22 AM UTC+10, James Roper wrote:
>
> We need some form a framing over the TCP connection. A lightweight binary 
> protocol could do it, but an easier and probably good enough solution to 
> start with is WebSockets with akka-http (to my knowledge akka-http doesn't 
> yet provide client side WebSocket support, but it will some day).
>

Sorry, it's been pointed out to me that Akka does support WebSockets on the 
client side (and has for a long time) - in that case I think there's no 
reason not to use akka-http's WebSocket support to implement this.
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Akka Streams Remote Materialization

2017-07-13 Thread James Roper
I may have some use cases for this, so I've been thinking about what it 
could look like.  Here are my rough thoughts:

The problem with using Akka remoting is that it's not a guaranteed 
transport nor does it provide back pressure - you would have to serialize 
the request packets, and then implement some form of dropped message 
detection, then implement a replay mechanism, etc - basically, you'd have 
to implement TCP.  That doesn't make sense to do, when you already have 
TCP. So, each remote materialisation should be a TCP connection.

We need some form of framing over the TCP connection. A lightweight binary 
protocol could do it, but an easier and probably good enough solution to 
start with is WebSockets with akka-http (to my knowledge akka-http doesn't 
yet provide client side WebSocket support, but it will some day).

So, to support remote materialization, we supply an Akka extension.  This 
extension opens an akka io or akka-http port that accepts materialization 
requests.  It provides a method of converting a graph of any shape into a 
GraphRef, the GraphRef will include the shape of the graph, the address to 
connect to, and a unique identifier for the graph.  This GraphRef can then 
be sent to other hosts.  Of course, this implementation will be coupled 
with an Akka remoting serializer that will serialize graphs of all shapes 
into GraphRefs, and then into a binary format that can be sent across the 
wire.  The manifest will probably describe whether it's a javadsl or 
scaladsl graph so it knows what type to instantiate on the other end.
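
Purely as an illustration of the shape such a reference might take (nothing like
this exists in Akka; all names are hypothetical):

```
import akka.stream.Shape

// Hypothetical: a serializable handle to a graph hosted on another node.
final case class GraphRef[S <: Shape](shape: S, host: String, port: Int, graphId: String)
```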

The extension will need to track the graphs it creates GraphRefs for, which 
is a potential resource leak, so there will need to be an expiration 
strategy. I'd envision a configurable default, which can be overridden per 
stream with graph attributes.  Available strategies could be expire after a 
given deadline, or expire after first use (probably with a deadline for the 
first use too).  If the same graph was turned to a GraphRef twice, the 
extension would detect this and emit the same identifier.

To then materialize remotely, the extension will offer a method to turn the 
GraphRef back into a graph.  It wouldn't necessarily have to be the same 
*dsl as it came from, it would just have to be the same shape, though the 
Akka serializer would always produce the same *dsl.  The recreated remote 
graph would have attributes set on it to identify it as a remote stream, 
these would be used by the extension if the remote graph were turned back 
into a GraphRef, in that case the original GraphRef would be returned, so 
any connections to it would go back to the original node, and not need to 
be proxied by the current node.  It would also be able to tell if a 
GraphRef was for itself, then it would just return the stored graph 
directly.

When materialized, a new TCP or WebSocket is created to the node where the 
actual graph lives.  If using WebSockets, the graph ID will just be passed 
as part of the URI.  Other information might be passed including the 
expected graph shape etc, which can be validated before the two ends try to 
do anything that doesn't make sense.  An additional lightweight protocol on 
top of the frames is needed - Akka serialization can be used to 
encode/decode messages, but this requires the manifest and serializer id to 
also be sent.  Additionally, for correct error propagation, there needs to 
be some form of typing on frames, so that when the stream terminates with 
an error, that error can be serialized and sent down the stream.

When working with sinks, sources and flows, it's quite simple, just use the 
corresponding up/down channel on the socket, and upstream/downstream 
completion at different times on flows can be handled by half closing 
connections.  But if we want to allow bidi flows, then either we need two 
sockets (which will require some additional request identifier for the 
server to associate them to the same materialization) or the multiplexing 
of channels on a single socket.  Multiplexing of channels on a single 
socket will mean the channels both share the same backpressure, which could 
be a problem.  Another solution to this would be to use http2, which does 
support multiplexing with independent back pressure.  If we do go down the 
http2 path, then we open up the option of multiplexing multiple graph 
materializations down the one TCP connection.

Materialized values are an interesting thing to contend with. I see two 
approaches, one is to always materialize to NotUsed, and only allow streams 
that materialize to NotUsed to be converted to GraphRefs.  Of course, this 
can only be enforced at compile time, which means the Akka remoting 
serializer for this can at best warn/fail at runtime after the stream is 
materialized if it returns anything other than NotUsed.

The other option, which has similar constraints, is to always materialize 
to a Future/CompletionStage (depending on which dsl) of the remote 
m

Re: [akka-user] Akka Streams - UDP messages

2017-07-13 Thread Arnout Engelen
Hi Rajesh,

Where exactly did you add that log message? Perhaps there is something we
can tweak/optimize to drop less UDP data in your case, but in general (as
Konrad mentioned above) the network is also allowed to drop UDP packets, so
if you need reliable transmission UDP is not going to take care of it.


Kind regards,

Arnout

On Thu, Jul 13, 2017 at 3:09 PM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi Konrad,
>
> Thank you for the response. In the server side, I have added some debug
> messages. Below is the log message.
>
> Datagram buffer size ({}) exceeded.
>
> Whenever I see this messages, that time messages are dropped.
>
> Regards,
> Rajesh
>
> On Thu, Jul 13, 2017 at 6:33 PM, Konrad “ktoso” Malawski <
> konrad.malaw...@lightbend.com> wrote:
>
>> Hi there,
>> you do realize that UDP, the protocol itself, does not guarantee delivery
>> of anything?
>> It absolutely may and will in practice drop packets - it is designed to
>> do exactly that.
>>
>> Start reading about UDP here https://en.wikipedia.org/
>> wiki/User_Datagram_Protocol and take it from there to networking books
>> and articles.
>>
>> In short, if you’re suprised that UDP “dropped”, you should likely not be
>> using UDP at all :-)
>>
>> — Konrad
>>
>>
>> On 13 July 2017 at 21:34:17, Madabhattula Rajesh Kumar (
>> mrajaf...@gmail.com) wrote:
>>
>> Hi,
>>
>> I am using Akka Streams to read the messages from UDP port and write into
>> the filesystem. It is not able to read all messages. Some messages are
>> dropping.
>>
>> I have found one example program in the github.
>>
>> https://github.com/jpthomasset/akka-udp-stream
>>
>> val source = UdpSource(new InetSocketAddress("127.0.0.1", 9876), 100)
>> val result = source.map(x => x.data.decodeString("UTF-8")).
>> runWith(lineSink(filePath))
>> import system.dispatcher
>> result.onComplete { _ => system.terminate() }
>>
>> def lineSink(filename: String): Sink[String, Future[IOResult]] = {
>> Flow[String]
>>   .map(s => ByteString(s + "\n"))
>>   .toMat(FileIO.toFile(new File(filename)))(Keep.right)
>>   }
>>
>> *Do we need to increase the "maxBufferSize" to receive all messages from
>> UDP?*
>>
>> Please let me know how to receive all messages?
>>
>> Regards,
>> Rajesh
>> --
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Arnout Engelen
*Senior Software Engineer*
E: arnout.enge...@lightbend.com
T: https://twitter.com/raboofje

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Akka Streams - UDP messages

2017-07-13 Thread Madabhattula Rajesh Kumar
Hi Konrad,

Thank you for the response. On the server side, I have added some debug
messages. Below is the log message:

Datagram buffer size ({}) exceeded.

Whenever I see this message, messages are being dropped.

Regards,
Rajesh

On Thu, Jul 13, 2017 at 6:33 PM, Konrad “ktoso” Malawski <
konrad.malaw...@lightbend.com> wrote:

> Hi there,
> you do realize that UDP, the protocol itself, does not guarantee delivery
> of anything?
> It absolutely may and will in practice drop packets - it is designed to do
> exactly that.
>
> Start reading about UDP here https://en.wikipedia.org/
> wiki/User_Datagram_Protocol and take it from there to networking books
> and articles.
>
> In short, if you’re suprised that UDP “dropped”, you should likely not be
> using UDP at all :-)
>
> — Konrad
>
>
> On 13 July 2017 at 21:34:17, Madabhattula Rajesh Kumar (
> mrajaf...@gmail.com) wrote:
>
> Hi,
>
> I am using Akka Streams to read the messages from UDP port and write into
> the filesystem. It is not able to read all messages. Some messages are
> dropping.
>
> I have found one example program in the github.
>
> https://github.com/jpthomasset/akka-udp-stream
>
> val source = UdpSource(new InetSocketAddress("127.0.0.1", 9876), 100)
> val result = source.map(x => x.data.decodeString("UTF-8")).
> runWith(lineSink(filePath))
> import system.dispatcher
> result.onComplete { _ => system.terminate() }
>
> def lineSink(filename: String): Sink[String, Future[IOResult]] = {
> Flow[String]
>   .map(s => ByteString(s + "\n"))
>   .toMat(FileIO.toFile(new File(filename)))(Keep.right)
>   }
>
> *Do we need to increase the "maxBufferSize" to receive all messages from
> UDP?*
>
> Please let me know how to receive all messages?
>
> Regards,
> Rajesh
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Akka Streams - UDP messages

2017-07-13 Thread Konrad “ktoso” Malawski
Hi there,
you do realize that UDP, the protocol itself, does not guarantee delivery
of anything?
It absolutely may and will in practice drop packets - it is designed to do
exactly that.

Start reading about UDP here
https://en.wikipedia.org/wiki/User_Datagram_Protocol and take it from there
to networking books and articles.

In short, if you’re surprised that UDP “dropped”, you should likely not be
using UDP at all :-)

— Konrad


On 13 July 2017 at 21:34:17, Madabhattula Rajesh Kumar (mrajaf...@gmail.com)
wrote:

Hi,

I am using Akka Streams to read the messages from UDP port and write into
the filesystem. It is not able to read all messages. Some messages are
dropping.

I have found one example program in the github.

https://github.com/jpthomasset/akka-udp-stream

val source = UdpSource(new InetSocketAddress("127.0.0.1", 9876), 100)
val result = source.map(x =>
x.data.decodeString("UTF-8")).runWith(lineSink(filePath))
import system.dispatcher
result.onComplete { _ => system.terminate() }

def lineSink(filename: String): Sink[String, Future[IOResult]] = {
Flow[String]
  .map(s => ByteString(s + "\n"))
  .toMat(FileIO.toFile(new File(filename)))(Keep.right)
  }

*Do we need to increase the "maxBufferSize" to receive all messages from
UDP?*

Please let me know how to receive all messages?

Regards,
Rajesh
--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups
"Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka Streams - UDP messages

2017-07-13 Thread Madabhattula Rajesh Kumar
Hi,

I am using Akka Streams to read messages from a UDP port and write them to
the filesystem. It is not able to read all messages; some messages are being
dropped.

I have found an example program on GitHub:

https://github.com/jpthomasset/akka-udp-stream

val source = UdpSource(new InetSocketAddress("127.0.0.1", 9876), 100)
val result = source
  .map(x => x.data.decodeString("UTF-8"))
  .runWith(lineSink(filePath))
import system.dispatcher
result.onComplete { _ => system.terminate() }

def lineSink(filename: String): Sink[String, Future[IOResult]] = {
  Flow[String]
    .map(s => ByteString(s + "\n"))
    .toMat(FileIO.toFile(new File(filename)))(Keep.right)
}

*Do we need to increase the "maxBufferSize" to receive all messages from
UDP?*

Please let me know how to receive all messages.

Regards,
Rajesh

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] [akka-streams] Generic streams and abstract types

2017-07-06 Thread Jeff
Here is a strawman program which illustrates the issue I am having

trait RequestBuilder {
  type Out

  def complete(p: Promise[Out]): Unit
}

def makeRequest(in: RequestBuilder): Source[(RequestBuilder, Promise[in.Out]), 
Future[in.Out]] = {
  val p = Promise[in.Out]

  Source.single(in -> p).mapMaterializedValue(_ => p.future)
}

val sink = MergeHub.source[(RequestBuilder, Promise[???])].to(Sink.foreach {
  case (r, p) => r.complete(p)
}).run()

sink.runWith(makeRequest(new RequestBuilder {
  type Out = Int

  def complete(p: Promise[Out]): Unit = p.success(1)
}))


The issue is, how do I type the Promise[???]  in the sink? I have been able 
to work around this by making the Promise a part of the RequestBuilder 
trait itself, but this seems like a code smell to me
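
One alternative sketch (not from the thread, and assuming an implicit materializer
is in scope) is to tie each builder to its promise in a small wrapper, so the hub
only ever deals with the existential Pending[_]:

```
import akka.NotUsed
import akka.stream.scaladsl.{MergeHub, Sink}
import scala.concurrent.Promise

// Pairs a builder with a promise of exactly its Out type, so the two can't drift apart.
final case class Pending[O](builder: RequestBuilder { type Out = O }, promise: Promise[O]) {
  def complete(): Unit = builder.complete(promise)
}

val pendingSink: Sink[Pending[_], NotUsed] =
  MergeHub.source[Pending[_]].to(Sink.foreach[Pending[_]](_.complete())).run()
```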

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] akka-streams: what causes the downstream to be "cancelled"/onDownstreamFinish called?

2017-06-26 Thread Konrad Malawski
yes, a failure in the stream could lead to cancellation of the upstream
stages of where the error was induced. it basically is the way a stream is
being shut down on a failure.

On Jun 26, 2017 21:04, "Josh F"  wrote:

> Hi Konrad,
>
> Thanks for the reply!
>
> I can't really share the full snippet as it has a load of company-specific
> code in there, but good to know that the code I pasted would never cancel.
> I will try to figure out if something else is causing the application to
> stop.
>
> Just wondering, if I didn't have the recoverWith and my handler could
> return a failed future, could that cause the downstream to cancel?
>
> Also not sure if it makes a difference, but I do have a throttler before
> the mapAsync call:
> .throttle(elements = 1, per = 2, maximumBurst = 1, ThrottleMode.shaping)
>
> Thanks,
> Josh
>
>
>
> On Monday, June 26, 2017 at 7:25:38 PM UTC+1, Konrad Malawski wrote:
>>
>> Could you share the full snippet?
>> The code as pasted above would not cancel (downstream cancels, but a
>> Sink.ignore never would cancel).
>> Are you sure you’re not terminating the app or something else that would
>> cause the stream to stop?
>> — Konrad
>>
>>
>> On June 27, 2017 at 3:23:04 AM, Josh F (jof...@gmail.com) wrote:
>>
>> Hi all,
>>
>> I am new to akka-streams - I've just set up a basic pipeline which
>> consumes from a messaging queue and then does some processing on the stream.
>> But after running for a while, the application ceases to process
>> messages, apparently because the downstream has been "cancelled" and
>> onDownstreamFinish has been called on the source.
>>
>> I just want to know what the reason for this might be? What would cause
>> the downstream to be cancelled?
>>
>> For context, my pipeline is very simple - it looks like this:
>>
>> def source: Source[MyMessage, NotUsed] = 
>> def messageHandler(message: MyMessage): Future[Unit] = {
>>   val result =  // Some code here which processes the message and
>> returns a Future[Unit]
>>   result.recoverWith {
>> case _ => Future.successful()
>>   }
>> }
>> def pipeline = source.mapAsync(10)(messageHandler)
>> pipeline.runWith(Sink.ignore)
>>
>> Note that the use of recoverWith means that the message handler will
>> always return a future which completes successfully - even if an exception
>> is thrown somewhere.
>>
>> Is anyone able to offer some advice / insight into what might be
>> happening?
>>
>> Thanks,
>> Josh
>>
>>
>>
>> --
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+...@googlegroups.com.
>> To post to this group, send email to akka...@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] akka-streams: what causes the downstream to be "cancelled"/onDownstreamFinish called?

2017-06-26 Thread Josh F
Hi Konrad,

Thanks for the reply!

I can't really share the full snippet as it has a load of company-specific 
code in there, but good to know that the code I pasted would never cancel. 
 I will try to figure out if something else is causing the application to 
stop.

Just wondering, if I didn't have the recoverWith and my handler could 
return a failed future, could that cause the downstream to cancel?

Also not sure if it makes a difference, but I do have a throttler before 
the mapAsync call:
.throttle(elements = 1, per = 2, maximumBurst = 1, ThrottleMode.shaping)

Thanks,
Josh



On Monday, June 26, 2017 at 7:25:38 PM UTC+1, Konrad Malawski wrote:
>
> Could you share the full snippet?
> The code as pasted above would not cancel (downstream cancels, but a 
> Sink.ignore never would cancel).
> Are you sure you’re not terminating the app or something else that would 
> cause the stream to stop?
> — Konrad
>
>
> On June 27, 2017 at 3:23:04 AM, Josh F (jof...@gmail.com ) 
> wrote:
>
> Hi all, 
>
> I am new to akka-streams - I've just set up a basic pipeline which 
> consumes from a messaging queue and then does some processing on the stream.
> But after running for a while, the application ceases to process messages, 
> apparently because the downstream has been "cancelled" and 
> onDownstreamFinish has been called on the source.
>
> I just want to know what the reason for this might be? What would cause 
> the downstream to be cancelled?
>
> For context, my pipeline is very simple - it looks like this: 
>
> def source: Source[MyMessage, NotUsed] = 
> def messageHandler(message: MyMessage): Future[Unit] = {
>   val result =  // Some code here which processes the message and 
> returns a Future[Unit] 
>   result.recoverWith {
> case _ => Future.successful()
>   }
> }
> def pipeline = source.mapAsync(10)(messageHandler)
> pipeline.runWith(Sink.ignore)
>
> Note that the use of recoverWith means that the message handler will 
> always return a future which completes successfully - even if an exception 
> is thrown somewhere.
>
> Is anyone able to offer some advice / insight into what might be happening?
>
> Thanks,
> Josh
>
>
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: 
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to akka-user+...@googlegroups.com .
> To post to this group, send email to akka...@googlegroups.com 
> .
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] akka-streams: what causes the downstream to be "cancelled"/onDownstreamFinish called?

2017-06-26 Thread Konrad “ktoso” Malawski
Could you share the full snippet?
The code as pasted above would not cancel (downstream cancels, but a
Sink.ignore never would cancel).
Are you sure you’re not terminating the app or something else that would
cause the stream to stop?
— Konrad


On June 27, 2017 at 3:23:04 AM, Josh F (jof...@gmail.com) wrote:

Hi all,

I am new to akka-streams - I've just set up a basic pipeline which consumes
from a messaging queue and then does some processing on the stream.
But after running for a while, the application ceases to process messages,
apparently because the downstream has been "cancelled" and
onDownstreamFinish has been called on the source.

I just want to know what the reason for this might be? What would cause the
downstream to be cancelled?

For context, my pipeline is very simple - it looks like this:

def source: Source[MyMessage, NotUsed] = 
def messageHandler(message: MyMessage): Future[Unit] = {
  val result =  // Some code here which processes the message and
returns a Future[Unit]
  result.recoverWith {
case _ => Future.successful()
  }
}
def pipeline = source.mapAsync(10)(messageHandler)
pipeline.runWith(Sink.ignore)

Note that the use of recoverWith means that the message handler will always
return a future which completes successfully - even if an exception is
thrown somewhere.

Is anyone able to offer some advice / insight into what might be happening?

Thanks,
Josh



--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups
"Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] akka-streams: what causes the downstream to be "cancelled"/onDownstreamFinish called?

2017-06-26 Thread Josh F
Hi all,

I am new to akka-streams - I've just set up a basic pipeline which consumes 
from a messaging queue and then does some processing on the stream.
But after running for a while, the application ceases to process messages, 
apparently because the downstream has been "cancelled" and 
onDownstreamFinish has been called on the source.

I just want to know what the reason for this might be? What would cause the 
downstream to be cancelled?

For context, my pipeline is very simple - it looks like this: 

def source: Source[MyMessage, NotUsed] = 
def messageHandler(message: MyMessage): Future[Unit] = {
  val result =  // Some code here which processes the message and 
returns a Future[Unit] 
  result.recoverWith {
case _ => Future.successful()
  }
}
def pipeline = source.mapAsync(10)(messageHandler)
pipeline.runWith(Sink.ignore)

Note that the use of recoverWith means that the message handler will always 
return a future which completes successfully - even if an exception is 
thrown somewhere.

Is anyone able to offer some advice / insight into what might be happening?

Thanks,
Josh
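
One cheap way to see how the pipeline is ending (a sketch; it assumes the ActorSystem
is available as system and a materializer is implicitly in scope, as in the snippet
above) is to keep the Future materialized by Sink.ignore, which fails when the
stream fails:

```
import akka.Done
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import scala.util.{Failure, Success}

val done: Future[Done] = pipeline.runWith(Sink.ignore)

done.onComplete {
  case Success(_)  => println("stream completed normally")
  case Failure(ex) => println(s"stream failed: $ex")
}(system.dispatcher)
```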



-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka Streams / Play pattern

2017-06-21 Thread ivan morozov
I want to build a Play application that fetches data from different APIs, persists 
the processed results and streams the results via WebSockets to web clients.

I have N actors for the N APIs in the main controller, injected via AkkaGuice.

Then I have N GraphStages that are responsible for the whole process of getting 
data from the data sources, doing all the preprocessing, persisting the data and 
publishing the data to the ActorSystem event stream.

The API actors subscribe to the ActorSystem event stream, look for messages of 
interest and forward them to the dynamically created user actors 
(here I used the pattern from 
https://github.com/playframework/play-scala-websocket-example).

I really enjoy the idiomatic approach of writing data-processing logic with 
the graph DSL, but publishing the data on the EventStream feels wrong.

I have thought about having a single graph stage, injected into the controller at 
application start, which handles the whole fetching and persistence logic but is 
also able to distribute the data to the dynamic group of user actors without 
publishing it to the EventStream.

Is there an idiomatic approach to solving my problem?

Thanks
Ivan
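
For what it's worth, a stream-native alternative to the EventStream is a
MergeHub/BroadcastHub pair: one running pipeline that producers push into and that
any number of per-user sources can attach to afterwards. A minimal sketch (Data is a
placeholder element type, the buffer sizes are arbitrary, and an implicit
materializer is assumed):

```
import akka.NotUsed
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, Sink, Source}

final case class Data(value: String) // placeholder for whatever the fetch stages emit

// Producers push into ingestSink; each user actor/WebSocket attaches its own
// consumer to fanOutSource while the graph keeps running.
val (ingestSink, fanOutSource): (Sink[Data, NotUsed], Source[Data, NotUsed]) =
  MergeHub.source[Data](perProducerBufferSize = 16)
    .toMat(BroadcastHub.sink[Data](bufferSize = 256))(Keep.both)
    .run()
```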


-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka Streams: Best business case to implement

2017-06-13 Thread Rambabu Posa
Hi 
Would like to implement one Data Streaming application using Akka Streams. 
Have gone through Akka Streams doc and some online resources, books and 
found that most of the people are using Twitter business case to experiment 
Akka Streams.

I would like to implement one web application using 

   - Play
   - Scala
   - Akka
   - Akka Streams

to dirty my hands with something best suitable business use-case.

Please suggest me some suitable/best use-case.

Many thanks,
Ram

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-streams] StreamConverters.fromOutputStream(..) stops processing messages

2017-05-16 Thread Alexey Shuksto
Thanks for the tip!

I've found the culprit -- the threads were not blocked, they were all waiting 
to read from the corresponding input streams; a separate dispatcher for the input 
and output streams resolves this problem.

Still, I'm not comfortable with so many threads blocking on reads, so I'll 
try to rewrite this with the NuProcess library.
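
A sketch of that separate-dispatcher idea using stream attributes (the dispatcher
name is an assumption and would have to be configured in application.conf; process
is the spawned external process, and an implicit materializer is assumed):

```
import akka.stream.ActorAttributes
import akka.stream.scaladsl.{MergeHub, StreamConverters}
import akka.util.ByteString

// Run the blocking OutputStream sink (and, analogously, any InputStream sources)
// on a dedicated dispatcher instead of the default blocking IO dispatcher.
val stdinSink =
  StreamConverters
    .fromOutputStream(() => process.getOutputStream, autoFlush = true)
    .withAttributes(ActorAttributes.dispatcher("my-app.blocking-io-dispatcher"))
    .runWith(MergeHub.source[ByteString])
```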

On Tuesday, 16 May 2017 at 16:44:52 UTC+3, Martynas Mickevičius wrote:
>
> It could be that all of the threads are blocked in the 
> default-blocking-io-dispatcher 
> Make a thread dump and take a look if that is the case.
>
> On Mon, May 15, 2017 at 8:17 PM Alexey Shuksto  > wrote:
>
>> Hello hAkkers,
>>
>> Our app spawns multiple external processes (via 
>> java.util.Runtime.exec(..)) and then writes to STDIN of such processes via:
>> ```
>> StreamConverters
>>   .fromOutputStream(() => process.getOutputStream, autoFlush = true)
>>   .runWith(MergeHub.source[])
>> ```
>>
>> MergeHub is attached to output stream to prevent closing of such stream 
>> after one of client done writing to it.
>>
>> While there are up to, say, four external processes everything is working 
>> fine, but as soon as they number increases, OutputStreamSubscriber just 
>> stops to request next elements for all of them.
>> I've checked via adding `.confate` for incoming ByteStrings -- buffer 
>> just keeps growing.
>>
>> It is possible to replace StreamConverters with simple
>> ```
>> Sink
>>   .foreach[ByteString] { 
>> process.getOutputStream.write(bs.toArray)
>> process.getOutputStream.flush()
>>   }
>> ```
>> and everything would work, but it's blocking and stuff.
>>
>> What may be the cause of this? I've tried to change settings of 
>> "default-blocking-io-dispatcher", but to no effect.
>>



Re: [akka-user] [akka-streams] StreamConverters.fromOutputStream(..) stops processing messages

2017-05-16 Thread Martynas Mickevičius
It could be that all of the threads are blocked in the
default-blocking-io-dispatcher. Make a thread dump and take a look to see if
that is the case.

On Mon, May 15, 2017 at 8:17 PM Alexey Shuksto  wrote:

> Hello hAkkers,
>
> Our app spawns multiple external processes (via
> java.util.Runtime.exec(..)) and then writes to STDIN of such processes via:
> ```
> StreamConverters
>   .fromOutputStream(() => process.getOutputStream, autoFlush = true)
>   .runWith(MergeHub.source[])
> ```
>
> MergeHub is attached to output stream to prevent closing of such stream
> after one of client done writing to it.
>
> While there are up to, say, four external processes everything is working
> fine, but as soon as they number increases, OutputStreamSubscriber just
> stops to request next elements for all of them.
> I've checked via adding `.confate` for incoming ByteStrings -- buffer just
> keeps growing.
>
> It is possible to replace StreamConverters with simple
> ```
> Sink
>   .foreach[ByteString] {
> process.getOutputStream.write(bs.toArray)
> process.getOutputStream.flush()
>   }
> ```
> and everything would work, but it's blocking and stuff.
>
> What may be the cause of this? I've tried to change settings of
> "default-blocking-io-dispatcher", but to no effect.
>



[akka-user] [akka-streams] StreamConverters.fromOutputStream(..) stops processing messages

2017-05-15 Thread Alexey Shuksto
Hello hAkkers,

Our app spawns multiple external processes (via java.util.Runtime.exec(..)) 
and then writes to the STDIN of these processes via:
```
StreamConverters
  .fromOutputStream(() => process.getOutputStream, autoFlush = true)
  .runWith(MergeHub.source[ByteString])
```

The MergeHub is attached to the output stream to prevent the stream from being 
closed after one of the clients is done writing to it.

While there are up to, say, four external processes, everything works fine, 
but as soon as their number increases, the OutputStreamSubscriber just stops 
requesting the next elements for all of them.
I've checked by adding `.conflate` for the incoming ByteStrings -- the buffer 
just keeps growing.

It is possible to replace StreamConverters with a simple
```
Sink
  .foreach[ByteString] { bs =>
    process.getOutputStream.write(bs.toArray)
    process.getOutputStream.flush()
  }
```
and everything works, but that is blocking.

What might be the cause of this? I've tried changing the settings of the 
"default-blocking-io-dispatcher", but to no effect.



Re: [akka-user] Akka-streams supervision: best practices

2017-05-02 Thread Konrad Malawski
Hi Alexander,
since you asked the same question on the issue tracker at the same time
allow me to link those and let's continue the discussion here if you want;

https://github.com/akka/akka/issues/21192#issuecomment-298806620


Firstly: I don't think that's entirely true. Existing supervision indeed
only works when stages "support it". One has to notice that it's not as
simple as it may seem from a high-level perspective, since there are buffers
involved between stages, so what would "restart a section" actually do, and
what *should* it *really* mean, etc.

However, there are many ways to add restarting or other behaviours.

Including, but not limited to:

   - Retry -
   
https://github.com/akka/akka-stream-contrib/blob/master/contrib/src/main/scala/akka/stream/contrib/Retry.scala
   - recoverWith
   
http://doc.akka.io/docs/akka/2.5.1/scala/stream/stages-overview.html#recoverWith
   - recoverWithRetries
   
http://doc.akka.io/docs/akka/2.5.1/scala/stream/stages-overview.html#recoverWithRetries
   - mapError
   http://doc.akka.io/docs/akka/2.5.1/scala/stream/stages-overview.html#mapError
   - and more stages, such as watchTermination, monitor etc.
   - the existing supervision when it's enough
   - or hosting within Actors and reacting to a termination with how you
   want.
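
For a concrete taste of one of the options above, here is a minimal
recoverWithRetries sketch; the failing source and the exception type are made
up for illustration:

```
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

object RecoverWithRetriesSketch extends App {
  implicit val system = ActorSystem("recover-sketch")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // A source that fails part-way through, standing in for e.g. a flaky connection.
  def flaky: Source[Int, NotUsed] =
    Source(1 to 10).map(i => if (i == 5) throw new RuntimeException("boom") else i)

  // On failure, switch (up to 3 times) to a fresh instance of the source.
  // Already-emitted elements are not replayed: this swaps in a new source,
  // it does not resume the failed one mid-stream.
  flaky
    .recoverWithRetries(3, { case _: RuntimeException => flaky })
    .runWith(Sink.foreach(println))
    .onComplete(_ => system.terminate())
}
```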

Do bear in mind that Akka was, is and remains a toolkit. The tools are
there: kill switches, a spec for how failure is propagated (Reactive Streams,
which will be part of JDK 9), and so on.

Would we want to improve the current supervision scheme?
Yes, we'd like to. Are we right now going to work on it? No, we're focused
on other areas right now that are more pressing.

What would be helpful now: I think the best next step here is to actually
*come up with patterns* - be it as blogs, or write-ups in issues, or PRs that
pre-package behaviour the way the Retry stage did. Once we have the real use
cases we can think about it again, because a *generic* "supervision" is very
hard (note the "generic" part of that phrase; specialized ones are simple),
and *very* likely not what you'd actually want (it would do unexpected,
surprising things - we experimented with some restarting schemes).

I do agree, though, that all those patterns should then be collected and put
at: http://doc.akka.io/docs/akka/2.5.1/scala/stream/stream-error.html We're
working on new docs and a website page right now, so once that's done we'll
focus on such content.

Hope this answers the question about status, next steps, and how you can
help :-)



-- 
Konrad `ktoso` Malawski
Akka  @ Lightbend 

On 3 May 2017 at 08:20:50, Alexander Temerev (sor...@gmail.com) wrote:

Hi everyone,

Is there any example or document describing best practices for supervising
and monitoring akka streams? E.g. restarting Websocket streams on
disconnect, backing off on repeated failures, creating supervision groups
(e.g. if some stream failed, I also want to fail other streams in the group
and restart them at once), making supervision hierarchies, etc.

Streams are incredibly handy and well-typed, but I don't see yet how I can
recreate these powerful supervision tools available in plain Akka,
especially in dynamic flow graphs where streams can be attached and
detached at runtime.

"Introduction to streams lifecycle" would be incredibly handy, if it exists
somewhere.

Best,
Alexander


[akka-user] Akka-streams supervision: best practices

2017-05-02 Thread Alexander Temerev
Hi everyone,

Is there any example or document describing best practices for supervising 
and monitoring akka streams? E.g. restarting Websocket streams on 
disconnect, backing off on repeated failures, creating supervision groups 
(e.g. if some stream failed, I also want to fail other streams in the group 
and restart them at once), making supervision hierarchies, etc.

Streams are incredibly handy and well-typed, but I don't see yet how I can 
recreate these powerful supervision tools available in plain Akka, 
especially in dynamic flow graphs where streams can be attached and 
detached at runtime.

"Introduction to streams lifecycle" would be incredibly handy, if it exists 
somewhere.

Best,
Alexander



[akka-user] Akka streams - Throttle slowing down stream by huge margins

2017-05-01 Thread Sean Callahan
Hey guys. I have been trying to set up a stream using throttle, but I am 
seeing some weird behavior. In the following stream I see a peak of 
20k events per second in my publish stat.

Source.fromGraph(eventBusSource)
  .via(filter)
  .expand(Iterator.continually(_))
  .via(StreamUtils.parallelize(10, modify))
  // .throttle(eventsPerSecond, 1.second, 1, ThrottleMode.Shaping)
  .via(StreamUtils.parallelize(20, publishFlow))
  .takeWithin(duration)
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.fold(0)((acc, _) => acc + 1))(Keep.both)

However, when I uncomment the throttle stage and set events per second to 
5k, the speed at which I publish drops to a few hundred per second. For 
some reason, the throttle stage is dramatically slowing down my graph.

Am I just not understanding some fundamental concept about throttle?
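
One plausible factor is the maximumBurst of 1: it caps the token bucket at a
single element, so at most one element can pass per wake-up of the throttle's
timer, and the scheduler tick is fairly coarse (roughly 10 ms by default),
which would collapse throughput to a few hundred elements per second. A
minimal sketch with a larger burst (the numbers are made up):

```
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._

object ThrottleBurstSketch extends App {
  implicit val system = ActorSystem("throttle-sketch")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  val eventsPerSecond = 5000

  Source(1 to 100000)
    // A maximumBurst equal to the per-second rate lets the token bucket absorb
    // scheduler-tick granularity; a burst of 1 releases at most one element
    // per wake-up of the throttle's timer.
    .throttle(eventsPerSecond, 1.second, eventsPerSecond, ThrottleMode.Shaping)
    .runWith(Sink.ignore)
    .onComplete(_ => system.terminate())
}
```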



Re: [akka-user] Akka streams: Why use .named()?

2017-04-12 Thread Konrad Malawski
Not sure what you're referring to?
Yes, it does/would come into play when we draw stuff.

On Apr 13, 2017 03:07, "Sean Callahan"  wrote:

> Okay, that's kinda what I figured. Will the named attribute come into play
> at all with some of the graphing Ive seen a little of in 2.5?
>



Re: [akka-user] Akka streams: Why use .named()?

2017-04-12 Thread Sean Callahan
Okay, that's kinda what I figured. Will the named attribute come into play 
at all with some of the graphing I've seen a little of in 2.5?



Re: [akka-user] Akka streams: Why use .named()?

2017-04-12 Thread Konrad Malawski
Better debuggability - names can show up in logging / failures / inspections
of graphs etc.
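
As a tiny illustration (the flow and its name are made up):

```
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

object NamedFlowSketch extends App {
  implicit val system = ActorSystem("named-sketch")
  implicit val materializer = ActorMaterializer()

  // .named attaches a name attribute to the (nested) flow; as noted above,
  // such names can show up in logging, failures and graph inspection instead
  // of an anonymous stage identifier.
  val parse: Flow[String, Int, NotUsed] =
    Flow[String].map(_.trim.toInt).named("parse-int")

  Source(List("1", "2", "3"))
    .via(parse)
    .runWith(Sink.foreach(println))
}
```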

-- 
Konrad `ktoso` Malawski
Akka  @ Lightbend 

On 13 April 2017 at 02:59:49, Sean Callahan (sean.callahan...@gmail.com)
wrote:

While browsing the akka docs I came across heavy use of the .named method
in http://doc.akka.io/docs/akka/2.4.17/scala/stream/stream-composition.html


In the docs, it is mentioned that this creates nested sources/flows/sinks,
but Im not quite sure what this provides me exactly. Im guessing that I
should be using it more in some of my complex graphs, but Im not.

For example if I have two identical flows, what does a named flow buy me
over an unnamed one?


[akka-user] Akka streams: Why use .named()?

2017-04-12 Thread Sean Callahan
While browsing the Akka docs I came across heavy use of the .named method 
in http://doc.akka.io/docs/akka/2.4.17/scala/stream/stream-composition.html 

In the docs, it is mentioned that this creates nested sources/flows/sinks, 
but I'm not quite sure what this provides me exactly. I'm guessing that I 
should be using it more in some of my complex graphs, but I'm not. 

For example, if I have two identical flows, what does a named flow buy me 
over an unnamed one?



[akka-user] akka-streams-kafka exponential backoff

2017-03-28 Thread 'Michal Borowiecki' via Akka User List

Hello,

Lagom has a nice feature when subscribing to Kafka topics: upon 
failure it re-creates the flow with exponential backoff.


I see this is implemented by wrapping the KafkaSubscriberActor props 
with BackoffSupervisor props:


https://github.com/lagom/lagom/blob/master/service/scaladsl/kafka/client/src/main/scala/com/lightbend/lagom/internal/scaladsl/broker/kafka/ScaladslKafkaSubscriber.scala#L105


What is the best way to implement the same using akka-streams-kafka 
directly?


Having read 
http://doc.akka.io/docs/akka/2.4.17/java/stream/stream-error.html I 
thought the way would be via ActorMaterializerSettings, but I can't 
figure out how, and I'm hoping this will be an easy question for someone 
here.
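
For what it's worth, later Akka versions (2.5.4 and newer) ship
RestartSource.withBackoff, which wraps a source factory in exponential-backoff
restarts and can be combined with a plain akka-stream-kafka consumer. A minimal
sketch (the topic, bootstrap servers, group id and backoff values are made up):

```
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.duration._

object BackoffKafkaConsumerSketch extends App {
  implicit val system = ActorSystem("kafka-backoff-sketch")
  implicit val materializer = ActorMaterializer()

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("sketch-group")

  // Re-create the consumer source with exponential backoff whenever it fails
  // or completes, similar to what Lagom does with a BackoffSupervisor.
  RestartSource
    .withBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
      Consumer.plainSource(consumerSettings, Subscriptions.topics("some-topic"))
    }
    .runWith(Sink.foreach(record => println(record.value)))
}
```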


Cheers,

Michal



Re: [akka-user] Akka Streams + TCP + TLS

2017-03-15 Thread Pablo Milanese

I was able to resolve the issue. I had to change the jar from
ssl-config-core_2.12.0-M3-0.2.1.jar to ssl-config-core_2.12-0.2.2.jar.
Thanks again!
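
In other words, the culprit was an ssl-config-core artifact built for a
different Scala binary version (_2.12.0-M3) than the rest of the classpath.
As a rough sketch of how the same constraint is usually expressed with sbt
(the original project uses Ivy; the coordinates below are illustrative):

```
// build.sbt (sketch)
scalaVersion := "2.12.1"

libraryDependencies ++= Seq(
  // %% appends the Scala binary suffix (_2.12), which keeps all artifacts
  // on the same Scala binary version
  "com.typesafe.akka" %% "akka-stream"     % "2.4.17",
  "com.typesafe"      %% "ssl-config-core" % "0.2.2"
)
```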



On Wednesday, 15 March 2017 at 11:25:42 (UTC-3), Pablo Milanese wrote:
>
> Totally agree .. However, maybe explaining my error better, can give me a 
> clue.
>
> I suspect that my mistake can be due to two things:
>
> (1) Some conversion between scala - java8, given the new "compatibility" 
> between scala 12 and java 8 ..
> (2) The fact that scala 2.11 and 2.12 are not binary compatible
>
>
> 
> For (1):
>
> See: "Scala and Java 8 interop is also improved for functional code, the 
> methods that take functions can easily be called in both directions using 
> lambda syntax. SAM types are treated uniformly - from type checking through 
> code generation. In the class file is generated for lambda; invokedynamic 
> is used instead.
>
> ( http://www.scala-lang.org/news/2.12.0 ) Maybe some issue still unknown? 
> I am not sure about this point, but I mention it just in case
>
> 
>
> In this case, the line on which I am failing is:
>
> TlsSupport.atop (tls)
>
>
> "Previously, the defined bidiflows are:
>
> Val tlsSupport: BidiFlow [ByteString, TLSProtocol.SslTlsOutbound, 
> TLSProtocol.SslTlsInbound, ByteString, NotUsed] =
> BidiFlow.fromFlows (
>  Flow [ByteString] .map (TLSProtocol.SendBytes (_)),
>  Flow [TLSProtocol.SslTlsInbound] .collect (pf))
>
>
> Val tls: BidiFlow [TLSProtocol.SslTlsOutbound, ByteString, ByteString, 
> TLSProtocol.SslTlsInbound, NotUsed] =
>   TLS (sslContext, firstSession, role)
>
>
> "
>
> Anyway, I'll look at the dependencies.
> Thank you 
>
> El miércoles, 15 de marzo de 2017, 10:50:38 (UTC-3), Konrad Malawski 
> escribió:
>>
>> Uhm... please use some serious dependency management tool.
>> You're missing ssl-config.
>>
>> Please read the dependencies that the projects have, you simply are 
>> missing things from your dependencies.
>>
>> -- 
>> Konrad `ktoso` Malawski
>> Akka  @ Lightbend 
>>
>> On 15 March 2017 at 14:42:15, Pablo Milanese (pablomi...@gmail.com) 
>> wrote:
>>
>> Buenas!
>>
>> Java 1.8.0_121 
>> Scala 2.12.1
>>
>>
>> I have no SBT .. We use Ivy in a local repository:
>>
>> For example: > rev="2.4.17"/>
>>
>> (All the next jars, are for scala 2.12.. for example: akka-stream_2.12)
>>
>> Jars:
>>
>> akka-actor.jar
>> akka-camel.jar
>> akka-protobuf.jar
>> akka-remote.jar
>> akka-slf4j.jar
>> akka-stream-testkit.jar
>> akka-stream.jar
>> akka-testkit.jar
>> config.jar
>> reactive-streams.jar
>> scala-java8-compat_2.12-0.8.0.jar
>> ssl-config-core_2.12.0-M3-0.2.1.jar
>> scala-compiler.jar
>> scala-library-sources.jar
>> scala-library.jar
>> scala-parser-combinators.jar
>> scala-reflect.jar
>> scala-xml.jar
>> scalap.jar
>>
>>
>> I think these are the most important.
>>
>> Thank you !
>>
>>
>>
>>
>> El miércoles, 15 de marzo de 2017, 7:41:08 (UTC-3), Rafał Krzewski 
>> escribió: 
>>>
>>> What JVM and Scala versions are you using? What are your project's 
>>> dependency versions? 
>>> hint:
>>> os shell> java -version
>>> sbt> scalaVersion
>>> sbt> libraryDependencies
>>>
>>> cheers,
>>> Rafał
>>>
>>> W dniu wtorek, 14 marca 2017 20:23:15 UTC+1 użytkownik Pablo Milanese 
>>> napisał: 

 Hi Konrad, 

 At the first view, I think the could not fnid any error .. but please 
 take into account that I am a scala beginner :)
 Now,  I am trying to test the code, and I am having a problem whe I 
 execute the line:

 val sslConfig: AkkaSSLConfig = AkkaSSLConfig.get(system)


 java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: 
 scala/runtime/java8/JFunction1
 at 
 com.typesafe.sslconfig.util.ConfigLoader$.(Configuration.scala:180)
 at 
 com.typesafe.sslconfig.util.ConfigLoader$.(Configuration.scala)
 at com.typesafe.sslconfig.ssl.SSLConfigParser.parse(Config.scala:495)
 at com.typesafe.sslconfig.ssl.SSLConfigFactory$.parse(Config.scala:483)
 at 
 com.typesafe.sslconfig.akka.AkkaSSLConfig$.defaultSSLConfigSettings(AkkaSSLConfig.scala:34)
 at 
 com.typesafe.sslconfig.akka.AkkaSSLConfig$.createExtension(AkkaSSLConfig.scala:29)
 at 
 com.typesafe.sslconfig.akka.AkkaSSLConfig$.createExtension(AkkaSSLConfig.scala:19)
 at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:899)
 at akka.actor.ExtensionId.apply(Extension.scala:79)
 at akka.actor.ExtensionId.apply$(Extension.scala:79)
 at 
 com.typesafe.sslconfig.akka.AkkaSSLConfig$.apply(AkkaSSLConfig.scala:24)
 at 
 com.typesafe.sslconfig.akka.AkkaSSLConfig$.apply(AkkaSSLConfig.scala:19)
 at akka.actor.ExtensionId.get(Extension.scala:91)
 at akka.actor.ExtensionId.get$(Extension.scala:91)
 at 
 com.typesafe.sslconfig.akka.AkkaSSLConfig$.get(AkkaSSLConfig.scala:23)
 at 
 com.paytrue.swakka.actors.c

Re: [akka-user] Akka Streams + TCP + TLS

2017-03-15 Thread Pablo Milanese
I totally agree. However, maybe explaining my error better can give me a 
clue.

I suspect that my mistake can be due to one of two things:

(1) Some conversion issue between Scala and Java 8, given the new "compatibility" 
between Scala 2.12 and Java 8.
(2) The fact that Scala 2.11 and 2.12 are not binary compatible.



For (1):

See: "Scala and Java 8 interop is also improved for functional code, the 
methods that take functions can easily be called in both directions using 
lambda syntax. SAM types are treated uniformly - from type checking through 
code generation. In the class file is generated for lambda; invokedynamic 
is used instead.

( http://www.scala-lang.org/news/2.12.0 ) Maybe some issue still unknown? I 
am not sure about this point, but I mention it just in case



In this case, the line on which I am failing is:

tlsSupport.atop(tls)


"Previously, the defined bidiflows are:

Val tlsSupport: BidiFlow [ByteString, TLSProtocol.SslTlsOutbound, 
TLSProtocol.SslTlsInbound, ByteString, NotUsed] =
BidiFlow.fromFlows (
 Flow [ByteString] .map (TLSProtocol.SendBytes (_)),
 Flow [TLSProtocol.SslTlsInbound] .collect (pf))


Val tls: BidiFlow [TLSProtocol.SslTlsOutbound, ByteString, ByteString, 
TLSProtocol.SslTlsInbound, NotUsed] =
  TLS (sslContext, firstSession, role)


"

Anyway, I'll look at the dependencies.
Thank you 

On Wednesday, 15 March 2017 at 10:50:38 (UTC-3), Konrad Malawski wrote:
>
> Uhm... please use some serious dependency management tool.
> You're missing ssl-config.
>
> Please read the dependencies that the projects have, you simply are 
> missing things from your dependencies.
>
> -- 
> Konrad `ktoso` Malawski
> Akka  @ Lightbend 
>
> On 15 March 2017 at 14:42:15, Pablo Milanese (pablomi...@gmail.com 
> ) wrote:
>
> Buenas!
>
> Java 1.8.0_121 
> Scala 2.12.1
>
>
> I have no SBT .. We use Ivy in a local repository:
>
> For example:  rev="2.4.17"/>
>
> (All the next jars, are for scala 2.12.. for example: akka-stream_2.12)
>
> Jars:
>
> akka-actor.jar
> akka-camel.jar
> akka-protobuf.jar
> akka-remote.jar
> akka-slf4j.jar
> akka-stream-testkit.jar
> akka-stream.jar
> akka-testkit.jar
> config.jar
> reactive-streams.jar
> scala-java8-compat_2.12-0.8.0.jar
> ssl-config-core_2.12.0-M3-0.2.1.jar
> scala-compiler.jar
> scala-library-sources.jar
> scala-library.jar
> scala-parser-combinators.jar
> scala-reflect.jar
> scala-xml.jar
> scalap.jar
>
>
> I think these are the most important.
>
> Thank you !
>
>
>
>
> El miércoles, 15 de marzo de 2017, 7:41:08 (UTC-3), Rafał Krzewski 
> escribió: 
>>
>> What JVM and Scala versions are you using? What are your project's 
>> dependency versions? 
>> hint:
>> os shell> java -version
>> sbt> scalaVersion
>> sbt> libraryDependencies
>>
>> cheers,
>> Rafał
>>
>> W dniu wtorek, 14 marca 2017 20:23:15 UTC+1 użytkownik Pablo Milanese 
>> napisał: 
>>>
>>> Hi Konrad, 
>>>
>>> At the first view, I think the could not fnid any error .. but please 
>>> take into account that I am a scala beginner :)
>>> Now,  I am trying to test the code, and I am having a problem whe I 
>>> execute the line:
>>>
>>> val sslConfig: AkkaSSLConfig = AkkaSSLConfig.get(system)
>>>
>>>
>>> java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: 
>>> scala/runtime/java8/JFunction1
>>> at 
>>> com.typesafe.sslconfig.util.ConfigLoader$.(Configuration.scala:180)
>>> at 
>>> com.typesafe.sslconfig.util.ConfigLoader$.(Configuration.scala)
>>> at com.typesafe.sslconfig.ssl.SSLConfigParser.parse(Config.scala:495)
>>> at com.typesafe.sslconfig.ssl.SSLConfigFactory$.parse(Config.scala:483)
>>> at 
>>> com.typesafe.sslconfig.akka.AkkaSSLConfig$.defaultSSLConfigSettings(AkkaSSLConfig.scala:34)
>>> at 
>>> com.typesafe.sslconfig.akka.AkkaSSLConfig$.createExtension(AkkaSSLConfig.scala:29)
>>> at 
>>> com.typesafe.sslconfig.akka.AkkaSSLConfig$.createExtension(AkkaSSLConfig.scala:19)
>>> at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:899)
>>> at akka.actor.ExtensionId.apply(Extension.scala:79)
>>> at akka.actor.ExtensionId.apply$(Extension.scala:79)
>>> at 
>>> com.typesafe.sslconfig.akka.AkkaSSLConfig$.apply(AkkaSSLConfig.scala:24)
>>> at 
>>> com.typesafe.sslconfig.akka.AkkaSSLConfig$.apply(AkkaSSLConfig.scala:19)
>>> at akka.actor.ExtensionId.get(Extension.scala:91)
>>> at akka.actor.ExtensionId.get$(Extension.scala:91)
>>> at com.typesafe.sslconfig.akka.AkkaSSLConfig$.get(AkkaSSLConfig.scala:23)
>>> at 
>>> com.paytrue.swakka.actors.channels.AbstractTcpServer.tlsStage(TcpServer.scala:88)
>>> at 
>>> com.paytrue.swakka.actors.channels.AbstractTcpServer.$anonfun$handler$1(TcpServer.scala:67)
>>> at 
>>> com.paytrue.swakka.actors.channels.AbstractTcpServer.$anonfun$handler$1$adapted(TcpServer.scala:64)
>>> at akka.stream.impl.fusing.Map$$anon$8.onPush(Ops.scala:43)
>>> at 
>>> akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
>>> at 

Re: [akka-user] Akka Streams + TCP + TLS

2017-03-15 Thread Konrad Malawski
Uhm... please use some serious dependency management tool.
You're missing ssl-config.

Please read the dependencies that the projects have; you are simply missing
things from your dependencies.

-- 
Konrad `ktoso` Malawski
Akka  @ Lightbend 

On 15 March 2017 at 14:42:15, Pablo Milanese (pablomilanes...@gmail.com)
wrote:

Buenas!

Java 1.8.0_121
Scala 2.12.1


I have no SBT .. We use Ivy in a local repository:

For example: 

(All the next jars, are for scala 2.12.. for example: akka-stream_2.12)

Jars:

akka-actor.jar
akka-camel.jar
akka-protobuf.jar
akka-remote.jar
akka-slf4j.jar
akka-stream-testkit.jar
akka-stream.jar
akka-testkit.jar
config.jar
reactive-streams.jar
scala-java8-compat_2.12-0.8.0.jar
ssl-config-core_2.12.0-M3-0.2.1.jar
scala-compiler.jar
scala-library-sources.jar
scala-library.jar
scala-parser-combinators.jar
scala-reflect.jar
scala-xml.jar
scalap.jar


I think these are the most important.

Thank you !




El miércoles, 15 de marzo de 2017, 7:41:08 (UTC-3), Rafał Krzewski
escribió:
>
> What JVM and Scala versions are you using? What are your project's
> dependency versions?
> hint:
> os shell> java -version
> sbt> scalaVersion
> sbt> libraryDependencies
>
> cheers,
> Rafał
>
> W dniu wtorek, 14 marca 2017 20:23:15 UTC+1 użytkownik Pablo Milanese
> napisał:
>>
>> Hi Konrad,
>>
>> At the first view, I think the could not fnid any error .. but please
>> take into account that I am a scala beginner :)
>> Now,  I am trying to test the code, and I am having a problem whe I
>> execute the line:
>>
>> val sslConfig: AkkaSSLConfig = AkkaSSLConfig.get(system)
>>
>>
>> java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError:
>> scala/runtime/java8/JFunction1
>> at com.typesafe.sslconfig.util.ConfigLoader$.(
>> Configuration.scala:180)
>> at com.typesafe.sslconfig.util.ConfigLoader$.(
>> Configuration.scala)
>> at com.typesafe.sslconfig.ssl.SSLConfigParser.parse(Config.scala:495)
>> at com.typesafe.sslconfig.ssl.SSLConfigFactory$.parse(Config.scala:483)
>> at com.typesafe.sslconfig.akka.AkkaSSLConfig$.defaultSSLConfigSettings(
>> AkkaSSLConfig.scala:34)
>> at com.typesafe.sslconfig.akka.AkkaSSLConfig$.
>> createExtension(AkkaSSLConfig.scala:29)
>> at com.typesafe.sslconfig.akka.AkkaSSLConfig$.
>> createExtension(AkkaSSLConfig.scala:19)
>> at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:899)
>> at akka.actor.ExtensionId.apply(Extension.scala:79)
>> at akka.actor.ExtensionId.apply$(Extension.scala:79)
>> at com.typesafe.sslconfig.akka.AkkaSSLConfig$.apply(
>> AkkaSSLConfig.scala:24)
>> at com.typesafe.sslconfig.akka.AkkaSSLConfig$.apply(
>> AkkaSSLConfig.scala:19)
>> at akka.actor.ExtensionId.get(Extension.scala:91)
>> at akka.actor.ExtensionId.get$(Extension.scala:91)
>> at com.typesafe.sslconfig.akka.AkkaSSLConfig$.get(AkkaSSLConfig.scala:23)
>> at com.paytrue.swakka.actors.channels.AbstractTcpServer.
>> tlsStage(TcpServer.scala:88)
>> at com.paytrue.swakka.actors.channels.AbstractTcpServer.$
>> anonfun$handler$1(TcpServer.scala:67)
>> at com.paytrue.swakka.actors.channels.AbstractTcpServer.$
>> anonfun$handler$1$adapted(TcpServer.scala:64)
>> at akka.stream.impl.fusing.Map$$anon$8.onPush(Ops.scala:43)
>> at akka.stream.impl.fusing.GraphInterpreter.processPush(
>> GraphInterpreter.scala:747)
>> at akka.stream.impl.fusing.GraphInterpreter.processEvent(
>> GraphInterpreter.scala:710)
>> at akka.stream.impl.fusing.GraphInterpreter.execute(
>> GraphInterpreter.scala:616)
>> at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(
>> ActorGraphInterpreter.scala:471)
>> at akka.stream.impl.fusing.GraphInterpreterShell.receive(
>> ActorGraphInterpreter.scala:423)
>> at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$
>> ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
>> at akka.stream.impl.fusing.ActorGraphInterpreter$$
>> anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
>> at akka.actor.Actor.aroundReceive(Actor.scala:497)
>> at akka.actor.Actor.aroundReceive$(Actor.scala:495)
>> at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(
>> ActorGraphInterpreter.scala:529)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>> at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:
>> 1056)
>> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>> at java.util.concurrent.ForkJoinWorkerThread.run(
>> ForkJoinWorkerThread.java:157)
>> Caused by: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction1
>> ... 38 more
>> Caused by: java.lang.ClassNotFoundException:
>> scala.runtime.java8.JFunction1
>> at java.net.URLClassLoader.findClass(URLClass

Re: [akka-user] Akka Streams + TCP + TLS

2017-03-15 Thread Pablo Milanese
Hello!

Java 1.8.0_121
Scala 2.12.1


I have no SBT .. We use Ivy in a local repository:

For example: 

(All the next jars, are for scala 2.12.. for example: akka-stream_2.12)

Jars:

akka-actor.jar
akka-camel.jar
akka-protobuf.jar
akka-remote.jar
akka-slf4j.jar
akka-stream-testkit.jar
akka-stream.jar
akka-testkit.jar
config.jar
reactive-streams.jar
scala-java8-compat_2.12-0.8.0.jar
ssl-config-core_2.12.0-M3-0.2.1.jar
scala-compiler.jar
scala-library-sources.jar
scala-library.jar
scala-parser-combinators.jar
scala-reflect.jar
scala-xml.jar
scalap.jar


I think these are the most important.

Thank you !




On Wednesday, 15 March 2017 at 7:41:08 (UTC-3), Rafał Krzewski wrote:
>
> What JVM and Scala versions are you using? What are your project's 
> dependency versions?
> hint:
> os shell> java -version
> sbt> scalaVersion
> sbt> libraryDependencies
>
> cheers,
> Rafał
>
> W dniu wtorek, 14 marca 2017 20:23:15 UTC+1 użytkownik Pablo Milanese 
> napisał:
>>
>> Hi Konrad,
>>
>> At the first view, I think the could not fnid any error .. but please 
>> take into account that I am a scala beginner :)
>> Now,  I am trying to test the code, and I am having a problem whe I 
>> execute the line:
>>
>> val sslConfig: AkkaSSLConfig = AkkaSSLConfig.get(system)
>>
>>
>> java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: 
>> scala/runtime/java8/JFunction1
>> at 
>> com.typesafe.sslconfig.util.ConfigLoader$.(Configuration.scala:180)
>> at com.typesafe.sslconfig.util.ConfigLoader$.(Configuration.scala)
>> at com.typesafe.sslconfig.ssl.SSLConfigParser.parse(Config.scala:495)
>> at com.typesafe.sslconfig.ssl.SSLConfigFactory$.parse(Config.scala:483)
>> at 
>> com.typesafe.sslconfig.akka.AkkaSSLConfig$.defaultSSLConfigSettings(AkkaSSLConfig.scala:34)
>> at 
>> com.typesafe.sslconfig.akka.AkkaSSLConfig$.createExtension(AkkaSSLConfig.scala:29)
>> at 
>> com.typesafe.sslconfig.akka.AkkaSSLConfig$.createExtension(AkkaSSLConfig.scala:19)
>> at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:899)
>> at akka.actor.ExtensionId.apply(Extension.scala:79)
>> at akka.actor.ExtensionId.apply$(Extension.scala:79)
>> at 
>> com.typesafe.sslconfig.akka.AkkaSSLConfig$.apply(AkkaSSLConfig.scala:24)
>> at 
>> com.typesafe.sslconfig.akka.AkkaSSLConfig$.apply(AkkaSSLConfig.scala:19)
>> at akka.actor.ExtensionId.get(Extension.scala:91)
>> at akka.actor.ExtensionId.get$(Extension.scala:91)
>> at com.typesafe.sslconfig.akka.AkkaSSLConfig$.get(AkkaSSLConfig.scala:23)
>> at 
>> com.paytrue.swakka.actors.channels.AbstractTcpServer.tlsStage(TcpServer.scala:88)
>> at 
>> com.paytrue.swakka.actors.channels.AbstractTcpServer.$anonfun$handler$1(TcpServer.scala:67)
>> at 
>> com.paytrue.swakka.actors.channels.AbstractTcpServer.$anonfun$handler$1$adapted(TcpServer.scala:64)
>> at akka.stream.impl.fusing.Map$$anon$8.onPush(Ops.scala:43)
>> at 
>> akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
>> at 
>> akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:710)
>> at 
>> akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
>> at 
>> akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
>> at 
>> akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423)
>> at 
>> akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
>> at 
>> akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
>> at akka.actor.Actor.aroundReceive(Actor.scala:497)
>> at akka.actor.Actor.aroundReceive$(Actor.scala:495)
>> at 
>> akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>> at 
>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>> at 
>> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>> Caused by: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction1
>> ... 38 more
>> Caused by: java.lang.ClassNotFoundException: 
>> scala.runtime.java8.JFunction1
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> ... 38 more
>>
>>
>> Any idea ?
>> Thank you!
>>
>> El lunes, 13 de marzo de 2017, 22:13:57 (UTC-3), Pablo Milanese escribió:
>>>
>>> Hello Konrad,
>>>

Re: [akka-user] Akka Streams + TCP + TLS

2017-03-15 Thread Rafał Krzewski
What JVM and Scala versions are you using? What are your project's 
dependency versions?
hint:
os shell> java -version
sbt> scalaVersion
sbt> libraryDependencies

cheers,
Rafał

On Tuesday, 14 March 2017 at 20:23:15 UTC+1, Pablo Milanese wrote:
>
> Hi Konrad,
>
> At the first view, I think the could not fnid any error .. but please take 
> into account that I am a scala beginner :)
> Now,  I am trying to test the code, and I am having a problem whe I 
> execute the line:
>
> val sslConfig: AkkaSSLConfig = AkkaSSLConfig.get(system)
>
>
> java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: 
> scala/runtime/java8/JFunction1
> at 
> com.typesafe.sslconfig.util.ConfigLoader$.(Configuration.scala:180)
> at com.typesafe.sslconfig.util.ConfigLoader$.(Configuration.scala)
> at com.typesafe.sslconfig.ssl.SSLConfigParser.parse(Config.scala:495)
> at com.typesafe.sslconfig.ssl.SSLConfigFactory$.parse(Config.scala:483)
> at 
> com.typesafe.sslconfig.akka.AkkaSSLConfig$.defaultSSLConfigSettings(AkkaSSLConfig.scala:34)
> at 
> com.typesafe.sslconfig.akka.AkkaSSLConfig$.createExtension(AkkaSSLConfig.scala:29)
> at 
> com.typesafe.sslconfig.akka.AkkaSSLConfig$.createExtension(AkkaSSLConfig.scala:19)
> at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:899)
> at akka.actor.ExtensionId.apply(Extension.scala:79)
> at akka.actor.ExtensionId.apply$(Extension.scala:79)
> at com.typesafe.sslconfig.akka.AkkaSSLConfig$.apply(AkkaSSLConfig.scala:24)
> at com.typesafe.sslconfig.akka.AkkaSSLConfig$.apply(AkkaSSLConfig.scala:19)
> at akka.actor.ExtensionId.get(Extension.scala:91)
> at akka.actor.ExtensionId.get$(Extension.scala:91)
> at com.typesafe.sslconfig.akka.AkkaSSLConfig$.get(AkkaSSLConfig.scala:23)
> at 
> com.paytrue.swakka.actors.channels.AbstractTcpServer.tlsStage(TcpServer.scala:88)
> at 
> com.paytrue.swakka.actors.channels.AbstractTcpServer.$anonfun$handler$1(TcpServer.scala:67)
> at 
> com.paytrue.swakka.actors.channels.AbstractTcpServer.$anonfun$handler$1$adapted(TcpServer.scala:64)
> at akka.stream.impl.fusing.Map$$anon$8.onPush(Ops.scala:43)
> at 
> akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
> at 
> akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:710)
> at 
> akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
> at 
> akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
> at 
> akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423)
> at 
> akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
> at 
> akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
> at akka.actor.Actor.aroundReceive(Actor.scala:497)
> at akka.actor.Actor.aroundReceive$(Actor.scala:495)
> at 
> akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction1
> ... 38 more
> Caused by: java.lang.ClassNotFoundException: scala.runtime.java8.JFunction1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 38 more
>
>
> Any idea ?
> Thank you!
>
> El lunes, 13 de marzo de 2017, 22:13:57 (UTC-3), Pablo Milanese escribió:
>>
>> Hello Konrad,
>>
>> Of course! No problem.
>> I will take a look to the code.
>>
>> Thank you a lot !
>>
>>
>>
>> El lunes, 13 de marzo de 2017, 18:28:42 (UTC-3), Konrad Malawski escribió:
>>>
>>> (did the conversion in a rush)
>>>
>>> -- 
>>> Konrad `ktoso` Malawski
>>> Akka  @ Lightbend 
>>>
>>> On 13 March 2017 at 22:26:51, Pablo Milanese (pablomi...@gmail.com) 
>>> wrote:
>>>
>>>  
>>> https://github.com/typesafehub/activator-akka-stream-java8/blob/master/src/main/java/sample/stream/TcpTLSEcho.java
>>>
>>>


Re: [akka-user] Akka Streams + TCP + TLS

2017-03-14 Thread Pablo Milanese
Hi Konrad,

At first view, I could not find any error... but please take 
into account that I am a Scala beginner :)
Now I am trying to test the code, and I am having a problem when I execute 
the line:

val sslConfig: AkkaSSLConfig = AkkaSSLConfig.get(system)


java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: 
scala/runtime/java8/JFunction1
at com.typesafe.sslconfig.util.ConfigLoader$.(Configuration.scala:180)
at com.typesafe.sslconfig.util.ConfigLoader$.(Configuration.scala)
at com.typesafe.sslconfig.ssl.SSLConfigParser.parse(Config.scala:495)
at com.typesafe.sslconfig.ssl.SSLConfigFactory$.parse(Config.scala:483)
at 
com.typesafe.sslconfig.akka.AkkaSSLConfig$.defaultSSLConfigSettings(AkkaSSLConfig.scala:34)
at 
com.typesafe.sslconfig.akka.AkkaSSLConfig$.createExtension(AkkaSSLConfig.scala:29)
at 
com.typesafe.sslconfig.akka.AkkaSSLConfig$.createExtension(AkkaSSLConfig.scala:19)
at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:899)
at akka.actor.ExtensionId.apply(Extension.scala:79)
at akka.actor.ExtensionId.apply$(Extension.scala:79)
at com.typesafe.sslconfig.akka.AkkaSSLConfig$.apply(AkkaSSLConfig.scala:24)
at com.typesafe.sslconfig.akka.AkkaSSLConfig$.apply(AkkaSSLConfig.scala:19)
at akka.actor.ExtensionId.get(Extension.scala:91)
at akka.actor.ExtensionId.get$(Extension.scala:91)
at com.typesafe.sslconfig.akka.AkkaSSLConfig$.get(AkkaSSLConfig.scala:23)
at 
com.paytrue.swakka.actors.channels.AbstractTcpServer.tlsStage(TcpServer.scala:88)
at 
com.paytrue.swakka.actors.channels.AbstractTcpServer.$anonfun$handler$1(TcpServer.scala:67)
at 
com.paytrue.swakka.actors.channels.AbstractTcpServer.$anonfun$handler$1$adapted(TcpServer.scala:64)
at akka.stream.impl.fusing.Map$$anon$8.onPush(Ops.scala:43)
at 
akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
at 
akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:710)
at 
akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
at 
akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
at 
akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423)
at 
akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
at 
akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
at akka.actor.Actor.aroundReceive(Actor.scala:497)
at akka.actor.Actor.aroundReceive$(Actor.scala:495)
at 
akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction1
... 38 more
Caused by: java.lang.ClassNotFoundException: scala.runtime.java8.JFunction1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 38 more


Any idea ?
Thank you!

On Monday, 13 March 2017 at 22:13:57 (UTC-3), Pablo Milanese wrote:
>
> Hello Konrad,
>
> Of course! No problem.
> I will take a look to the code.
>
> Thank you a lot !
>
>
>
> El lunes, 13 de marzo de 2017, 18:28:42 (UTC-3), Konrad Malawski escribió:
>>
>> (did the conversion in a rush)
>>
>> -- 
>> Konrad `ktoso` Malawski
>> Akka  @ Lightbend 
>>
>> On 13 March 2017 at 22:26:51, Pablo Milanese (pablomi...@gmail.com) 
>> wrote:
>>
>>  
>> https://github.com/typesafehub/activator-akka-stream-java8/blob/master/src/main/java/sample/stream/TcpTLSEcho.java
>>
>>



Re: [akka-user] Akka Streams + TCP + TLS

2017-03-13 Thread Pablo Milanese
Hello Konrad,

Of course! No problem.
I will take a look at the code.

Thank you a lot!



On Monday, 13 March 2017 at 18:28:42 (UTC-3), Konrad Malawski wrote:
>
> (did the conversion in a rush)
>
> -- 
> Konrad `ktoso` Malawski
> Akka  @ Lightbend 
>
> On 13 March 2017 at 22:26:51, Pablo Milanese (pablomi...@gmail.com 
> ) wrote:
>
>  
> https://github.com/typesafehub/activator-akka-stream-java8/blob/master/src/main/java/sample/stream/TcpTLSEcho.java
>
>



Re: [akka-user] Akka Streams + TCP + TLS

2017-03-13 Thread Konrad Malawski
(did the conversion in a rush)

-- 
Konrad `ktoso` Malawski
Akka  @ Lightbend 

On 13 March 2017 at 22:26:51, Pablo Milanese (pablomilanes...@gmail.com)
wrote:


https://github.com/typesafehub/activator-akka-stream-java8/blob/master/src/main/java/sample/stream/TcpTLSEcho.java



Re: [akka-user] Akka Streams + TCP + TLS

2017-03-13 Thread Konrad Malawski
I started it for Scala here:

https://github.com/typesafehub/activator-akka-stream-scala
However, I made a mistake somewhere - it would be awesome if you could help
me spot where it is :/

-- 
Konrad `ktoso` Malawski
Akka  @ Lightbend 

On 13 March 2017 at 22:26:51, Pablo Milanese (pablomilanes...@gmail.com)
wrote:


https://github.com/typesafehub/activator-akka-stream-java8/blob/master/src/main/java/sample/stream/TcpTLSEcho.java



Re: [akka-user] Akka Streams + TCP + TLS

2017-03-13 Thread Pablo Milanese
Thank you !

Actually, I am writing in Scala 2.12, so doing the conversion is more 
difficult than I thought... but when I finish I will publish the solution 
in Scala.

Thanks again!

On Sunday, 12 March 2017 at 7:23:31 (UTC-3), Konrad Malawski wrote:
>
> Here's a full example which does just that: 
> https://github.com/typesafehub/activator-akka-stream-java8/blob/master/src/main/java/sample/stream/TcpTLSEcho.java
>
> Happy hakking
>
>
> On Mar 11, 2017 10:43 AM, "Pablo Milanese"  > wrote:
>
>> Hello !.
>>
>> I am actually looking for an example of the combinations of 
>> *akka.streams.javadsl.Tcp* and *akka.streams.javadsl.TLS* to create 
>> custom SSL/TLS enabled TCP servers and clients, but I couldn't find 
>> anything .. 
>>
>> I think I have resolved the akka-streams-tcp, but I can't find how can I 
>> join the TLS API on the things that I have:
>>
>> Here's my code for TCP connections using akka streams:
>>
>>
>>
>>   private val connectionsSource: Source[IncomingConnection, 
>> Future[ServerBinding]] =
>> Tcp(context.system)
>>   .bind(transportConfig.getHost, transportConfig.getPort)
>>
>>   private val connectionsSink: Sink[IncomingConnection, Future[Done]] =
>> Sink.foreach(handleIncomingConnection)
>>
>>   private val connectionsGraph: RunnableGraph[(Future[ServerBinding], 
>> Future[Done])] =
>> connectionsSource.toMat(connectionsSink)(Keep.both)
>>
>>   private val echo: Flow[ByteString, ByteString, NotUsed] =
>>
>> Flow[ByteString]
>>   .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, 
>> allowTruncation = true))
>>   .map(_.utf8String)
>>   .map { text =>
>> log.info(s"Received message: $text")
>> ByteString(text + "!!!\n")
>>   }
>>
>>   def handleIncomingConnection(connection: IncomingConnection) {
>> log.info(s"New client at ${connection.remoteAddress}")
>> val flowGraph = connection.flow.joinMat(echo)(Keep.both)
>> flowGraph.run()
>>   }
>>
>>   private val mat = connectionsGraph.run()
>>
>>
>>
>> And from the other side, the TLS part:
>>
>> ( I have a Function to take the SSLContext .. but is the only thing that 
>> I have ...)
>>
>> def initSslContext(): SSLContext = {
>>   val password = "password"
>>
>>   val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
>>   keyStore.load(getClass.getResourceAsStream("/keystore"), 
>> password.toCharArray)
>>
>>   val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
>>   trustStore.load(getClass.getResourceAsStream("/truststore"), 
>> password.toCharArray)
>>
>>   val keyManagerFactory = 
>> KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
>>   keyManagerFactory.init(keyStore, password.toCharArray)
>>
>>   val trustManagerFactory = 
>> TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
>>   trustManagerFactory.init(trustStore)
>>
>>   val context = SSLContext.getInstance("TLS")
>>   context.init(keyManagerFactory.getKeyManagers, 
>> trustManagerFactory.getTrustManagers, new SecureRandom)
>>
>>   for (protocol <- context.getSupportedSSLParameters.getProtocols) {
>> log.info(s"Supported protocol: $protocol")
>>   }
>>   context
>> }
>>
>>
>>
>> If anyone has an example, or something I would appreciate.
>>
>> Thank You!
>> Pablo
>>
>>
>>
>> -- 
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com .
>> To post to this group, send email to akka...@googlegroups.com 
>> .
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>



Re: [akka-user] Akka Streams + TCP + TLS

2017-03-12 Thread Konrad Malawski
Here's a full example which does just that:
https://github.com/typesafehub/activator-akka-stream-java8/blob/master/src/main/java/sample/stream/TcpTLSEcho.java

Happy hakking


On Mar 11, 2017 10:43 AM, "Pablo Milanese" 
wrote:

> Hello !.
>
> I am actually looking for an example of the combinations of
> *akka.streams.javadsl.Tcp* and *akka.streams.javadsl.TLS* to create
> custom SSL/TLS enabled TCP servers and clients, but I couldn't find
> anything ..
>
> I think I have resolved the akka-streams-tcp, but I can't find how can I
> join the TLS API on the things that I have:
>
> Here's my code for TCP connections using akka streams:
>
>
>
>   private val connectionsSource: Source[IncomingConnection, 
> Future[ServerBinding]] =
> Tcp(context.system)
>   .bind(transportConfig.getHost, transportConfig.getPort)
>
>   private val connectionsSink: Sink[IncomingConnection, Future[Done]] =
> Sink.foreach(handleIncomingConnection)
>
>   private val connectionsGraph: RunnableGraph[(Future[ServerBinding], 
> Future[Done])] =
> connectionsSource.toMat(connectionsSink)(Keep.both)
>
>   private val echo: Flow[ByteString, ByteString, NotUsed] =
>
> Flow[ByteString]
>   .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, 
> allowTruncation = true))
>   .map(_.utf8String)
>   .map { text =>
> log.info(s"Received message: $text")
> ByteString(text + "!!!\n")
>   }
>
>   def handleIncomingConnection(connection: IncomingConnection) {
> log.info(s"New client at ${connection.remoteAddress}")
> val flowGraph = connection.flow.joinMat(echo)(Keep.both)
> flowGraph.run()
>   }
>
>   private val mat = connectionsGraph.run()
>
>
>
> And from the other side, the TLS part:
>
> ( I have a Function to take the SSLContext .. but is the only thing that I
> have ...)
>
> def initSslContext(): SSLContext = {
>   val password = "password"
>
>   val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
>   keyStore.load(getClass.getResourceAsStream("/keystore"), 
> password.toCharArray)
>
>   val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
>   trustStore.load(getClass.getResourceAsStream("/truststore"), 
> password.toCharArray)
>
>   val keyManagerFactory = 
> KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
>   keyManagerFactory.init(keyStore, password.toCharArray)
>
>   val trustManagerFactory = 
> TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
>   trustManagerFactory.init(trustStore)
>
>   val context = SSLContext.getInstance("TLS")
>   context.init(keyManagerFactory.getKeyManagers, 
> trustManagerFactory.getTrustManagers, new SecureRandom)
>
>   for (protocol <- context.getSupportedSSLParameters.getProtocols) {
> log.info(s"Supported protocol: $protocol")
>   }
>   context
> }
>
>
>
> If anyone has an example, or something I would appreciate.
>
> Thank You!
> Pablo
>
>
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



[akka-user] Akka Streams + TCP + TLS

2017-03-11 Thread Pablo Milanese
Hello!

I am currently looking for an example that combines 
*akka.stream.javadsl.Tcp* and *akka.stream.javadsl.TLS* to create custom 
SSL/TLS-enabled TCP servers and clients, but I couldn't find anything.

I think I have the akka-streams TCP part sorted out, but I can't figure out 
how to join the TLS API with what I already have:

Here's my code for TCP connections using akka streams:



  private val connectionsSource: Source[IncomingConnection, 
Future[ServerBinding]] =
Tcp(context.system)
  .bind(transportConfig.getHost, transportConfig.getPort)

  private val connectionsSink: Sink[IncomingConnection, Future[Done]] =
Sink.foreach(handleIncomingConnection)

  private val connectionsGraph: RunnableGraph[(Future[ServerBinding], 
Future[Done])] =
connectionsSource.toMat(connectionsSink)(Keep.both)

  private val echo: Flow[ByteString, ByteString, NotUsed] =

Flow[ByteString]
  .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, 
allowTruncation = true))
  .map(_.utf8String)
  .map { text =>
log.info(s"Received message: $text")
ByteString(text + "!!!\n")
  }

  def handleIncomingConnection(connection: IncomingConnection) {
log.info(s"New client at ${connection.remoteAddress}")
val flowGraph = connection.flow.joinMat(echo)(Keep.both)
flowGraph.run()
  }

  private val mat = connectionsGraph.run()



And from the other side, the TLS part:

(I have a function that builds the SSLContext, but that is the only piece I 
have so far...)

def initSslContext(): SSLContext = {
  val password = "password"

  val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
  keyStore.load(getClass.getResourceAsStream("/keystore"), password.toCharArray)

  val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
  trustStore.load(getClass.getResourceAsStream("/truststore"), 
password.toCharArray)

  val keyManagerFactory = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
  keyManagerFactory.init(keyStore, password.toCharArray)

  val trustManagerFactory = 
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
  trustManagerFactory.init(trustStore)

  val context = SSLContext.getInstance("TLS")
  context.init(keyManagerFactory.getKeyManagers, 
trustManagerFactory.getTrustManagers, new SecureRandom)

  for (protocol <- context.getSupportedSSLParameters.getProtocols) {
log.info(s"Supported protocol: $protocol")
  }
  context
}



If anyone has an example, or any pointers, I would appreciate it.

Thank You!
Pablo





[akka-user] [Akka streams] - filter with mapAsync strange behavior

2017-03-06 Thread Alexandre Delegue
Hi, 

Executing this code 

Source.range(1, 4)
.mapAsync(1, e -> CompletableFuture.supplyAsync(() -> {
System.out.println("Num "+e);
return e;
}))
.filter(any -> true)
.take(1)
.runWith(Sink.foreach(e -> { System.out.println("End "+e); }), mat);


Will print 
Num 1
End 1
Num 2

I don't understand why the async call in mapAsync is executed for number 2 
when the stream should already be completed.

This works fine:

Source.range(1, 4)
.mapAsync(1, e -> CompletableFuture.supplyAsync(() -> {
System.out.println("Num "+e);
return e;
}))
.take(1)
.runWith(Sink.foreach(e -> { System.out.println("End "+e); }), mat);


So it seems that the filter stage pulls the next element even though the 
stream has already completed.

Any idea? Is there a way to avoid that?
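
For now the only workaround I can see, if only the first element is really 
needed, is to move the take(1) in front of the mapAsync so the async stage 
never receives a second element - a rough, untested sketch (in the Scala DSL 
for brevity):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
import system.dispatcher

Source(1 to 4)
  .take(1)                       // complete the stream before the async stage
  .mapAsync(1) { e =>
    Future { println(s"Num $e"); e }
  }
  .runWith(Sink.foreach(e => println(s"End $e")))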

Thanks, 
Alexandre. 




[akka-user] [Akka Streams] How can i use java.sql.ResultSet as akka streams source

2017-03-06 Thread DEEPAK GUPTA
[Akka Streams] How can I use a java.sql.ResultSet as an akka streams source?
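
One option that looks like it could fit (untested sketch - the url, query 
and column index below are made-up placeholders) is Source.unfoldResource, 
which manages a blocking resource such as a JDBC ResultSet:

import java.sql.{ Connection, DriverManager, ResultSet }
import akka.NotUsed
import akka.stream.scaladsl.Source

def rows(url: String, query: String): Source[String, NotUsed] =
  Source.unfoldResource[String, (Connection, ResultSet)](
    () => {
      val conn = DriverManager.getConnection(url)               // open
      (conn, conn.createStatement().executeQuery(query))
    },
    { case (_, rs) => if (rs.next()) Some(rs.getString(1)) else None },  // read one row
    { case (conn, rs) => rs.close(); conn.close() }             // close
  )

Depending on the JDBC driver it may also be worth running this on a 
dedicated dispatcher for blocking I/O.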



Re: [akka-user] Akka Streams - Retry failed RPC call.

2017-02-24 Thread Viktor Klang
Hi Sean!

There is a mistake and a misunderstanding in your code:


object Test extends App {

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()


  val x: Future[Done] = Source(List("one", "two", "three")).map { value =>
val data = makeRPCcall(value)
data.recoverWithRetries(3 , {
  case e: Exception => makeRPCcall(value)
})
  }.flatMapConcat(identity).runForeach(println(_))



  var i = 0 //<--- NEVER DO THIS, you're closing over a mutable variable
without proper synchronization
  def makeRPCcall(key: String) = {
i = i + 1
if (i == 3) {
  Source.failed(new RuntimeException("failed in source")) //<--- You
need to return a Source which fails, not directly throw, otherwise your
*recovery* fails, not the source with which you are recovering.
} else {
  Source.single(i.toString)
}
  }



  Await.result(x, 10.minutes)
}



So something like this should demonstrate the solution:

object Test {
  val sys = ActorSystem("Test")
  implicit val mat = ActorMaterializer()(sys)

  def makeRPCcall(key: String): Source[Int, NotUsed] = {
val rnd = java.util.concurrent.ThreadLocalRandom.current.nextInt(100)
if (rnd <= 10) Source.failed(new RuntimeException(s"Tried $key but
can't handle the truth: $rnd"))
else Source.single(key.toInt)
  }

  def x: Future[Done] = Source((1 to 1000).map(_.toString)).flatMapConcat
{ value =>
makeRPCcall(value).recoverWithRetries(3 , {
  case e: RuntimeException =>
println(e.getMessage) // <--- so we can see the sample output below
makeRPCcall(value)
})
  }.runForeach(println)
}

Test.x

Example output:

scala> Tried 1 but can't handle the truth: 7
1
2
3
4
5
6
7
8
9
10
11
12
Tried 13 but can't handle the truth: 10
13
14
15
16
Tried 17 but can't handle the truth: 8
17
18
19
20
21
22
23
24
25
26
27
28
29
Tried 30 but can't handle the truth: 9
30
31
32
33
34
Tried 35 but can't handle the truth: 9

Test.sys.terminate()

On Thu, Feb 23, 2017 at 9:00 PM, Sean Callahan 
wrote:

> I am skeptical of this working. The reason being is the following example.
> I would expect my printed output to be "1" "2" "4". But instead I get "1"
> "2" "Exception in thread "main" java.lang.RuntimeException: failed in
> source". So it seems that I am missing something here.
>
> object Test extends App {
>
>   implicit val system = ActorSystem()
>   implicit val mat = ActorMaterializer()
>
>
>   val x: Future[Done] = Source(List("one", "two", "three")).map { value =>
> val data = makeRPCcall(value)
> data.recoverWithRetries(3 , {
>   case e: Exception => makeRPCcall(value)
> })
>   }.flatMapConcat(identity).runForeach(println(_))
>
>
>
>   var i = 0
>   def makeRPCcall(key: String) = {
> i = i + 1
> if (i == 3) {
>   throw new RuntimeException("failed in source")
> } else {
>   Source.single(i.toString)
> }
>   }
>
>
>
>   Await.result(x, 10.minutes)
> }
>
>
>
>
> On Thursday, February 23, 2017 at 11:24:05 AM UTC-7, √ wrote:
>>
>> def downloadS3ObjectFlow(s3Client: S3Client, bucket: String):
>> Flow[String, ByteString, NotUsed] = {
>> Flow[String].flatMapConcat { key =>
>>   log.debug(s"Downloading $key")
>>   val data: Source[ByteString, NotUsed] = s3Client.download(bucket,
>> key)
>>   data.recoverWithRetries(3, {
>> case SomeExpectedException => data
>>   })
>> }
>>   }
>>
>> On Thu, Feb 23, 2017 at 5:20 PM, Sean Callahan 
>> wrote:
>>
>>> Hey all, Currently I have the following flow to take in an S3 key and
>>> download that that file from S3.
>>>
>>> def downloadS3ObjectFlow(s3Client: S3Client, bucket: String):
>>> Flow[String, ByteString, NotUsed] = {
>>> Flow[String].map { key =>
>>>   log.debug(s"Downloading $key")
>>>   val data: Source[ByteString, NotUsed] = s3Client.download(bucket,
>>> key)
>>>   data
>>> }.flatMapConcat(identity)
>>>   }
>>>
>>>
>>>
>>> In 99% of cases, this works great, but as many of us know, S3 fails a
>>> fair amount for random unknown reasons the proper way to resolve it is to
>>> just retry the request. After a few hours of googling and trying various
>>> different solutions out there I have yet to find the proper solution here.
>>> Is there anything built into Akka that can accomplish this, or should I
>>> write some custom logic here to check the value of "data" and setup some
>>> type of stateful retries?
>>>
>>> --
>>> >> Read the docs: http://akka.io/docs/
>>> >> Check the FAQ: http://doc.akka.io/docs/akka/c
>>> urrent/additional/faq.html
>>> >> Search the archives: https://groups.google.com/grou
>>> p/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+...@googlegroups.com.
>>> To post to this group, send email to akka...@googlegroups.com.
>>> Visit this group at https://groups.google.com/

Re: [akka-user] Akka Streams - Retry failed RPC call.

2017-02-23 Thread Sean Callahan
I am skeptical of this working, because of the following example. I would 
expect my printed output to be "1" "2" "4", but instead I get "1" "2" 
"Exception in thread "main" java.lang.RuntimeException: failed in source". 
So it seems that I am missing something here.

object Test extends App {

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()


  val x: Future[Done] = Source(List("one", "two", "three")).map { value =>
val data = makeRPCcall(value)
data.recoverWithRetries(3 , {
  case e: Exception => makeRPCcall(value)
})
  }.flatMapConcat(identity).runForeach(println(_))



  var i = 0
  def makeRPCcall(key: String) = {
i = i + 1
if (i == 3) {
  throw new RuntimeException("failed in source")
} else {
  Source.single(i.toString)
}
  }



  Await.result(x, 10.minutes)
}


 

On Thursday, February 23, 2017 at 11:24:05 AM UTC-7, √ wrote:
>
> def downloadS3ObjectFlow(s3Client: S3Client, bucket: String): Flow[String, 
> ByteString, NotUsed] = {
> Flow[String].flatMapConcat { key =>
>   log.debug(s"Downloading $key")
>   val data: Source[ByteString, NotUsed] = s3Client.download(bucket, 
> key)
>   data.recoverWithRetries(3, {
> case SomeExpectedException => data
>   })
> }
>   }
>
> On Thu, Feb 23, 2017 at 5:20 PM, Sean Callahan  > wrote:
>
>> Hey all, Currently I have the following flow to take in an S3 key and 
>> download that that file from S3. 
>>
>> def downloadS3ObjectFlow(s3Client: S3Client, bucket: String): 
>> Flow[String, ByteString, NotUsed] = {
>> Flow[String].map { key =>
>>   log.debug(s"Downloading $key")
>>   val data: Source[ByteString, NotUsed] = s3Client.download(bucket, 
>> key)
>>   data
>> }.flatMapConcat(identity)
>>   }
>>
>>
>>
>> In 99% of cases, this works great, but as many of us know, S3 fails a 
>> fair amount for random unknown reasons the proper way to resolve it is to 
>> just retry the request. After a few hours of googling and trying various 
>> different solutions out there I have yet to find the proper solution here. 
>> Is there anything built into Akka that can accomplish this, or should I 
>> write some custom logic here to check the value of "data" and setup some 
>> type of stateful retries?
>>
>> -- 
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com .
>> To post to this group, send email to akka...@googlegroups.com 
>> .
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Cheers,
> √
>



Re: [akka-user] Akka Streams - Retry failed RPC call.

2017-02-23 Thread Viktor Klang
def downloadS3ObjectFlow(s3Client: S3Client, bucket: String): Flow[String,
ByteString, NotUsed] = {
Flow[String].flatMapConcat { key =>
  log.debug(s"Downloading $key")
  val data: Source[ByteString, NotUsed] = s3Client.download(bucket, key)
  data.recoverWithRetries(3, {
case SomeExpectedException => data
  })
}
  }

On Thu, Feb 23, 2017 at 5:20 PM, Sean Callahan 
wrote:

> Hey all, Currently I have the following flow to take in an S3 key and
> download that that file from S3.
>
> def downloadS3ObjectFlow(s3Client: S3Client, bucket: String): Flow[String,
> ByteString, NotUsed] = {
> Flow[String].map { key =>
>   log.debug(s"Downloading $key")
>   val data: Source[ByteString, NotUsed] = s3Client.download(bucket,
> key)
>   data
> }.flatMapConcat(identity)
>   }
>
>
>
> In 99% of cases, this works great, but as many of us know, S3 fails a fair
> amount for random unknown reasons the proper way to resolve it is to just
> retry the request. After a few hours of googling and trying various
> different solutions out there I have yet to find the proper solution here.
> Is there anything built into Akka that can accomplish this, or should I
> write some custom logic here to check the value of "data" and setup some
> type of stateful retries?
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√



[akka-user] Akka Streams - Retry failed RPC call.

2017-02-23 Thread Sean Callahan
Hey all, currently I have the following flow to take in an S3 key and 
download that file from S3.

def downloadS3ObjectFlow(s3Client: S3Client, bucket: String): Flow[String, 
ByteString, NotUsed] = {
Flow[String].map { key =>
  log.debug(s"Downloading $key")
  val data: Source[ByteString, NotUsed] = s3Client.download(bucket, key)
  data
}.flatMapConcat(identity)
  }



In 99% of cases this works great, but as many of us know, S3 fails a fair 
amount for random, unknown reasons, and the proper way to resolve that is to 
just retry the request. After a few hours of googling and trying various 
solutions out there, I have yet to find the proper one. Is there anything 
built into Akka that can accomplish this, or should I write some custom 
logic here to check the value of "data" and set up some kind of stateful 
retries?



[akka-user] akka-streams: emitMultiple limitations?

2017-01-23 Thread Julian Howarth
Hi,

Firstly, apologies in advance for the somewhat vague nature of this 
question - I am trying to come up with a simplified reproducer for the 
problem but so far have not succeeded. What I'm after at the moment is any 
sort of clue that can help me pin down the actual issue.

The setup is as follows - we have websocket clients connecting to our 
streaming data service. Each websocket connection is serviced by a custom 
GraphStage which authenticates the client, sends them a bulk catch-up 
message for the topic they are interested in (up to, but generally fewer 
than, 500 messages of roughly 500 bytes each), then streams the live data 
until it completes. The code below should give a rough idea, though in the 
real system the timer stage is replaced by a separate inlet and we have 
async callbacks for the auth part.

class WebSocketStage(initialData: Seq[MyData]) extends 
GraphStage[FlowShape[String, String]] {

  private val websocketIn: Inlet[String] = Inlet("WebsocketIn")
  private val websocketOut: Outlet[String] = Outlet("WebsocketOut")

  override val shape = FlowShape(websocketIn, websocketOut)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {

  var dumped = false
  var id = 0

  override def onTimer(timerKey: Any): Unit = {
emit(websocketOut, MyData(id + 1).toString, () ⇒ id += 1)
  }

  override def preStart(): Unit = {
println("Got new connection")
  }

  override def postStop(): Unit = {
println("Connection closed")
  }

  override def onPush(): Unit = {
println("Got push")
  }

  override def onPull(): Unit = {
println("Got pull")
if (!dumped) {
  emitMultiple(websocketOut, initialData.map(_.toString), () ⇒ {
import scala.concurrent.duration._
schedulePeriodically(1, 5.seconds)
dumped = true
id = initialData.size
  })
}
  }

  override def onUpstreamFinish(): Unit = {
super.onUpstreamFinish()
println("WS in connection closed")
  }

  override def onUpstreamFailure(ex: Throwable): Unit = {
super.onUpstreamFailure(ex)
println("WS in failed")
  }

  override def onDownstreamFinish(): Unit = {
super.onDownstreamFinish()
println("WS out connection closed")
  }

  setHandlers(websocketIn, websocketOut, this)
}
}


This all works perfectly as long as the number of clients is relatively low 
(< 400). At some point between roughly 400 and 1000 connections, we start to 
see issues where, for each new connection, the initial set of bulk data is 
only partially sent and we receive no further output from the stage. 
However, we don't see any logging messages recording stage completion, nor 
any akka error messages. More interestingly, at that point connections can 
still be made to topics where there is little or no data to catch up (fewer 
than 10 messages), and the rest of the data is streamed out as normal. Also, 
previously established connections are unaffected and stream to completion.

My initial thoughts were that somehow emitMultiple was hitting some 
internal limit, but in my initial attempt at a reproducer using the above 
code, I got up to 7000 connections without issue. Are there any obvious 
places I should be looking at for why we're hitting issues? Meanwhile, I'll 
try extending the above code such that I can reproduce the issue.

Thanks,

Julian





Re: [akka-user] akka-streams pattern for async filter

2017-01-13 Thread Patrik Nordwall
mapAsync.mapConcat

with Future[Option]
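
Roughly like this (untested sketch, reusing your keepAsync and MSource; it 
needs an ExecutionContext in scope for the Future.map):

MSource
  .mapAsync(1)(m => keepAsync(m).map(keep => if (keep) Some(m) else None))
  .mapConcat(_.toList)   // Option.toList: None is dropped, Some(m) passes through
  .to(Sink.ignore)
  .run()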

/Patrik
On Fri, 13 Jan 2017 at 23:44, jdenizac via Akka User List <
akka-user@googlegroups.com> wrote:

> Hi! I'm somewhat new to akka-streams and trying to find the best way to
> express this flow:
>
> Let's say I have a stream of messages of type M for
>
> case class M(id: Int, val: String)
>
> and I have a source and some sink:
>
> MSource
>   .to(Sink.ignore)
>   .run()
>
> Now, I want to filter some of those messages. Let's say, throw away any
> with an odd id:
>
> def keep(m: M): Boolean = m.id % 2 == 0
>
> MSource
>   .filter(keep)
>   .to(Sink.ignore)
>   .run()
>
> But I run into trouble when I want to filter on an async predicate. I have
> mapAsync, and I saw some discussion in
> https://github.com/akka/akka/issues/18603 about flowAsync
>
> This is the best I can come up with:
>
> def keepAsync(m: M): Future[Boolean] = Future.successful(m.id % 2 == 0)
> // or whatever
>
> MSource
>   .mapAsync(1)(m => keepAsync(m).map(result => (m, result)))
>   .filter(x => x._2)
>   .map(x => x._1)
>   .to(Sink.ignore)
>   .run()
>
> This seems overly verbose and error-prone. Is there a better pattern? Is
> there a possibility of adding a filterAsync method to the combinator dsl? I
> found myself intuitively wanting to write, e.g.
>
> MSource
>   .filterAsync(1)(keepAsync(m))
>   .to(Sink.ignore)
>   .run()
>
>
> Thanks for your replies!
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



[akka-user] akka-streams pattern for async filter

2017-01-13 Thread jdenizac via Akka User List
Hi! I'm somewhat new to akka-streams and trying to find the best way to 
express this flow:

Let's say I have a stream of messages of type M for

case class M(id: Int, value: String)

and I have a source and some sink:

MSource
  .to(Sink.ignore)
  .run()

Now, I want to filter some of those messages. Let's say, throw away any 
with an odd id:

def keep(m: M): Boolean = m.id % 2 == 0

MSource
  .filter(keep)
  .to(Sink.ignore)
  .run()

But I run into trouble when I want to filter on an async predicate. I have 
mapAsync, and I saw some discussion 
in https://github.com/akka/akka/issues/18603 about flowAsync

This is the best I can come up with:

def keepAsync(m: M): Future[Boolean] = Future.successful(m.id % 2 == 0) // 
or whatever

MSource
  .mapAsync(1)(m => keepAsync(m).map(result => (m, result)))
  .filter(x => x._2)
  .map(x => x._1)
  .to(Sink.ignore)
  .run()

This seems overly verbose and error-prone. Is there a better pattern? Is 
there a possibility of adding a filterAsync method to the combinator dsl? I 
found myself intuitively wanting to write, e.g.

MSource
  .filterAsync(1)(keepAsync(m))
  .to(Sink.ignore)
  .run()


Thanks for your replies!



Re: [akka-user] akka streams graph is not fully drained when Future[Done] completes

2017-01-13 Thread Kyrylo Stokoz
Just for info:
I tried with akka 2.4.16 + akka-http 10.0.2 and with akka 2.4.14.

In 2.4.14 I see truncation warnings logged for the One2OneBidi stage; in 
2.4.16 I don't get such warnings, but the issue is still reproducible.


On Friday, January 13, 2017 at 4:53:58 PM UTC+1, Kyrylo Stokoz wrote:
>
> I would like to second this issue.
>
> I`m experiencing similar behavior in my use case:
>
> I`m trying to download file from s3 and sometimes http entity length does 
> not match contentLength header and subsequent json parsing failing.
> The code i have (simplified):
>
> final def responseEntityAsString(entity: ResponseEntity)(implicit 
> executionContext: ExecutionContext, materializer: Materializer): 
> Future[String] =
>   entity.dataBytes.runWith(Sink.fold(ByteString.empty)(_ ++ _))
> .map(_.utf8String)
>
> Http().singleRequest(HttpRequest(uri = uri))
> .flatMap { response =>
>   responseEntityAsString(response.entity)
> .map { data =>
>   if (!(data.endsWith("}") || data.endsWith("]")))
> println(s"Got data [${data.length}] 
> contentLength:[${response.entity.contentLengthOption}] response: 
> [${response.headers.mkString(",")}, $response]")
> }
>
>
> Got data [2088541] contentLength:[Some(2147123)] response: [x-amz-id-2: 
> ImgIcWeKCF1Il45P9ugP6y8GPCwkb8BaoohGcLkXK1hI/7KrobtJmvbqMAxfI1QA6uAHKSW7k3c=,x-amz-request-id:
>  
> C4629FE2B03865B2,Date: Fri, 13 Jan 2017 15:48:43 GMT,Last-Modified: Tue, 10 
> Jan 2017 10:53:04 GMT,ETag: 
> "575b4d61d5f2be06d062b80d2f16b9e2",x-amz-version-id: 
> qOmZXXwIOmTA9cdBPN79xRX045QVMQt0,Accept-Ranges: bytes,Server: AmazonS3, 
> HttpResponse(200 OK,List(x-amz-id-2: 
> ImgIcWeKCF1Il45P9ugP6y8GPCwkb8BaoohGcLkXK1hI/7KrobtJmvbqMAxfI1QA6uAHKSW7k3c=, 
> x-amz-request-id: C4629FE2B03865B2, Date: Fri, 13 Jan 2017 15:48:43 GMT, 
> Last-Modified: Tue, 10 Jan 2017 10:53:04 GMT, ETag: 
> "575b4d61d5f2be06d062b80d2f16b9e2", x-amz-version-id: 
> qOmZXXwIOmTA9cdBPN79xRX045QVMQt0, Accept-Ranges: bytes, Server: 
> AmazonS3),HttpEntity.Default(application/json,2147123 bytes 
> total),HttpProtocol(HTTP/1.1))]
>
> On Saturday, December 17, 2016 at 1:08:25 AM UTC+1, sub...@gmail.com 
> wrote:
>>
>> Not exactly sure what you were looking for. I'm relatively new to akka 
>> streams. This is the minimal amount of code that reproduces the problem, 
>> involving:
>>
>> a custom source
>> an flow with AsyncCallback
>> and parallelizing
>>
>> only deps are akka and joda time
>>
>> On Friday, December 16, 2016 at 3:31:37 PM UTC-7, √ wrote:
>>>
>>> Is this really a *minimized* reproducer?
>>>
>>> On Fri, Dec 16, 2016 at 11:09 PM,  wrote:
>>>
 Here's the code to reproduce. The issue only seems to occur with my 
 custom Source and using a callback in a Flow and while parallelizing. If 
 this code is run with  .via(parallelizeFlow(parallelize = 1, 
 asyncFlow)) it drops the last minute, but when run with .via(asyncFlow)
  it does not.

 So with parallelize it prints all but the last minute:

 seq is Vector(2016/12/14/23/50, 2016/12/14/23/51, 2016/12/14/23/52, 
 2016/12/14/23/53, 2016/12/14/23/54, 2016/12/14/23/55, 2016/12/14/23/56, 
 2016/12/14/23/57, 2016/12/14/23/58)

 and without it prints the expected value, with all minutes flowing 
 through the graph:

 seq is Vector(2016/12/14/23/50, 2016/12/14/23/51, 2016/12/14/23/52, 
 2016/12/14/23/53, 2016/12/14/23/54, 2016/12/14/23/55, 2016/12/14/23/56, 
 2016/12/14/23/57, 2016/12/14/23/58, 2016/12/14/23/59)





 package hack.streams

 import akka.NotUsed
 import akka.stream._
 import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
 import akka.stream.stage._
 import org.joda.time.{DateTime, Duration, Interval}

 import scala.collection.immutable.Seq
 import scala.concurrent.{Await, Future}

 object AsyncIssue {
   import StreamsMaterializer._

   def minuteSource(interval: Interval) = new 
 GraphStage[SourceShape[DateTime]]() {
 val out = Outlet[DateTime]("keys")
 val shape = SourceShape(out)

 def zeroToMinute(date: DateTime) = 
 date.withMillisOfSecond(0).withSecondOfMinute(0)

 override def createLogic(inheritedAttributes: Attributes): 
 GraphStageLogic = new GraphStageLogic(shape) with StageLogging {
   var isDone: Boolean = false
   var current: DateTime = new DateTime(0)

   override def preStart() = {
 current = zeroToMinute(interval.getStart)
   }

   setHandler(out,
 new OutHandler {
   override def onPull(): Unit = {
 if (!isDone) {
   push(out, current)
   current = current.plusMinutes(1)

   if (current.isEqual(zeroToMinute(interval.getEnd))) {
 isDone = true
   }
 } else {
   complete(out)
 

Re: [akka-user] akka streams graph is not fully drained when Future[Done] completes

2017-01-13 Thread Kyrylo Stokoz
I would like to second this issue.

I'm experiencing similar behavior in my use case:

I'm trying to download a file from S3, and sometimes the HTTP entity length 
does not match the contentLength header, so the subsequent JSON parsing fails.
The code I have (simplified):

final def responseEntityAsString(entity: ResponseEntity)(implicit 
executionContext: ExecutionContext, materializer: Materializer): Future[String] 
=
  entity.dataBytes.runWith(Sink.fold(ByteString.empty)(_ ++ _))
.map(_.utf8String)

Http().singleRequest(HttpRequest(uri = uri))
.flatMap { response =>
  responseEntityAsString(response.entity)
.map { data =>
  if (!(data.endsWith("}") || data.endsWith("]")))
println(s"Got data [${data.length}] 
contentLength:[${response.entity.contentLengthOption}] response: 
[${response.headers.mkString(",")}, $response]")
}


Got data [2088541] contentLength:[Some(2147123)] response: [x-amz-id-2: 
ImgIcWeKCF1Il45P9ugP6y8GPCwkb8BaoohGcLkXK1hI/7KrobtJmvbqMAxfI1QA6uAHKSW7k3c=,x-amz-request-id:
 
C4629FE2B03865B2,Date: Fri, 13 Jan 2017 15:48:43 GMT,Last-Modified: Tue, 10 
Jan 2017 10:53:04 GMT,ETag: 
"575b4d61d5f2be06d062b80d2f16b9e2",x-amz-version-id: 
qOmZXXwIOmTA9cdBPN79xRX045QVMQt0,Accept-Ranges: bytes,Server: AmazonS3, 
HttpResponse(200 OK,List(x-amz-id-2: 
ImgIcWeKCF1Il45P9ugP6y8GPCwkb8BaoohGcLkXK1hI/7KrobtJmvbqMAxfI1QA6uAHKSW7k3c=, 
x-amz-request-id: C4629FE2B03865B2, Date: Fri, 13 Jan 2017 15:48:43 GMT, 
Last-Modified: Tue, 10 Jan 2017 10:53:04 GMT, ETag: 
"575b4d61d5f2be06d062b80d2f16b9e2", x-amz-version-id: 
qOmZXXwIOmTA9cdBPN79xRX045QVMQt0, Accept-Ranges: bytes, Server: 
AmazonS3),HttpEntity.Default(application/json,2147123 bytes 
total),HttpProtocol(HTTP/1.1))]
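
To make the truncation easier to spot, I'm also thinking of comparing the 
received byte count with the advertised length directly - a rough, untested 
sketch (needs an ExecutionContext and Materializer in scope):

def entityWithLengthCheck(entity: ResponseEntity)(implicit ec: ExecutionContext, mat: Materializer): Future[String] =
  entity.dataBytes
    .runFold(ByteString.empty)(_ ++ _)
    .map { bytes =>
      val expected = entity.contentLengthOption
      if (!expected.contains(bytes.length.toLong))
        println(s"Truncated entity: got ${bytes.length} bytes, expected $expected")
      bytes.utf8String
    }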

On Saturday, December 17, 2016 at 1:08:25 AM UTC+1, sub...@gmail.com wrote:
>
> Not exactly sure what you were looking for. I'm relatively new to akka 
> streams. This is the minimal amount of code that reproduces the problem, 
> involving:
>
> a custom source
> an flow with AsyncCallback
> and parallelizing
>
> only deps are akka and joda time
>
> On Friday, December 16, 2016 at 3:31:37 PM UTC-7, √ wrote:
>>
>> Is this really a *minimized* reproducer?
>>
>> On Fri, Dec 16, 2016 at 11:09 PM,  wrote:
>>
>>> Here's the code to reproduce. The issue only seems to occur with my 
>>> custom Source and using a callback in a Flow and while parallelizing. If 
>>> this code is run with  .via(parallelizeFlow(parallelize = 1, 
>>> asyncFlow)) it drops the last minute, but when run with .via(asyncFlow) it 
>>> does not.
>>>
>>> So with parallelize it prints all but the last minute:
>>>
>>> seq is Vector(2016/12/14/23/50, 2016/12/14/23/51, 2016/12/14/23/52, 
>>> 2016/12/14/23/53, 2016/12/14/23/54, 2016/12/14/23/55, 2016/12/14/23/56, 
>>> 2016/12/14/23/57, 2016/12/14/23/58)
>>>
>>> and without it prints the expected value, with all minutes flowing 
>>> through the graph:
>>>
>>> seq is Vector(2016/12/14/23/50, 2016/12/14/23/51, 2016/12/14/23/52, 
>>> 2016/12/14/23/53, 2016/12/14/23/54, 2016/12/14/23/55, 2016/12/14/23/56, 
>>> 2016/12/14/23/57, 2016/12/14/23/58, 2016/12/14/23/59)
>>>
>>>
>>>
>>>
>>>
>>> package hack.streams
>>>
>>> import akka.NotUsed
>>> import akka.stream._
>>> import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
>>> import akka.stream.stage._
>>> import org.joda.time.{DateTime, Duration, Interval}
>>>
>>> import scala.collection.immutable.Seq
>>> import scala.concurrent.{Await, Future}
>>>
>>> object AsyncIssue {
>>>   import StreamsMaterializer._
>>>
>>>   def minuteSource(interval: Interval) = new 
>>> GraphStage[SourceShape[DateTime]]() {
>>> val out = Outlet[DateTime]("keys")
>>> val shape = SourceShape(out)
>>>
>>> def zeroToMinute(date: DateTime) = 
>>> date.withMillisOfSecond(0).withSecondOfMinute(0)
>>>
>>> override def createLogic(inheritedAttributes: Attributes): 
>>> GraphStageLogic = new GraphStageLogic(shape) with StageLogging {
>>>   var isDone: Boolean = false
>>>   var current: DateTime = new DateTime(0)
>>>
>>>   override def preStart() = {
>>> current = zeroToMinute(interval.getStart)
>>>   }
>>>
>>>   setHandler(out,
>>> new OutHandler {
>>>   override def onPull(): Unit = {
>>> if (!isDone) {
>>>   push(out, current)
>>>   current = current.plusMinutes(1)
>>>
>>>   if (current.isEqual(zeroToMinute(interval.getEnd))) {
>>> isDone = true
>>>   }
>>> } else {
>>>   complete(out)
>>> }
>>>   }
>>> })
>>> }
>>>   }
>>>
>>>   def futureCallbackFlow = new GraphStage[FlowShape[DateTime, String]]() {
>>> val in = Inlet[DateTime]("minute")
>>> val out = Outlet[String]("string")
>>>
>>> val formatter = 
>>> org.joda.time.format.DateTimeFormat.forPattern("/MM/dd/HH/mm")
>>>
>>> val shape = FlowShape.of(in, out)
>>>
>>> override def createLogic(inheritedAttributes: Attributes): 
>>> 

[akka-user] [Akka-streams] Cannot push port twice

2017-01-09 Thread Sergey Sopin
Hi,

I have created a FanOutShape2 shape with custom logic:

@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes)  {
return new GraphStageLogic(shape) {
{
setHandler(in, new AbstractInHandler() {
@Override
public void onPush() throws Exception {
Object result = process(grab(in), materializer());

if (result instanceof ProcessingResponse) {   
ProcessingResponse response = 
(ProcessingResponse) result;
if (!isClosed(out1)) {
push(out1, response);   
//This is FAndLShape.java:46
}
} else if (result != null && result instanceof 
FinderData) {  
FinderData response = (FinderData) result;
if (!isClosed(out0)) {
push(out0, response);
}
}
}
});

setHandler(out0, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
if ((!hasBeenPulled(in)) && (!isClosed(in))) {
pull(in);
}
}
});

setHandler(out1, new AbstractOutHandler() {
@Override
public void onPull () throws Exception {
if ((!hasBeenPulled(in)) && (!isClosed(in))) {
pull(in);
}
}
});
}

};
}

And sometimes I get following error message: 

[error] a.a.RepointableActorRef - Error in stage 
[kernel.modeller.workers.streamFinder.finderShapes.FAndLShape@4efd96f7]: 
requirement failed: Cannot push port (out1) twice
java.lang.IllegalArgumentException: requirement failed: Cannot push port 
(out1) twice
at scala.Predef$.require(Predef.scala:219)
at akka.stream.stage.GraphStageLogic.push(GraphStage.scala:439)
at 
kernel.modeller.workers.streamFinder.finderShapes.FAndLShape$1$1.onPush(FAndLShape.java:46)
at 
akka.stream.impl.fusing.GraphInterpreter.processElement$1(GraphInterpreter.scala:582)
at 
akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:593)
at 
akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:535)
at 
akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:443)
at 
akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:387)
at 
akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:547)
at akka.actor.Actor$class.aroundReceive(Actor.scala:484)

Could you tell me what is wrong here? 
Thanks! 

Cheers,
Sergey



Re: [akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread subopt1
Not exactly sure what you were looking for. I'm relatively new to akka 
streams. This is the minimal amount of code that reproduces the problem, 
involving:

a custom source
a flow with an AsyncCallback
and parallelizing

The only deps are akka and joda-time.

On Friday, December 16, 2016 at 3:31:37 PM UTC-7, √ wrote:
>
> Is this really a *minimized* reproducer?
>
> On Fri, Dec 16, 2016 at 11:09 PM, > wrote:
>
>> Here's the code to reproduce. The issue only seems to occur with my 
>> custom Source and using a callback in a Flow and while parallelizing. If 
>> this code is run with  .via(parallelizeFlow(parallelize = 1, asyncFlow)) it 
>> drops the last minute, but when run with .via(asyncFlow) it does not.
>>
>> So with parallelize it prints all but the last minute:
>>
>> seq is Vector(2016/12/14/23/50, 2016/12/14/23/51, 2016/12/14/23/52, 
>> 2016/12/14/23/53, 2016/12/14/23/54, 2016/12/14/23/55, 2016/12/14/23/56, 
>> 2016/12/14/23/57, 2016/12/14/23/58)
>>
>> and without it prints the expected value, with all minutes flowing 
>> through the graph:
>>
>> seq is Vector(2016/12/14/23/50, 2016/12/14/23/51, 2016/12/14/23/52, 
>> 2016/12/14/23/53, 2016/12/14/23/54, 2016/12/14/23/55, 2016/12/14/23/56, 
>> 2016/12/14/23/57, 2016/12/14/23/58, 2016/12/14/23/59)
>>
>>
>>
>>
>>
>> package hack.streams
>>
>> import akka.NotUsed
>> import akka.stream._
>> import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
>> import akka.stream.stage._
>> import org.joda.time.{DateTime, Duration, Interval}
>>
>> import scala.collection.immutable.Seq
>> import scala.concurrent.{Await, Future}
>>
>> object AsyncIssue {
>>   import StreamsMaterializer._
>>
>>   def minuteSource(interval: Interval) = new 
>> GraphStage[SourceShape[DateTime]]() {
>> val out = Outlet[DateTime]("keys")
>> val shape = SourceShape(out)
>>
>> def zeroToMinute(date: DateTime) = 
>> date.withMillisOfSecond(0).withSecondOfMinute(0)
>>
>> override def createLogic(inheritedAttributes: Attributes): 
>> GraphStageLogic = new GraphStageLogic(shape) with StageLogging {
>>   var isDone: Boolean = false
>>   var current: DateTime = new DateTime(0)
>>
>>   override def preStart() = {
>> current = zeroToMinute(interval.getStart)
>>   }
>>
>>   setHandler(out,
>> new OutHandler {
>>   override def onPull(): Unit = {
>> if (!isDone) {
>>   push(out, current)
>>   current = current.plusMinutes(1)
>>
>>   if (current.isEqual(zeroToMinute(interval.getEnd))) {
>> isDone = true
>>   }
>> } else {
>>   complete(out)
>> }
>>   }
>> })
>> }
>>   }
>>
>>   def futureCallbackFlow = new GraphStage[FlowShape[DateTime, String]]() {
>> val in = Inlet[DateTime]("minute")
>> val out = Outlet[String]("string")
>>
>> val formatter = 
>> org.joda.time.format.DateTimeFormat.forPattern("/MM/dd/HH/mm")
>>
>> val shape = FlowShape.of(in, out)
>>
>> override def createLogic(inheritedAttributes: Attributes): 
>> GraphStageLogic = new GraphStageLogic(shape) with StageLogging {
>>   val pushCallback = getAsyncCallback[String] { seq =>
>> push(out, seq)
>>   }
>>
>>   setHandler(in, new InHandler {
>> override def onPush(): Unit = {
>>   val minute = grab(in)
>>
>>   val fMin: Future[DateTime] = Future {minute}
>>
>>   fMin.foreach { min =>
>> pushCallback.invoke(formatter.print(min))
>>   }
>> }
>>   })
>>
>>   setHandler(out,
>> new OutHandler {
>>   override def onPull(): Unit = {
>> pull(in)
>>   }
>> }
>>   )
>> }
>>   }
>>
>>   def parallelizeFlow[In, Out](parallelize: Int, flow: 
>> Flow[In,Out,NotUsed]): Flow[In, Out, NotUsed] = 
>> Flow.fromGraph(GraphDSL.create() { implicit builder =>
>> import GraphDSL.Implicits._
>>
>> val dispatcher = builder.add(Balance[In](parallelize))
>> val merger = builder.add(Merge[Out](parallelize))
>>
>> for (i <- 0 to parallelize - 1) {
>>   dispatcher.out(i) ~> flow.async ~> merger.in(i)
>> }
>>
>> FlowShape(dispatcher.in, merger.out)
>>   })
>>
>>   def run() = {
>> import StreamsMaterializer._
>>
>> val asyncFlow = Flow[DateTime].via(futureCallbackFlow)
>>
>> val source: Source[DateTime, NotUsed] = Source(11 to 
>> 20).via(Flow[Int].map { num =>
>>   val formatter = 
>> org.joda.time.format.DateTimeFormat.forPattern("/MM/dd/HH/mm")
>>   formatter.parseDateTime(s"2016/12/14/14/$num")
>> })
>>
>> val mat: Future[Seq[String]] = Source.fromGraph(minuteSource(new 
>> Interval(Duration.standardMinutes(10), new 
>> DateTime().dayOfMonth().roundFloorCopy().minusDays(1
>>   .via(parallelizeFlow(parallelize = 1, asyncFlow))
>> //  .via(asyncFlow)
>>   .runWith(Sink.seq)
>>
>> val seq: Seq[String] = Await.result(mat, 
>> scala.conc

Re: [akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread Viktor Klang
Is this really a *minimized* reproducer?

On Fri, Dec 16, 2016 at 11:09 PM,  wrote:

> Here's the code to reproduce. The issue only seems to occur with my custom
> Source and using a callback in a Flow and while parallelizing. If this code
> is run with  .via(parallelizeFlow(parallelize = 1, asyncFlow)) it drops
> the last minute, but when run with .via(asyncFlow) it does not.
>
> So with parallelize it prints all but the last minute:
>
> seq is Vector(2016/12/14/23/50, 2016/12/14/23/51, 2016/12/14/23/52,
> 2016/12/14/23/53, 2016/12/14/23/54, 2016/12/14/23/55, 2016/12/14/23/56,
> 2016/12/14/23/57, 2016/12/14/23/58)
>
> and without it prints the expected value, with all minutes flowing through
> the graph:
>
> seq is Vector(2016/12/14/23/50, 2016/12/14/23/51, 2016/12/14/23/52,
> 2016/12/14/23/53, 2016/12/14/23/54, 2016/12/14/23/55, 2016/12/14/23/56,
> 2016/12/14/23/57, 2016/12/14/23/58, 2016/12/14/23/59)
>
>
>
>
>
> package hack.streams
>
> import akka.NotUsed
> import akka.stream._
> import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
> import akka.stream.stage._
> import org.joda.time.{DateTime, Duration, Interval}
>
> import scala.collection.immutable.Seq
> import scala.concurrent.{Await, Future}
>
> object AsyncIssue {
>   import StreamsMaterializer._
>
>   def minuteSource(interval: Interval) = new 
> GraphStage[SourceShape[DateTime]]() {
> val out = Outlet[DateTime]("keys")
> val shape = SourceShape(out)
>
> def zeroToMinute(date: DateTime) = 
> date.withMillisOfSecond(0).withSecondOfMinute(0)
>
> override def createLogic(inheritedAttributes: Attributes): 
> GraphStageLogic = new GraphStageLogic(shape) with StageLogging {
>   var isDone: Boolean = false
>   var current: DateTime = new DateTime(0)
>
>   override def preStart() = {
> current = zeroToMinute(interval.getStart)
>   }
>
>   setHandler(out,
> new OutHandler {
>   override def onPull(): Unit = {
> if (!isDone) {
>   push(out, current)
>   current = current.plusMinutes(1)
>
>   if (current.isEqual(zeroToMinute(interval.getEnd))) {
> isDone = true
>   }
> } else {
>   complete(out)
> }
>   }
> })
> }
>   }
>
>   def futureCallbackFlow = new GraphStage[FlowShape[DateTime, String]]() {
> val in = Inlet[DateTime]("minute")
> val out = Outlet[String]("string")
>
> val formatter = 
> org.joda.time.format.DateTimeFormat.forPattern("/MM/dd/HH/mm")
>
> val shape = FlowShape.of(in, out)
>
> override def createLogic(inheritedAttributes: Attributes): 
> GraphStageLogic = new GraphStageLogic(shape) with StageLogging {
>   val pushCallback = getAsyncCallback[String] { seq =>
> push(out, seq)
>   }
>
>   setHandler(in, new InHandler {
> override def onPush(): Unit = {
>   val minute = grab(in)
>
>   val fMin: Future[DateTime] = Future {minute}
>
>   fMin.foreach { min =>
> pushCallback.invoke(formatter.print(min))
>   }
> }
>   })
>
>   setHandler(out,
> new OutHandler {
>   override def onPull(): Unit = {
> pull(in)
>   }
> }
>   )
> }
>   }
>
>   def parallelizeFlow[In, Out](parallelize: Int, flow: Flow[In,Out,NotUsed]): 
> Flow[In, Out, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder 
> =>
> import GraphDSL.Implicits._
>
> val dispatcher = builder.add(Balance[In](parallelize))
> val merger = builder.add(Merge[Out](parallelize))
>
> for (i <- 0 to parallelize - 1) {
>   dispatcher.out(i) ~> flow.async ~> merger.in(i)
> }
>
> FlowShape(dispatcher.in, merger.out)
>   })
>
>   def run() = {
> import StreamsMaterializer._
>
> val asyncFlow = Flow[DateTime].via(futureCallbackFlow)
>
> val source: Source[DateTime, NotUsed] = Source(11 to 
> 20).via(Flow[Int].map { num =>
>   val formatter = 
> org.joda.time.format.DateTimeFormat.forPattern("/MM/dd/HH/mm")
>   formatter.parseDateTime(s"2016/12/14/14/$num")
> })
>
> val mat: Future[Seq[String]] = Source.fromGraph(minuteSource(new 
> Interval(Duration.standardMinutes(10), new 
> DateTime().dayOfMonth().roundFloorCopy().minusDays(1
>   .via(parallelizeFlow(parallelize = 1, asyncFlow))
> //  .via(asyncFlow)
>   .runWith(Sink.seq)
>
> val seq: Seq[String] = Await.result(mat, 
> scala.concurrent.duration.Duration.Inf)
> println(s"seq is $seq")
>   }
> }
>
>
>
>
> On Friday, December 16, 2016 at 8:03:45 AM UTC-7, √ wrote:
>>
>> Plase submit a miminized reproducer so readers have a chance of running
>> the code.
>>
>> On Fri, Dec 16, 2016 at 3:44 PM,  wrote:
>>
>>> Hi,
>>>
>>> I'm seeing an issue where the graph completes while there is still data
>>> in one of the flows. The last element emitted by the source enters a custom
>>> GraphStageLogic flow, 

Re: [akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread subopt1
Here's the code to reproduce. The issue only seems to occur with my custom 
Source and using a callback in a Flow and while parallelizing. If this code 
is run with  .via(parallelizeFlow(parallelize = 1, asyncFlow)) it drops the 
last minute, but when run with .via(asyncFlow) it does not.

So with parallelize it prints all but the last minute:

seq is Vector(2016/12/14/23/50, 2016/12/14/23/51, 2016/12/14/23/52, 
2016/12/14/23/53, 2016/12/14/23/54, 2016/12/14/23/55, 2016/12/14/23/56, 
2016/12/14/23/57, 2016/12/14/23/58)

and without it prints the expected value, with all minutes flowing through 
the graph:

seq is Vector(2016/12/14/23/50, 2016/12/14/23/51, 2016/12/14/23/52, 
2016/12/14/23/53, 2016/12/14/23/54, 2016/12/14/23/55, 2016/12/14/23/56, 
2016/12/14/23/57, 2016/12/14/23/58, 2016/12/14/23/59)





package hack.streams

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
import akka.stream.stage._
import org.joda.time.{DateTime, Duration, Interval}

import scala.collection.immutable.Seq
import scala.concurrent.{Await, Future}

object AsyncIssue {
  import StreamsMaterializer._

  def minuteSource(interval: Interval) = new 
GraphStage[SourceShape[DateTime]]() {
val out = Outlet[DateTime]("keys")
val shape = SourceShape(out)

def zeroToMinute(date: DateTime) = 
date.withMillisOfSecond(0).withSecondOfMinute(0)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic 
= new GraphStageLogic(shape) with StageLogging {
  var isDone: Boolean = false
  var current: DateTime = new DateTime(0)

  override def preStart() = {
current = zeroToMinute(interval.getStart)
  }

  setHandler(out,
new OutHandler {
  override def onPull(): Unit = {
if (!isDone) {
  push(out, current)
  current = current.plusMinutes(1)

  if (current.isEqual(zeroToMinute(interval.getEnd))) {
isDone = true
  }
} else {
  complete(out)
}
  }
})
}
  }
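
  // Aside, not from the original post: a minimal sketch of the same minute-by-minute
  // source written with the built-in Source.unfold, which avoids the hand-rolled
  // GraphStage. It assumes the same half-open [start, end) semantics as minuteSource
  // above and is not referenced by run() below.
  def minuteSourceUnfold(interval: Interval): Source[DateTime, NotUsed] = {
    def zeroToMinute(date: DateTime) = date.withMillisOfSecond(0).withSecondOfMinute(0)
    val end = zeroToMinute(interval.getEnd)
    Source.unfold(zeroToMinute(interval.getStart)) { current =>
      // emit the current minute and advance; stop once the (exclusive) end is reached
      if (current.isBefore(end)) Some((current.plusMinutes(1), current)) else None
    }
  }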

  def futureCallbackFlow = new GraphStage[FlowShape[DateTime, String]]() {
val in = Inlet[DateTime]("minute")
val out = Outlet[String]("string")

val formatter = 
org.joda.time.format.DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm")

val shape = FlowShape.of(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic 
= new GraphStageLogic(shape) with StageLogging {
  val pushCallback = getAsyncCallback[String] { seq =>
push(out, seq)
  }

  setHandler(in, new InHandler {
override def onPush(): Unit = {
  val minute = grab(in)

  val fMin: Future[DateTime] = Future {minute}

  fMin.foreach { min =>
pushCallback.invoke(formatter.print(min))
  }
}
  })

  setHandler(out,
new OutHandler {
  override def onPull(): Unit = {
pull(in)
  }
}
  )
}
  }

  def parallelizeFlow[In, Out](parallelize: Int, flow: Flow[In,Out,NotUsed]): 
Flow[In, Out, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._

val dispatcher = builder.add(Balance[In](parallelize))
val merger = builder.add(Merge[Out](parallelize))

for (i <- 0 to parallelize - 1) {
  dispatcher.out(i) ~> flow.async ~> merger.in(i)
}

FlowShape(dispatcher.in, merger.out)
  })

  def run() = {
import StreamsMaterializer._

val asyncFlow = Flow[DateTime].via(futureCallbackFlow)

val source: Source[DateTime, NotUsed] = Source(11 to 20).via(Flow[Int].map 
{ num =>
  val formatter = 
org.joda.time.format.DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm")
  formatter.parseDateTime(s"2016/12/14/14/$num")
})

val mat: Future[Seq[String]] = Source.fromGraph(minuteSource(new 
Interval(Duration.standardMinutes(10), new 
DateTime().dayOfMonth().roundFloorCopy().minusDays(1
  .via(parallelizeFlow(parallelize = 1, asyncFlow))
//  .via(asyncFlow)
  .runWith(Sink.seq)

val seq: Seq[String] = Await.result(mat, 
scala.concurrent.duration.Duration.Inf)
println(s"seq is $seq")
  }
}
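
(Aside, not part of the reproducer above.) A minimal sketch of the same Future 
hand-off done with the built-in mapAsync operator instead of a custom stage plus 
getAsyncCallback; it assumes the same joda-time pattern as above and an implicit 
ExecutionContext in scope. mapAsync preserves element order and only completes 
downstream once every in-flight Future has been emitted, so the trailing element 
is not dropped:

import akka.NotUsed
import akka.stream.scaladsl.Flow
import org.joda.time.DateTime
import scala.concurrent.{ExecutionContext, Future}

object MapAsyncAlternative {
  // formats each DateTime on a Future, with at most `parallelism` futures in flight
  def formatFlow(parallelism: Int)(implicit ec: ExecutionContext): Flow[DateTime, String, NotUsed] = {
    val formatter = org.joda.time.format.DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm")
    Flow[DateTime].mapAsync(parallelism)(min => Future(formatter.print(min)))
  }
}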




On Friday, December 16, 2016 at 8:03:45 AM UTC-7, √ wrote:
>
> Please submit a minimized reproducer so readers have a chance of running 
> the code.
>
> On Fri, Dec 16, 2016 at 3:44 PM, > wrote:
>
>> Hi,
>>
>> I'm seeing an issue where the graph completes while there is still data 
>> in one of the flows. The last element emitted by the source enters a custom 
>> GraphStageLogic flow, where it is sent to a function that returns a Future. 
>> That Future has a callback which invokes getAsyncCallback and then 
>> push(out). For the last element, the Future callback fires (pushCallback
>> .invoke(xml)) but the AsyncCallback is never invoked and the graph stops.
>>
>> For more context, this is what I have going on inside the GraphSt

Re: [akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread Andrew
Ugh, my initial simplified flow does not reproduce the same behavior, so
I'll need to dig in more on that. To clarify, the println statements were
more of a debug aid; I'm using a Sink.seq and I don't see those values in
the resulting Seq.

On Fri, Dec 16, 2016 at 8:12 AM Justin du coeur  wrote:

> On Fri, Dec 16, 2016 at 10:05 AM, Konrad Malawski <
> konrad.malaw...@lightbend.com> wrote:
>
> My crystal ball tells me you may be observing timing artifacts which
> originate from the fact that you println one thing, but log the other.
> So there will be a timing difference when one or the other actually hits
> System.out.
>
>
> Ah, good point.  In my experience, this is frequently a source of
> confusion -- I've wound up trying to be very rigorous about using a single
> logging mechanism, after too many false leads caused by this problem...
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to a topic in the
> Google Groups "Akka User List" group.
> To unsubscribe from this topic, visit
> https://groups.google.com/d/topic/akka-user/fBkWg4gSwEI/unsubscribe.
> To unsubscribe from this group and all its topics, send an email to
> akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread Konrad Malawski
My crystal ball tells me you may be observing timing artifacts which
originate from the fact that you println one thing, but log the other.
So there will be a timing difference when one or the other actually hits
System.out.

Agree with Viktor though, reproducers please for such cases :)


-- 
Konrad `ktoso` Malawski
Akka  @ Lightbend 

On 16 December 2016 at 16:03:44, Viktor Klang (viktor.kl...@gmail.com)
wrote:

Please submit a minimized reproducer so readers have a chance of running the
code.

On Fri, Dec 16, 2016 at 3:44 PM,  wrote:

> Hi,
>
> I'm seeing an issue where the graph completes while there is still data in
> one of the flows. The last element emitted by the source enters a custom
> GraphStageLogic flow, where it is sent to a function that returns a Future.
> That Future has a callback which invokes getAsyncCallback and then
> push(out). For the last element, the Future callback fires (pushCallback
> .invoke(xml)) but the AsyncCallback is never invoked and the graph stops.
>
> For more context, this is what I have going on inside the GraphStageLogic:
>
> val s3ListBucket: Source[ByteString, NotUsed] =
>   s3Client.listBucket(bucket, Some(currentPrefix), maxKeys, nextMarker)
>
> val bucketListingXml: Future[String] = s3ListBucket
>   .map(_.utf8String)
>   .runWith(Sink.seq)(materializer)
>   .map(_.mkString)(materializer.executionContext)
>
>
> bucketListingXml.foreach {
> xml =>
>
>   println(s"This gets called. prefix $currentPrefix")
>
>   pushCallback.invoke(xml)
>   }(materializer.executionContext)
>
>
> And the callback
>
>
> val pushCallback = getAsyncCallback[String] { xml =>
>   log.info(s"This is never called for last element in graph!")
>   push(out, xml)
> }
>
>
> I don't see any errors and this issue consistently occurs on the last
> element. Thanks
>
> Andrew
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



--
Cheers,
√
--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups
"Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread Justin du coeur
On Fri, Dec 16, 2016 at 10:05 AM, Konrad Malawski <
konrad.malaw...@lightbend.com> wrote:

> My crystal ball tells me you may be observing timing artifacts which
> originate from the fact that you println one thing, but log the other.
> So there will be a timing difference when one or the other actually hits
> System.out.
>

Ah, good point.  In my experience, this is frequently a source of confusion
-- I've wound up trying to be very rigorous about using a single logging
mechanism, after too many false leads caused by this problem...

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread Viktor Klang
Mine [crystal ball] has been in the repair shop the past 5 years and
there's always excuse after excuse why they can't fix it. :(

On Fri, Dec 16, 2016 at 4:05 PM, Konrad Malawski <
konrad.malaw...@lightbend.com> wrote:

> My crystal ball tells me you may be observing timing artifacts which
> originate from the fact that you println one thing, but log the other.
> So there will be a timing difference when one or the other actually hits
> System.out.
>
> Agree with Viktor though, reproducers please for such cases :)
>
>
> --
> Konrad `ktoso` Malawski
> Akka  @ Lightbend 
>
> On 16 December 2016 at 16:03:44, Viktor Klang (viktor.kl...@gmail.com)
> wrote:
>
> Please submit a minimized reproducer so readers have a chance of running
> the code.
>
> On Fri, Dec 16, 2016 at 3:44 PM,  wrote:
>
>> Hi,
>>
>> I'm seeing an issue where the graph completes while there is still data
>> in one of the flows. The last element emitted by the source enters a custom
>> GraphStageLogic flow, where it is sent to a function that returns a Future.
>> That Future has a callback which invokes getAsyncCallback and then
>> push(out). For the last element, the Future callback fires (pushCallback
>> .invoke(xml)) but the AsyncCallback is never invoked and the graph stops.
>>
>> For more context, this is what I have going on inside the GraphStageLogic:
>>
>> val s3ListBucket: Source[ByteString, NotUsed] =
>>   s3Client.listBucket(bucket, Some(currentPrefix), maxKeys, nextMarker)
>>
>> val bucketListingXml: Future[String] = s3ListBucket
>>   .map(_.utf8String)
>>   .runWith(Sink.seq)(materializer)
>>   .map(_.mkString)(materializer.executionContext)
>>
>>
>> bucketListingXml.foreach {
>> xml =>
>>
>>   println(s"This gets called. prefix $currentPrefix")
>>
>>   pushCallback.invoke(xml)
>>   }(materializer.executionContext)
>>
>>
>> And the callback
>>
>>
>> val pushCallback = getAsyncCallback[String] { xml =>
>>   log.info(s"This is never called for last element in graph!")
>>   push(out, xml)
>> }
>>
>>
>> I don't see any errors and this issue consistently occurs on the last
>> element. Thanks
>>
>> Andrew
>>
>> --
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> --
> Cheers,
> √
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>


-- 
Cheers,
√

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread Viktor Klang
Please submit a minimized reproducer so readers have a chance of running the
code.

On Fri, Dec 16, 2016 at 3:44 PM,  wrote:

> Hi,
>
> I'm seeing an issue where the graph completes while there is still data in
> one of the flows. The last element emitted by the source enters a custom
> GraphStageLogic flow, where it is sent to a function that returns a Future.
> That Future has a callback which invokes getAsyncCallback and then
> push(out). For the last element, the Future callback fires (pushCallback
> .invoke(xml)) but the AsyncCallback is never invoked and the graph stops.
>
> For more context, this is what I have going on inside the GraphStageLogic:
>
> val s3ListBucket: Source[ByteString, NotUsed] =
>   s3Client.listBucket(bucket, Some(currentPrefix), maxKeys, nextMarker)
>
> val bucketListingXml: Future[String] = s3ListBucket
>   .map(_.utf8String)
>   .runWith(Sink.seq)(materializer)
>   .map(_.mkString)(materializer.executionContext)
>
>
>   bucketListingXml.foreach {
> xml =>
>
>   println(s"This gets called. prefix $currentPrefix")
>
> pushCallback.invoke(xml)
> }(materializer.executionContext)
>
>
> And the callback
>
>
> val pushCallback = getAsyncCallback[String] { xml =>
>   log.info(s"This is never called for last element in graph!")
>   push(out, xml)
> }
>
>
> I don't see any errors and this issue consistently occurs on the last
> element. Thanks
>
> Andrew
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread subopt1
Hi,

I'm seeing an issue where the graph completes while there is still data in 
one of the flows. The last element emitted by the source enters a custom 
GraphStageLogic flow, where it is sent to a function that returns a Future. 
That Future has a callback which invokes getAsyncCallback and then 
push(out). For the last element, the Future callback fires (pushCallback
.invoke(xml)) but the AsyncCallback is never invoked and the graph stops.

For more context, this is what I have going on inside the GraphStageLogic:

val s3ListBucket: Source[ByteString, NotUsed] =
  s3Client.listBucket(bucket, Some(currentPrefix), maxKeys, nextMarker)

val bucketListingXml: Future[String] = s3ListBucket
  .map(_.utf8String)
  .runWith(Sink.seq)(materializer)
  .map(_.mkString)(materializer.executionContext)


  bucketListingXml.foreach {
xml =>

  println(s"This gets called. prefix $currentPrefix")

pushCallback.invoke(xml)
}(materializer.executionContext)


And the callback


val pushCallback = getAsyncCallback[String] { xml =>
  log.info(s"This is never called for last element in graph!")
  push(out, xml)
}


I don't see any errors and this issue consistently occurs on the last 
element. Thanks

Andrew
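
(Aside, not part of Andrew's post.) A likely explanation for this symptom: the 
default InHandler.onUpstreamFinish completes the stage as soon as upstream 
finishes, and an async callback invoked after completeStage() is simply ignored, 
so the last element's push is lost. A minimal sketch of one way around it, 
tracking the in-flight Future and deferring completion until its callback has 
pushed (untested against this exact setup; failure handling omitted):

import akka.stream._
import akka.stream.stage._
import scala.concurrent.{ExecutionContext, Future}

// Sketch only: a Flow stage that hands each element to a Future and pushes the result
// via getAsyncCallback, but does NOT complete on upstream finish while a callback is
// still pending.
class DeferredCompletionFlow[A, B](f: A => Future[B])(implicit ec: ExecutionContext)
    extends GraphStage[FlowShape[A, B]] {

  val in = Inlet[A]("DeferredCompletionFlow.in")
  val out = Outlet[B]("DeferredCompletionFlow.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attrs: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private var inFlight = false      // a Future is outstanding and has not pushed yet
    private var upstreamDone = false  // upstream finished while we were still working

    private val pushResult = getAsyncCallback[B] { result =>
      push(out, result)
      inFlight = false
      if (upstreamDone) completeStage()
    }

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        inFlight = true
        f(grab(in)).foreach(pushResult.invoke)
      }
      // the default would call completeStage() here, dropping the pending push
      override def onUpstreamFinish(): Unit = {
        upstreamDone = true
        if (!inFlight) completeStage()
      }
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = pull(in)
    })
  }
}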

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Akka streams - tripped up by buffers

2016-11-28 Thread Viktor Klang
Sorry for the late response,

sounds like you found a solution to your problem!

On Wed, Nov 23, 2016 at 12:34 PM, Julian Howarth <10.howa...@gmail.com>
wrote:

> Is there any value in this approach for anyone else? Quite happy to raise
> issues/PR if there is.
>
> Thanks,
>
> Julian
>
>
> On Thursday, November 17, 2016 at 5:53:37 PM UTC, Julian Howarth wrote:
>>
>> Viktor,
>>
>> Here's what I came up with: https://gist.github.com/julian
>> howarth/7287a6e6eaf665dd79307aaff6164cd8
>>
>> Performance-wise, with some very casual testing I can't detect much, if
>> any, overhead over the original. Checked with both the vanilla case where
>> each inbound message corresponds to one frame, and also where the frames 
>> are randomly split across the inbound packets. However, I assume you have a
>> more comprehensive set of tests which could be run.
>>
>> Julian
>>
>>
>>
>> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] [akka-streams] Implementing custom graph stage logic for FanOut shapes

2016-11-27 Thread Sergey Sopin
Hi,

I am trying to implement custom logic for UniformFanOutShape, but it seems 
I am doing something wrong.

public class SPMessageSplitter extends 
GraphStage<UniformFanOutShape<SPFinderData, SPFinderData>> {

//Inlet
public final Inlet<SPFinderData> in = Inlet.create("Inlet.in");

//Outlets
public final Outlet<SPFinderData> flOut = Outlet.create("FL.out");
public final Outlet<SPFinderData> sOut = Outlet.create("SD.out");
public final Outlet<SPFinderData> cOut = Outlet.create("CD.out");

private Outlet[] outlets = {flOut, sOut, cOut};

//Shape
private final UniformFanOutShape<SPFinderData, SPFinderData> shape = new 
UniformFanOutShape<>(in, (Outlet[])outlets);

@Override
public UniformFanOutShape<SPFinderData, SPFinderData> shape() {
return shape;
}

@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape) {
IntReplacement pendingCount = new IntReplacement(3);
IntReplacement downstreamRunning = new IntReplacement(3);

BoolReplacement pending0 = BoolReplacement.TRUE;
BoolReplacement pending1 = BoolReplacement.TRUE;
BoolReplacement pending2 = BoolReplacement.TRUE;

//In handler
{
setHandler(in, new AbstractInHandler() {
@Override
public void onPush() throws Exception {
SPFinderData elem = grab(in);
System.out.print("Splitter: ON PUSH!");
push(findOutlet(elem), elem);
pendingCount.setInteger(downstreamRunning.getInteger());
}
});
}

//Handler for FL.out outlet
{
setHandler(flOut, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
{
pendingCount.setInteger(pendingCount.getInteger() - 
1);
pending0.setValue(false);
if (pendingCount.getInteger() == 0) pull(in);
}
}

@Override
public void onDownstreamFinish() {

downstreamRunning.setInteger(downstreamRunning.getInteger() - 1);
if (downstreamRunning.getInteger() == 0) 
completeStage();
else {
if (pending0.getBoolValue()) 
pendingCount.setInteger(pendingCount.getInteger() - 1);
if (pendingCount.getInteger() == 0 && 
!hasBeenPulled(in)) pull(in);
}
}
});
}
//Handler for SD.out outlet
{
setHandler(sOut, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
{
pendingCount.setInteger(pendingCount.getInteger() - 
1);
pending1.setValue(false);
if (pendingCount.getInteger() == 0) pull(in);
}
}

@Override
public void onDownstreamFinish() {

downstreamRunning.setInteger(downstreamRunning.getInteger() - 1);
if (downstreamRunning.getInteger() == 0) 
completeStage();
else {
if (pending1.getBoolValue()) 
pendingCount.setInteger(pendingCount.getInteger() - 1);
if (pendingCount.getInteger() == 0 && 
!hasBeenPulled(in)) pull(in);
}
}
});
}
//Handler for CDir.out outlet
{
setHandler(cOut, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
{
pendingCount.setInteger(pendingCount.getInteger() - 
1);
pending2.setValue(false);
if (pendingCount.getInteger() == 0) pull(in);
}
}

@Override
public void onDownstreamFinish() {

downstreamRunning.setInteger(downstreamRunning.getInteger() - 1);
if (downstreamRunning.getInteger() == 0) 
completeStage();
else {
if (pending2.getBoolValue()) 
pendingCount.setInteger(pendingCount.getInteger() - 1);
if (pendingCount.getInteger() == 0 && 
!hasBeenPulled(in)) pull(in);
}
}
});
}

};
}

private Outlet<SPFinderData> findOutlet(SPFinderData elem) {
if(elem.isF() || elem.isL()) {
System.out.println("Splitte: Redirect to FL.");
 

Re: [akka-user] Akka streams - tripped up by buffers

2016-11-23 Thread Julian Howarth
Is there any value in this approach for anyone else? Quite happy to raise 
issues/PR if there is.

Thanks,

Julian


On Thursday, November 17, 2016 at 5:53:37 PM UTC, Julian Howarth wrote:
>
> Viktor,
>
> Here's what I came up with: 
> https://gist.github.com/julianhowarth/7287a6e6eaf665dd79307aaff6164cd8
>
> Performance-wise, with some very casual testing I can't detect much, if 
> any, overhead over the original. Checked with both the vanilla case where 
> each inbound message corresponds to one frame, and also where the frames 
> are randomly split across the inbound packets. However, I assume you have a 
> more comprehensive set of tests which could be run.
>
> Julian
>
>
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

