[akka-user] Re: how to define materialized value

2016-03-07 Thread Arun Sethia
Awesome thanks a lot. 

On Sunday, March 6, 2016 at 7:20:43 AM UTC-6, Rafał Krzewski wrote:
>
> Arun,
>
> a little correction:
>
> val runnableGraph = 
> source.viaMat(counter[Int])(Keep.right).toMat(sink)(Keep.both)
>
> And subsequently:
>
> val (counter, futureSum) = runnableGraph.run()
>
> Graph outlets are always streams. You need to connect them to a Sink 
> (through intervening Flows or more complex Graphs, as necessary) in order 
> to create a RunnableGraph. Materialized values are the other things used 
> to connect the RunnableGraph to the outside world that are *not* streams.
>
> For example Sink.fold creates a stream element that is (obviously) a Sink. 
> It does not have any stream outlets. However it provides a materialized 
> value Future[U] that is completed when the Sink's inlet stream is 
> exhausted. This is how a running stream can communicate it's successful 
> completion or failure to the outside world.
>
> Another example is Source.actorPublisher: you provide it with Props for 
> an Actor that implements ActorPublisher contract. When materializing the 
> stream, the Source will instantiate the Actor and return it's ActorRef as 
> a materialized value. The Actor is internal to the stream but you can use 
> the ActorRef as an interface from the outside world into the stream: send 
> messages (using your own protocol) to be passed to the Source's outlet, 
> according to demand from downstream. The tricky part is that such gateway 
> Actor must manage buffering and/or backpressure on it's own!
>
> Besides that, you can use materialized values to monitor stream execution 
> from the outside, like in the Counter example above or 
> https://github.com/akka/akka/pull/19836 or to interrupt a stream that 
> would otherwise run for a long (or unlimited) time: 
> https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/streams/FlowBreaker.scala
>
> Cheers,
> Rafał
>
> W dniu niedziela, 6 marca 2016 08:43:10 UTC+1 użytkownik Arun Sethia 
> napisał:
>>
>> Thanks Rafal.
>>
>> Based on this I tried to make sample code, where I would like to count 
>> number of elements being processed and their sum:
>>
>> val source = Source (1 to 5).filter(x=> x%2==0)
>>
>> val sink:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _)
>>
>> val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.both)
>>
>> val result=runnableGraph.run()
>>
>>
>> def counter[T]: Flow[T, T, Counter] = {
>>   val internalCounter = new AtomicLong(0)
>>   Flow[T].map{ elem ⇒
>> internalCounter.incrementAndGet()
>> elem
>> }.mapMaterializedValue(_ ⇒ new Counter{
>> override def get = internalCounter.get
>>   })
>> }
>>
>>
>>
>> 1. using Keep.both, result should able to return me count and sum, but it is 
>> not?
>>
>> 2. How materialize values are different than "out"? I am not able to 
>> visualize the difference between materialize values and out?
>>
>> Thanks 
>> Arun
>>
>>
>>
>> On Saturday, March 5, 2016 at 6:02:56 PM UTC-6, Arun Sethia wrote:
>>>
>>> Hi,
>>>
>>> can some explain what does it mean of materialized value ? I have see 
>>> documentation at 
>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams
>>>  
>>>
>>> I am not sure how Flow can define materialize type, for example the 
>>> following code has Input - Tweet, output - Int but Mat is Unit. I would 
>>> like to see how someone can define Mat as Int or any example where Flow or 
>>> source is defining Mat other than Unit.
>>>
>>> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
>>>
>>>
>>>
>>> It is quite confusing for me to understand difference between "out"  and 
>>> "Mat".
>>>
>>>
>>> Thanks 
>>>
>>> As
>>>
>>>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: how to define materialized value

2016-03-06 Thread Rafał Krzewski
Arun,

a little correction:

val runnableGraph = 
source.viaMat(counter[Int])(Keep.right).toMat(sink)(Keep.both)

And subsequently:

val (counter, futureSum) = runnableGraph.run()

Graph outlets are always streams. You need to connect them to a Sink 
(through intervening Flows or more complex Graphs, as necessary) in order 
to create a RunnableGraph. Materialized values are the other things used to 
connect the RunnableGraph to the outside world that are *not* streams.

For example Sink.fold creates a stream element that is (obviously) a Sink. 
It does not have any stream outlets. However it provides a materialized 
value Future[U] that is completed when the Sink's inlet stream is 
exhausted. This is how a running stream can communicate it's successful 
completion or failure to the outside world.

Another example is Source.actorPublisher: you provide it with Props for an 
Actor that implements ActorPublisher contract. When materializing the 
stream, the Source will instantiate the Actor and return it's ActorRef as a 
materialized value. The Actor is internal to the stream but you can use the 
ActorRef as an interface from the outside world into the stream: send 
messages (using your own protocol) to be passed to the Source's outlet, 
according to demand from downstream. The tricky part is that such gateway 
Actor must manage buffering and/or backpressure on it's own!

Besides that, you can use materialized values to monitor stream execution 
from the outside, like in the Counter example above 
or https://github.com/akka/akka/pull/19836 or to interrupt a stream that 
would otherwise run for a long (or unlimited) 
time: 
https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/streams/FlowBreaker.scala

Cheers,
Rafał

W dniu niedziela, 6 marca 2016 08:43:10 UTC+1 użytkownik Arun Sethia 
napisał:
>
> Thanks Rafal.
>
> Based on this I tried to make sample code, where I would like to count 
> number of elements being processed and their sum:
>
> val source = Source (1 to 5).filter(x=> x%2==0)
>
> val sink:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _)
>
> val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.both)
>
> val result=runnableGraph.run()
>
>
> def counter[T]: Flow[T, T, Counter] = {
>   val internalCounter = new AtomicLong(0)
>   Flow[T].map{ elem ⇒
> internalCounter.incrementAndGet()
> elem
> }.mapMaterializedValue(_ ⇒ new Counter{
> override def get = internalCounter.get
>   })
> }
>
>
>
> 1. using Keep.both, result should able to return me count and sum, but it is 
> not?
>
> 2. How materialize values are different than "out"? I am not able to 
> visualize the difference between materialize values and out?
>
> Thanks 
> Arun
>
>
>
> On Saturday, March 5, 2016 at 6:02:56 PM UTC-6, Arun Sethia wrote:
>>
>> Hi,
>>
>> can some explain what does it mean of materialized value ? I have see 
>> documentation at 
>> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams
>>  
>>
>> I am not sure how Flow can define materialize type, for example the 
>> following code has Input - Tweet, output - Int but Mat is Unit. I would 
>> like to see how someone can define Mat as Int or any example where Flow or 
>> source is defining Mat other than Unit.
>>
>> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
>>
>>
>>
>> It is quite confusing for me to understand difference between "out"  and 
>> "Mat".
>>
>>
>> Thanks 
>>
>> As
>>
>>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: how to define materialized value

2016-03-05 Thread Arun Sethia
Thanks.

I tried to put example using same:

val source = Source (1 to 5).filter(x=> x%2==0)

val sink:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _)

val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.left)

val result=runnableGraph.run()

def counter[T]: Flow[T, T, Counter] = {
  val internalCounter = new AtomicLong(0)
  Flow[T].map{ elem ⇒
internalCounter.incrementAndGet()
elem
}.mapMaterializedValue(_ ⇒ new Counter{
override def get = internalCounter.get
  })
}

The result should give me a tuple having count (flow) and sum (Sink) value, 
since I am using Keep.both,* but it is giving only Future[Int].*

what is difference between materialize value and out?

Thanks 
Arun



On Saturday, March 5, 2016 at 6:46:12 PM UTC-6, Rafał Krzewski wrote:
>
> Hi,
> there are a few ways of doing that. Probably the simplest one is using 
> Flow.mapMaterializedValue. Suppose you'd like to create a Flow that counts 
> the elements that pass through it and makes the current count available 
> through a "side channel":
>
>   trait Counter {
> def get: Long
>   }
>
>   def counter[T]: Flow[T, T, Counter] = {
> val internalCounter = new AtomicLong(0)
> Flow[T].map{ elem ⇒
>   internalCounter.incrementAndGet()
>   elem
>  }.mapMaterializedValue(_ ⇒ new Counter{
>override def get = internalCounter.get
>  })
>   } 
>
> Another way is using a GraphStageWithMaterializedValue while building a 
> custom Flow / Sink / Source. Instead of returning a GraphStageLogic, like 
> an ordinary GraphStage, you return a pair of GraphStageLogic and the 
> materialized value.
>
> Cheers,
> Rafał
>
> W dniu niedziela, 6 marca 2016 01:02:56 UTC+1 użytkownik Arun Sethia 
> napisał:
>>
>> Hi,
>>
>> can some explain what does it mean of materialized value ? I have see 
>> documentation at 
>> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams
>>  
>>
>> I am not sure how Flow can define materialize type, for example the 
>> following code has Input - Tweet, output - Int but Mat is Unit. I would 
>> like to see how someone can define Mat as Int or any example where Flow or 
>> source is defining Mat other than Unit.
>>
>> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
>>
>>
>>
>> It is quite confusing for me to understand difference between "out"  and 
>> "Mat".
>>
>>
>> Thanks 
>>
>> As
>>
>>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: how to define materialized value

2016-03-05 Thread Arun Sethia
the given same code:

val source = Source (1 to 5).filter(x=> x%2==0)

val flow:Flow[Int,Int,Unit]=Flow[Int].map(x=> x * 2)

val sink:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _)

val runnableGraph = source.via(flow).toMat(sink)(Keep.both)

I am not sure what is use of using Keep.both vs Keep.left, I thought If I use 
keep.both, will able to get values for flow and sink as tuple.




On Saturday, March 5, 2016 at 6:02:56 PM UTC-6, Arun Sethia wrote:
>
> Hi,
>
> can some explain what does it mean of materialized value ? I have see 
> documentation at 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams
>  
>
> I am not sure how Flow can define materialize type, for example the 
> following code has Input - Tweet, output - Int but Mat is Unit. I would 
> like to see how someone can define Mat as Int or any example where Flow or 
> source is defining Mat other than Unit.
>
> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
>
>
>
> It is quite confusing for me to understand difference between "out"  and 
> "Mat".
>
>
> Thanks 
>
> As
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: how to define materialized value

2016-03-05 Thread Arun Sethia
more 

Source[+Out, +Mat],Flow[-In, +Out, +Mat] and Sink[-In, +Mat] , in all cases 
what is +Mat type and how I can define one such , if possible any example 
will be great. 


On Saturday, March 5, 2016 at 6:02:56 PM UTC-6, Arun Sethia wrote:
>
> Hi,
>
> can some explain what does it mean of materialized value ? I have see 
> documentation at 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams
>  
>
> I am not sure how Flow can define materialize type, for example the 
> following code has Input - Tweet, output - Int but Mat is Unit. I would 
> like to see how someone can define Mat as Int or any example where Flow or 
> source is defining Mat other than Unit.
>
> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
>
>
>
> It is quite confusing for me to understand difference between "out"  and 
> "Mat".
>
>
> Thanks 
>
> As
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: how to define materialized value

2016-03-05 Thread Arun Sethia
Thanks Rafal.

Based on this I tried to make sample code, where I would like to count 
number of elements being processed and their sum:

val source = Source (1 to 5).filter(x=> x%2==0)

val sink:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _)

val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.both)

val result=runnableGraph.run()


def counter[T]: Flow[T, T, Counter] = {
  val internalCounter = new AtomicLong(0)
  Flow[T].map{ elem ⇒
internalCounter.incrementAndGet()
elem
}.mapMaterializedValue(_ ⇒ new Counter{
override def get = internalCounter.get
  })
}



1. using Keep.both, result should able to return me count and sum, but it is 
not?

2. How materialize values are different than "out"? I am not able to visualize 
the difference between materialize values and out?

Thanks 
Arun



On Saturday, March 5, 2016 at 6:02:56 PM UTC-6, Arun Sethia wrote:
>
> Hi,
>
> can some explain what does it mean of materialized value ? I have see 
> documentation at 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams
>  
>
> I am not sure how Flow can define materialize type, for example the 
> following code has Input - Tweet, output - Int but Mat is Unit. I would 
> like to see how someone can define Mat as Int or any example where Flow or 
> source is defining Mat other than Unit.
>
> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
>
>
>
> It is quite confusing for me to understand difference between "out"  and 
> "Mat".
>
>
> Thanks 
>
> As
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: how to define materialized value

2016-03-05 Thread Rafał Krzewski
Hi,
there are a few ways of doing that. Probably the simplest one is using 
Flow.mapMaterializedValue. Suppose you'd like to create a Flow that counts 
the elements that pass through it and makes the current count available 
through a "side channel":

  trait Counter {
def get: Long
  }

  def counter[T]: Flow[T, T, Counter] = {
val internalCounter = new AtomicLong(0)
Flow[T].map{ elem ⇒
  internalCounter.incrementAndGet()
  elem
 }.mapMaterializedValue(_ ⇒ new Counter{
   override def get = internalCounter.get
 })
  } 

Another way is using a GraphStageWithMaterializedValue while building a 
custom Flow / Sink / Source. Instead of returning a GraphStageLogic, like 
an ordinary GraphStage, you return a pair of GraphStageLogic and the 
materialized value.

Cheers,
Rafał

W dniu niedziela, 6 marca 2016 01:02:56 UTC+1 użytkownik Arun Sethia 
napisał:
>
> Hi,
>
> can some explain what does it mean of materialized value ? I have see 
> documentation at 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams
>  
>
> I am not sure how Flow can define materialize type, for example the 
> following code has Input - Tweet, output - Int but Mat is Unit. I would 
> like to see how someone can define Mat as Int or any example where Flow or 
> source is defining Mat other than Unit.
>
> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
>
>
>
> It is quite confusing for me to understand difference between "out"  and 
> "Mat".
>
>
> Thanks 
>
> As
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.