[akka-user] Re: how to define materialized value
Awesome thanks a lot. On Sunday, March 6, 2016 at 7:20:43 AM UTC-6, Rafał Krzewski wrote: > > Arun, > > a little correction: > > val runnableGraph = > source.viaMat(counter[Int])(Keep.right).toMat(sink)(Keep.both) > > And subsequently: > > val (counter, futureSum) = runnableGraph.run() > > Graph outlets are always streams. You need to connect them to a Sink > (through intervening Flows or more complex Graphs, as necessary) in order > to create a RunnableGraph. Materialized values are the other things used > to connect the RunnableGraph to the outside world that are *not* streams. > > For example Sink.fold creates a stream element that is (obviously) a Sink. > It does not have any stream outlets. However it provides a materialized > value Future[U] that is completed when the Sink's inlet stream is > exhausted. This is how a running stream can communicate it's successful > completion or failure to the outside world. > > Another example is Source.actorPublisher: you provide it with Props for > an Actor that implements ActorPublisher contract. When materializing the > stream, the Source will instantiate the Actor and return it's ActorRef as > a materialized value. The Actor is internal to the stream but you can use > the ActorRef as an interface from the outside world into the stream: send > messages (using your own protocol) to be passed to the Source's outlet, > according to demand from downstream. The tricky part is that such gateway > Actor must manage buffering and/or backpressure on it's own! > > Besides that, you can use materialized values to monitor stream execution > from the outside, like in the Counter example above or > https://github.com/akka/akka/pull/19836 or to interrupt a stream that > would otherwise run for a long (or unlimited) time: > https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/streams/FlowBreaker.scala > > Cheers, > Rafał > > W dniu niedziela, 6 marca 2016 08:43:10 UTC+1 użytkownik Arun Sethia > napisał: >> >> Thanks Rafal. >> >> Based on this I tried to make sample code, where I would like to count >> number of elements being processed and their sum: >> >> val source = Source (1 to 5).filter(x=> x%2==0) >> >> val sink:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _) >> >> val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.both) >> >> val result=runnableGraph.run() >> >> >> def counter[T]: Flow[T, T, Counter] = { >> val internalCounter = new AtomicLong(0) >> Flow[T].map{ elem ⇒ >> internalCounter.incrementAndGet() >> elem >> }.mapMaterializedValue(_ ⇒ new Counter{ >> override def get = internalCounter.get >> }) >> } >> >> >> >> 1. using Keep.both, result should able to return me count and sum, but it is >> not? >> >> 2. How materialize values are different than "out"? I am not able to >> visualize the difference between materialize values and out? >> >> Thanks >> Arun >> >> >> >> On Saturday, March 5, 2016 at 6:02:56 PM UTC-6, Arun Sethia wrote: >>> >>> Hi, >>> >>> can some explain what does it mean of materialized value ? I have see >>> documentation at >>> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams >>> >>> >>> I am not sure how Flow can define materialize type, for example the >>> following code has Input - Tweet, output - Int but Mat is Unit. I would >>> like to see how someone can define Mat as Int or any example where Flow or >>> source is defining Mat other than Unit. >>> >>> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1) >>> >>> >>> >>> It is quite confusing for me to understand difference between "out" and >>> "Mat". >>> >>> >>> Thanks >>> >>> As >>> >>> -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: how to define materialized value
Arun, a little correction: val runnableGraph = source.viaMat(counter[Int])(Keep.right).toMat(sink)(Keep.both) And subsequently: val (counter, futureSum) = runnableGraph.run() Graph outlets are always streams. You need to connect them to a Sink (through intervening Flows or more complex Graphs, as necessary) in order to create a RunnableGraph. Materialized values are the other things used to connect the RunnableGraph to the outside world that are *not* streams. For example Sink.fold creates a stream element that is (obviously) a Sink. It does not have any stream outlets. However it provides a materialized value Future[U] that is completed when the Sink's inlet stream is exhausted. This is how a running stream can communicate it's successful completion or failure to the outside world. Another example is Source.actorPublisher: you provide it with Props for an Actor that implements ActorPublisher contract. When materializing the stream, the Source will instantiate the Actor and return it's ActorRef as a materialized value. The Actor is internal to the stream but you can use the ActorRef as an interface from the outside world into the stream: send messages (using your own protocol) to be passed to the Source's outlet, according to demand from downstream. The tricky part is that such gateway Actor must manage buffering and/or backpressure on it's own! Besides that, you can use materialized values to monitor stream execution from the outside, like in the Counter example above or https://github.com/akka/akka/pull/19836 or to interrupt a stream that would otherwise run for a long (or unlimited) time: https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/streams/FlowBreaker.scala Cheers, Rafał W dniu niedziela, 6 marca 2016 08:43:10 UTC+1 użytkownik Arun Sethia napisał: > > Thanks Rafal. > > Based on this I tried to make sample code, where I would like to count > number of elements being processed and their sum: > > val source = Source (1 to 5).filter(x=> x%2==0) > > val sink:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _) > > val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.both) > > val result=runnableGraph.run() > > > def counter[T]: Flow[T, T, Counter] = { > val internalCounter = new AtomicLong(0) > Flow[T].map{ elem ⇒ > internalCounter.incrementAndGet() > elem > }.mapMaterializedValue(_ ⇒ new Counter{ > override def get = internalCounter.get > }) > } > > > > 1. using Keep.both, result should able to return me count and sum, but it is > not? > > 2. How materialize values are different than "out"? I am not able to > visualize the difference between materialize values and out? > > Thanks > Arun > > > > On Saturday, March 5, 2016 at 6:02:56 PM UTC-6, Arun Sethia wrote: >> >> Hi, >> >> can some explain what does it mean of materialized value ? I have see >> documentation at >> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams >> >> >> I am not sure how Flow can define materialize type, for example the >> following code has Input - Tweet, output - Int but Mat is Unit. I would >> like to see how someone can define Mat as Int or any example where Flow or >> source is defining Mat other than Unit. >> >> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1) >> >> >> >> It is quite confusing for me to understand difference between "out" and >> "Mat". >> >> >> Thanks >> >> As >> >> -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: how to define materialized value
Thanks. I tried to put example using same: val source = Source (1 to 5).filter(x=> x%2==0) val sink:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _) val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.left) val result=runnableGraph.run() def counter[T]: Flow[T, T, Counter] = { val internalCounter = new AtomicLong(0) Flow[T].map{ elem ⇒ internalCounter.incrementAndGet() elem }.mapMaterializedValue(_ ⇒ new Counter{ override def get = internalCounter.get }) } The result should give me a tuple having count (flow) and sum (Sink) value, since I am using Keep.both,* but it is giving only Future[Int].* what is difference between materialize value and out? Thanks Arun On Saturday, March 5, 2016 at 6:46:12 PM UTC-6, Rafał Krzewski wrote: > > Hi, > there are a few ways of doing that. Probably the simplest one is using > Flow.mapMaterializedValue. Suppose you'd like to create a Flow that counts > the elements that pass through it and makes the current count available > through a "side channel": > > trait Counter { > def get: Long > } > > def counter[T]: Flow[T, T, Counter] = { > val internalCounter = new AtomicLong(0) > Flow[T].map{ elem ⇒ > internalCounter.incrementAndGet() > elem > }.mapMaterializedValue(_ ⇒ new Counter{ >override def get = internalCounter.get > }) > } > > Another way is using a GraphStageWithMaterializedValue while building a > custom Flow / Sink / Source. Instead of returning a GraphStageLogic, like > an ordinary GraphStage, you return a pair of GraphStageLogic and the > materialized value. > > Cheers, > Rafał > > W dniu niedziela, 6 marca 2016 01:02:56 UTC+1 użytkownik Arun Sethia > napisał: >> >> Hi, >> >> can some explain what does it mean of materialized value ? I have see >> documentation at >> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams >> >> >> I am not sure how Flow can define materialize type, for example the >> following code has Input - Tweet, output - Int but Mat is Unit. I would >> like to see how someone can define Mat as Int or any example where Flow or >> source is defining Mat other than Unit. >> >> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1) >> >> >> >> It is quite confusing for me to understand difference between "out" and >> "Mat". >> >> >> Thanks >> >> As >> >> -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: how to define materialized value
the given same code: val source = Source (1 to 5).filter(x=> x%2==0) val flow:Flow[Int,Int,Unit]=Flow[Int].map(x=> x * 2) val sink:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _) val runnableGraph = source.via(flow).toMat(sink)(Keep.both) I am not sure what is use of using Keep.both vs Keep.left, I thought If I use keep.both, will able to get values for flow and sink as tuple. On Saturday, March 5, 2016 at 6:02:56 PM UTC-6, Arun Sethia wrote: > > Hi, > > can some explain what does it mean of materialized value ? I have see > documentation at > http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams > > > I am not sure how Flow can define materialize type, for example the > following code has Input - Tweet, output - Int but Mat is Unit. I would > like to see how someone can define Mat as Int or any example where Flow or > source is defining Mat other than Unit. > > - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1) > > > > It is quite confusing for me to understand difference between "out" and > "Mat". > > > Thanks > > As > > -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: how to define materialized value
more Source[+Out, +Mat],Flow[-In, +Out, +Mat] and Sink[-In, +Mat] , in all cases what is +Mat type and how I can define one such , if possible any example will be great. On Saturday, March 5, 2016 at 6:02:56 PM UTC-6, Arun Sethia wrote: > > Hi, > > can some explain what does it mean of materialized value ? I have see > documentation at > http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams > > > I am not sure how Flow can define materialize type, for example the > following code has Input - Tweet, output - Int but Mat is Unit. I would > like to see how someone can define Mat as Int or any example where Flow or > source is defining Mat other than Unit. > > - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1) > > > > It is quite confusing for me to understand difference between "out" and > "Mat". > > > Thanks > > As > > -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: how to define materialized value
Thanks Rafal. Based on this I tried to make sample code, where I would like to count number of elements being processed and their sum: val source = Source (1 to 5).filter(x=> x%2==0) val sink:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _) val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.both) val result=runnableGraph.run() def counter[T]: Flow[T, T, Counter] = { val internalCounter = new AtomicLong(0) Flow[T].map{ elem ⇒ internalCounter.incrementAndGet() elem }.mapMaterializedValue(_ ⇒ new Counter{ override def get = internalCounter.get }) } 1. using Keep.both, result should able to return me count and sum, but it is not? 2. How materialize values are different than "out"? I am not able to visualize the difference between materialize values and out? Thanks Arun On Saturday, March 5, 2016 at 6:02:56 PM UTC-6, Arun Sethia wrote: > > Hi, > > can some explain what does it mean of materialized value ? I have see > documentation at > http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams > > > I am not sure how Flow can define materialize type, for example the > following code has Input - Tweet, output - Int but Mat is Unit. I would > like to see how someone can define Mat as Int or any example where Flow or > source is defining Mat other than Unit. > > - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1) > > > > It is quite confusing for me to understand difference between "out" and > "Mat". > > > Thanks > > As > > -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: how to define materialized value
Hi, there are a few ways of doing that. Probably the simplest one is using Flow.mapMaterializedValue. Suppose you'd like to create a Flow that counts the elements that pass through it and makes the current count available through a "side channel": trait Counter { def get: Long } def counter[T]: Flow[T, T, Counter] = { val internalCounter = new AtomicLong(0) Flow[T].map{ elem ⇒ internalCounter.incrementAndGet() elem }.mapMaterializedValue(_ ⇒ new Counter{ override def get = internalCounter.get }) } Another way is using a GraphStageWithMaterializedValue while building a custom Flow / Sink / Source. Instead of returning a GraphStageLogic, like an ordinary GraphStage, you return a pair of GraphStageLogic and the materialized value. Cheers, Rafał W dniu niedziela, 6 marca 2016 01:02:56 UTC+1 użytkownik Arun Sethia napisał: > > Hi, > > can some explain what does it mean of materialized value ? I have see > documentation at > http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams > > > I am not sure how Flow can define materialize type, for example the > following code has Input - Tweet, output - Int but Mat is Unit. I would > like to see how someone can define Mat as Int or any example where Flow or > source is defining Mat other than Unit. > > - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1) > > > > It is quite confusing for me to understand difference between "out" and > "Mat". > > > Thanks > > As > > -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.