[akka-user] Akka Supervisor Strategy

2016-03-21 Thread Arun Sethia
Hi,

I have set up a supervisorStrategy as follows in a supervisor ActorB, which 
creates ActorC.

class ActorB extends Actor {

  private var actorC: Option[ActorRef] = None

  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2,
    withinTimeRange = FiniteDuration(1, TimeUnit.MINUTES), loggingEnabled = true) {
    case _: TimeoutException => Resume
    case _: AskTimeoutException => Resume
  }

  override def preStart(): Unit = {
    actorC = Some(context.actorOf(ActorC.props, "actorC"))
    // watch the child that was just created
    context.watch(actorC.get)
  }

  // message handling (receive) is not shown in this post
}


ActorA is the supervisor for ActorB. My understanding is that ActorB will 
Resume 2 times within 1 minute in case of a TimeoutException or 
AskTimeoutException, and afterwards it will escalate to ActorA.



class ActorA extends Actor {

  private var actorB: Option[ActorRef] = None

  // the first (empty) parameter list is required before the decider block
  override val supervisorStrategy = OneForOneStrategy() {
    case _: TimeoutException => Stop
    case _: AskTimeoutException => Stop
  }

  override def preStart(): Unit = {
    actorB = Some(context.actorOf(ActorB.props, "actorB"))
    context.watch(actorB.get)
  }

  def receive: Receive = {
    case t: Terminated =>
      context.stop(self)
      context.system.shutdown()
  }
}


This means ActorA should shut down the system after the Resume strategy has 
been applied 2 times within 1 minute, but I don't see that happening, and I am 
not sure where I am going wrong.
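
For comparison, here is a minimal sketch of a strategy in which the retry window 
does take effect. It relies on one point from the Akka classic supervision 
documentation: maxNrOfRetries and withinTimeRange are only counted for the 
Restart directive, so a child that is always Resumed never uses up its budget, 
and a child that exceeds the budget is stopped rather than escalated. Worker and 
Parent are hypothetical stand-ins for ActorC and ActorB, and Akka 2.4.x classic 
actors are assumed.

```scala
import java.util.concurrent.TimeoutException
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy, Terminated}
import akka.actor.SupervisorStrategy.Restart
import scala.concurrent.duration._

// Hypothetical child (stand-in for ActorC) that fails on demand.
class Worker extends Actor {
  def receive: Receive = {
    case "fail" => throw new TimeoutException("simulated timeout")
    case other  => sender() ! other
  }
}

object Parent {
  // Restart is counted against the window: at most 2 restarts within 1 minute.
  // AskTimeoutException is a subclass of TimeoutException, so one case covers both.
  val strategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1.minute) {
      case _: TimeoutException => Restart
    }
}

// Hypothetical supervisor (stand-in for ActorB).
class Parent extends Actor {
  private val worker: ActorRef =
    context.watch(context.actorOf(Props(new Worker), "worker"))

  override val supervisorStrategy: SupervisorStrategy = Parent.strategy

  def receive: Receive = {
    // Once the budget is exceeded the worker is stopped, which arrives here as Terminated.
    // Throwing hands the decision to this actor's own supervisor (ActorA in the post).
    case Terminated(ref) if ref == worker =>
      throw new IllegalStateException("worker exceeded its restart budget")
    case msg => worker forward msg
  }
}
```

With this shape, the supervisor above Parent would need a case for the exception 
thrown on Terminated (IllegalStateException in this sketch) if it wants to Stop 
the Parent and react to its termination.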


Thanks

Arun


-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Intercept STDOUT of other process

2016-03-19 Thread Arun Sethia
Thanks.

My question is more about the STDOUT of another process, especially one that 
is not Java.
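
One way this could look, as a rough sketch only: start the external process 
from the JVM and wrap its stdout InputStream with StreamConverters.fromInputStream. 
The ProcessStdout object and its lines helper are hypothetical names; the sketch 
assumes Akka Streams 2.4.2 or later and a Unix-like system where the command 
(for example echo) exists.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Sink, StreamConverters}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object ProcessStdout {
  // Runs `command`, streams its stdout through Akka Streams and returns the lines.
  def lines(command: String*): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("process-stdout")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val process = new ProcessBuilder(command: _*).start()
      val result = StreamConverters
        .fromInputStream(() => process.getInputStream) // stdout of the child process
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8192, allowTruncation = true))
        .map(_.utf8String)
        .runWith(Sink.seq) // collect the lines; replace with your own pipeline
      Await.result(result, 10.seconds)
    } finally system.terminate()
  }
}
```

Because the stream only sees bytes from an InputStream, it does not matter 
whether the child process is a Java program or not.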

On Saturday, March 19, 2016 at 6:54:51 AM UTC-5, Konrad Malawski wrote:
>
> Hi there,
>
> Please don't *immediately* cross post your questions here and stack 
> overflow 
> <http://stackoverflow.com/questions/36096694/intercept-stdout-and-stream-via-akka/36101376#36101376>,
>  
> it makes it harder to track answered questions. It's ok to cross post if 
> after a while you did not receive an answer, but please don't do so 
> immediately.
>
>
> In order to "intercept" stdout in Java you can setOut on the System object. 
> It takes a PrintStream, which we are able to create by wrapping an 
> OutputStream "bridge" that Akka Streams provide, here's how:
>
>  val is: OutputStream = StreamConverters.asOutputStream()
>    .to(Sink.foreach(println)) // your logic pipeline here
>    .run()
>  System.setOut(new PrintStream(is))
>
>
> -- 
> Konrad
> Akka @ Lightbend
>
> On Saturday, 19 March 2016 01:29:33 UTC+1, Arun Sethia wrote:
>>
>> Hi,
>>
>> Is it possible to intercept STDOUT of any other process and stream them 
>> via akka streaming?
>>
>> Thanks
>> Arun
>>
>



[akka-user] Intercept STDOUT of other process

2016-03-18 Thread Arun Sethia
Hi,

Is it possible to intercept STDOUT of any other process and stream them via 
akka streaming?

Thanks
Arun



Re: [akka-user] Monitoring Akka Streaming

2016-03-11 Thread Arun Sethia
Thanks.

I would like to keep the monitoring counters in the ActorSystem, so that they 
can be exposed to the external world via a service.




On Thursday, March 10, 2016 at 5:48:12 PM UTC-6, rrodseth wrote:
>
> You can also use alsoTo to send stream elements to an actor or special 
> purpose Sink. 
>
> On Thu, Mar 10, 2016 at 10:49 AM, Filippo De Luca wrote:
>
>> Hi,
>> I suppose you can use map and call an external service for each message at 
>> a defined stage.
>>
>> Even better you can build your own stage.
>>
>> -- 
>> Filippo De Luca
>> about.me/FilippoDeLuca <http://about.me/FilippoDeLuca?promo=email_sig>
>>
>
>



Re: [akka-user] Re: Http Client with Source.tick

2016-03-10 Thread Arun Sethia
Super ... you guys are awesome ... the more I learn, the more interesting it 
becomes.

I used recover, and it worked well. Thanks a lot.
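
For the archive, a small sketch of what recover changes. A Source that throws 
part-way through stands in for the failing HTTP connection (an assumption made 
so the example needs no server); RecoverExample is a hypothetical name and Akka 
Streams 2.4.2 or later is assumed. Without recover, the failure is only visible 
in the Future returned by the run call; with recover, it becomes a final stream 
element and the stream completes normally.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object RecoverExample {
  // Stand-in for a stream whose upstream fails part-way (e.g. a dropped connection).
  private def failing = Source(1 to 5).map { n =>
    if (n == 3) throw new RuntimeException("simulated connection failure") else n.toString
  }

  def run(): (Try[Seq[String]], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("recover-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Without recover: nothing is printed for the error, it only fails the Future.
      val plain: Future[Seq[String]] = failing.runWith(Sink.seq)

      // With recover: the error is turned into one last element.
      val recovered: Future[Seq[String]] = failing
        .recover { case e: RuntimeException => s"failed: ${e.getMessage}" }
        .runWith(Sink.seq)

      (Try(Await.result(plain, 3.seconds)), Await.result(recovered, 3.seconds))
    } finally system.terminate()
  }
}
```

The same idea applies to runForeach: the Future it returns can be inspected 
(for example with onComplete) to see an error that foreach itself never prints.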


On Thursday, March 10, 2016 at 4:52:30 PM UTC-6, √ wrote:
>
>
> http://doc.akka.io/api/akka/2.4.2/#akka.stream.scaladsl.FlowOps
>
> -- 
> Cheers,
> √
> On Mar 10, 2016 11:43 PM, "Arun Sethia" wrote:
>
>> Thanks.
>>
>> please can you provide the link where I can see how to deal with such 
>> errors.
>>
>> On Thursday, March 10, 2016 at 4:22:35 PM UTC-6, √ wrote:
>>>
>>> For each only deals with elements, not errors.
>>> Please consult the documentation for combinators that let you observe 
>>> and manipulate errors.
>>>
>>> -- 
>>> Cheers,
>>> √
>>
>



Re: [akka-user] Re: Http Client with Source.tick

2016-03-10 Thread Arun Sethia
Thanks.

Could you please provide a link where I can see how to deal with such 
errors?

On Thursday, March 10, 2016 at 4:22:35 PM UTC-6, √ wrote:
>
> For each only deals with elements, not errors.
> Please consult the documentation for combinators that let you observe and 
> manipulate errors.
>
> -- 
> Cheers,
> √
>



[akka-user] Re: Http Client with Source.tick

2016-03-10 Thread Arun Sethia


httpSourceGraph1 should read httpSourceGraph, but the issue is still the same:


val response = httpSourceGraph().via(httpConnFlow()).runForeach(println)


We can test the same with any external internet URL as well; the result 
remains the same.





[akka-user] Http Client with Source.tick

2016-03-10 Thread Arun Sethia
Hi,

I am trying to connect an HTTP client to an HTTP service exposed by a server. 
The source should send a request every 1 second, and for that I have created 
the following partial graphs:


def httpSourceGraph() = {
  Source.fromGraph(GraphDSL.create() { implicit builder =>
    val sourceOutLet = builder.add(Source.tick(FiniteDuration(0, TimeUnit.SECONDS),
      FiniteDuration(1, TimeUnit.SECONDS),
      HttpRequest(uri = "/test", method = HttpMethods.GET))).out
    // expose outlet
    SourceShape(sourceOutLet)
  })
}


def httpConnFlow() = {
  Flow.fromGraph(GraphDSL.create() { implicit builder =>

    val httpSourceFlow = builder.add(Http(system).outgoingConnection(host = "localhost", port = 8080))

    FlowShape(httpSourceFlow.in, httpSourceFlow.out)
  })
}


the graph is composed as


val response= httpSourceGraph1.via(httpConnFlow()).runForeach(println)


If the HTTP server (localhost:8080/test) is up and running, everything works 
fine: every 1 second I can see the response coming back from the server. 
However, I do not see any response or error when the server is down at startup 
or goes down later.


*I think it should give me the following error:*


akka.stream.StreamTcpException: Tcp command 
[Connect(localhost/127.0.0.1:8080,None,List(),Some(10 seconds),true)] failed


Thanks for the help.

-Arun






[akka-user] Monitoring Akka Streaming

2016-03-10 Thread Arun Sethia
Hi,

I have a requirement where we would like to know how many incoming messages 
are processed by a flow. We can use a Materializer with an AtomicLong in the 
Flow to do this.

Is there any other alternative or built-in functionality in the akka-stream API?
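
One possible alternative, sketched under assumptions: attach a special-purpose 
counting Sink with alsoToMat and read the count from its materialized 
Future[Long]. CountingExample is a hypothetical name, the numbers are made up, 
and Akka Streams 2.4.2 or later is assumed.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object CountingExample {
  def run(): (Long, Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("counting-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // A sink that ignores the elements themselves and only counts them.
      val countingSink: Sink[Int, Future[Long]] =
        Sink.fold[Long, Int](0L)((n, _) => n + 1)

      val (count, doubled) = Source(1 to 10)
        .alsoToMat(countingSink)(Keep.right) // side channel: every element is also counted
        .map(_ * 2)                          // the main processing continues unchanged
        .toMat(Sink.seq)(Keep.both)
        .run()

      (Await.result(count, 3.seconds), Await.result(doubled, 3.seconds))
    } finally system.terminate()
  }
}
```

The count here only becomes available when the stream completes; for a live 
counter on a long-running stream, a materialized value backed by an AtomicLong 
is the more natural fit.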

Thanks 
Arun 



Re: [akka-user] Source Repeat given interval

2016-03-07 Thread Arun Sethia
Thanks a lot, cool. I should have looked into 
http://doc.akka.io/api/akka-stream-and-http-experimental/2.0/#akka.stream.scaladsl.Source
 

val input:List[Int]=List(1,2,3)

val source1 = 
Source.tick(FiniteDuration(1,TimeUnit.SECONDS),FiniteDuration(1,TimeUnit.SECONDS),input)

source1.mapConcat[Int](_.toList).via(flow).runForeach(println)


What would you suggest if the input is a List[HttpRequest] and I would like to 
use Http(system).outgoingConnection(host = "abs", port = 8080) as the Flow? Can 
I use the same approach as above, or is there a better one?


On Monday, March 7, 2016 at 3:29:43 PM UTC-6, √ wrote:
>
> mapConcat(identity)
>
> -- 
> Cheers,
> √
>



[akka-user] Source Repeat given interval

2016-03-07 Thread Arun Sethia
Hi,

I have a requirement where the source should repeat indefinitely at a given 
interval, for example:

val input:List[Int]=List(1,2,3)


val flow=Flow[Int].map(x=> x * 2)


Source(input.toList).via(flow).runForeach(println)


The source "input" should repeat every 1 second. 


I tried to use val source = 
Source.tick(FiniteDuration(1,TimeUnit.SECONDS),FiniteDuration(1,TimeUnit.SECONDS),input), 
but this emits each tick as a whole List instead of its individual elements.


Thanks  

Arun



[akka-user] Re: how to define materialized value

2016-03-07 Thread Arun Sethia
Awesome thanks a lot. 

On Sunday, March 6, 2016 at 7:20:43 AM UTC-6, Rafał Krzewski wrote:
>
> Arun,
>
> a little correction:
>
> val runnableGraph = 
> source.viaMat(counter[Int])(Keep.right).toMat(sink)(Keep.both)
>
> And subsequently:
>
> val (counter, futureSum) = runnableGraph.run()
>
> Graph outlets are always streams. You need to connect them to a Sink 
> (through intervening Flows or more complex Graphs, as necessary) in order 
> to create a RunnableGraph. Materialized values are the other things used 
> to connect the RunnableGraph to the outside world that are *not* streams.
>
> For example Sink.fold creates a stream element that is (obviously) a Sink. 
> It does not have any stream outlets. However it provides a materialized 
> value Future[U] that is completed when the Sink's inlet stream is 
> exhausted. This is how a running stream can communicate its successful 
> completion or failure to the outside world.
>
> Another example is Source.actorPublisher: you provide it with Props for 
> an Actor that implements the ActorPublisher contract. When materializing the 
> stream, the Source will instantiate the Actor and return its ActorRef as 
> a materialized value. The Actor is internal to the stream but you can use 
> the ActorRef as an interface from the outside world into the stream: send 
> messages (using your own protocol) to be passed to the Source's outlet, 
> according to demand from downstream. The tricky part is that such a gateway 
> Actor must manage buffering and/or backpressure on its own!
>
> Besides that, you can use materialized values to monitor stream execution 
> from the outside, like in the Counter example above or 
> https://github.com/akka/akka/pull/19836 or to interrupt a stream that 
> would otherwise run for a long (or unlimited) time: 
> https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/streams/FlowBreaker.scala
>
> Cheers,
> Rafał
>
> On Sunday, 6 March 2016 08:43:10 UTC+1, Arun Sethia wrote:
>>
>> Thanks Rafal.
>>
>> Based on this I tried to make sample code, where I would like to count 
>> number of elements being processed and their sum:
>>
>> val source = Source (1 to 5).filter(x=> x%2==0)
>>
>> val sink:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _)
>>
>> val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.both)
>>
>> val result=runnableGraph.run()
>>
>>
>> def counter[T]: Flow[T, T, Counter] = {
>>   val internalCounter = new AtomicLong(0)
>>   Flow[T].map { elem ⇒
>>     internalCounter.incrementAndGet()
>>     elem
>>   }.mapMaterializedValue(_ ⇒ new Counter {
>>     override def get = internalCounter.get
>>   })
>> }
>>
>>
>>
>> 1. Using Keep.both, result should be able to return the count and the sum, 
>> but it does not?
>>
>> 2. How are materialized values different from "out"? I am not able to 
>> visualize the difference between materialized values and out.
>>
>> Thanks 
>> Arun
>>
>>
>>



[akka-user] Re: how to define materialized value

2016-03-05 Thread Arun Sethia
Thanks.

I tried to put together an example using the same approach:

val source = Source (1 to 5).filter(x=> x%2==0)

val sink:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _)

val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.left)

val result=runnableGraph.run()

def counter[T]: Flow[T, T, Counter] = {
  val internalCounter = new AtomicLong(0)
  Flow[T].map { elem ⇒
    internalCounter.incrementAndGet()
    elem
  }.mapMaterializedValue(_ ⇒ new Counter {
    override def get = internalCounter.get
  })
}

The result should give me a tuple having the count (Flow) and sum (Sink) 
values, since I am using Keep.both, but it is giving only Future[Int].

What is the difference between a materialized value and out?
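
As a worked sketch of the distinction: "out" is the type of the elements that 
flow downstream, while the materialized value is a separate object handed back 
by run(). The sketch below wires the counter in with viaMat so that its Counter 
is not discarded, which plain via does by keeping only the left (Source) value. 
MatValueExample is a hypothetical name and Akka Streams 2.4.2 or later is 
assumed.

```scala
import java.util.concurrent.atomic.AtomicLong
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

trait Counter {
  def get: Long
}

object MatValueExample {
  // Passes elements through unchanged ("out" is T); materializes a Counter on the side.
  def counter[T]: Flow[T, T, Counter] = {
    val internalCounter = new AtomicLong(0)
    Flow[T].map { elem =>
      internalCounter.incrementAndGet()
      elem
    }.mapMaterializedValue(_ => new Counter {
      override def get: Long = internalCounter.get
    })
  }

  def run(): (Long, Int) = {
    implicit val system: ActorSystem = ActorSystem("mat-value-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val source = Source(1 to 5).filter(_ % 2 == 0)
      val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

      // viaMat(Keep.right) keeps the Flow's Counter; toMat(Keep.both) pairs it with the Sink's Future.
      val (cnt, futureSum) =
        source.viaMat(counter[Int])(Keep.right).toMat(sink)(Keep.both).run()

      val sum = Await.result(futureSum, 3.seconds)
      (cnt.get, sum)
    } finally system.terminate()
  }
}
```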

Thanks 
Arun



On Saturday, March 5, 2016 at 6:46:12 PM UTC-6, Rafał Krzewski wrote:
>
> Hi,
> there are a few ways of doing that. Probably the simplest one is using 
> Flow.mapMaterializedValue. Suppose you'd like to create a Flow that counts 
> the elements that pass through it and makes the current count available 
> through a "side channel":
>
>   trait Counter {
>     def get: Long
>   }
>
>   def counter[T]: Flow[T, T, Counter] = {
>     val internalCounter = new AtomicLong(0)
>     Flow[T].map { elem ⇒
>       internalCounter.incrementAndGet()
>       elem
>     }.mapMaterializedValue(_ ⇒ new Counter {
>       override def get = internalCounter.get
>     })
>   }
>
> Another way is using a GraphStageWithMaterializedValue while building a 
> custom Flow / Sink / Source. Instead of returning a GraphStageLogic, like 
> an ordinary GraphStage, you return a pair of GraphStageLogic and the 
> materialized value.
>
> Cheers,
> Rafał
>
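The second approach mentioned in the quoted reply, GraphStageWithMaterializedValue, could look roughly like the sketch below. It assumes akka-stream 2.4.2 or later with ActorMaterializer; CountingStage, CountingStageDemo and countAndSum are made-up names for illustration, and Counter mirrors the trait from the reply. Because the counter is created inside createLogicAndMaterializedValue, each materialization gets its own fresh counter.

```scala
import java.util.concurrent.atomic.AtomicLong

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}

import scala.concurrent.Await
import scala.concurrent.duration._

// Same shape as the Counter trait from the quoted reply.
trait Counter { def get: Long }

// Hypothetical stage: passes elements through unchanged and counts them.
final class CountingStage[T] extends GraphStageWithMaterializedValue[FlowShape[T, T], Counter] {
  val in: Inlet[T] = Inlet[T]("CountingStage.in")
  val out: Outlet[T] = Outlet[T]("CountingStage.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogicAndMaterializedValue(attrs: Attributes): (GraphStageLogic, Counter) = {
    val count = new AtomicLong(0) // one counter per materialization
    val logic = new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        // Count the element, then forward it downstream.
        override def onPush(): Unit = { count.incrementAndGet(); push(out, grab(in)) }
      })
      setHandler(out, new OutHandler {
        // Downstream demand is passed straight upstream.
        override def onPull(): Unit = pull(in)
      })
    }
    // The pair: stage logic plus the value handed back to whoever runs the graph.
    (logic, new Counter { override def get: Long = count.get })
  }
}

object CountingStageDemo {
  def countAndSum(): (Long, Int) = {
    implicit val system: ActorSystem = ActorSystem("counting-stage-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val (counter, sumFuture) =
        Source(1 to 5).filter(_ % 2 == 0)             // emits 2, 4
          .viaMat(new CountingStage[Int])(Keep.right) // keep the stage's Counter
          .toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.both)
          .run()
      val sum = Await.result(sumFuture, 3.seconds)
      (counter.get, sum)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(countAndSum())
}
```

Here the elements (2 and 4) travel through "out", while the Counter is handed to the caller of run() and never flows through the stream.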



[akka-user] Re: how to define materialized value

2016-03-05 Thread Arun Sethia
Given the following sample code:

val source = Source(1 to 5).filter(x => x % 2 == 0)

val flow: Flow[Int, Int, Unit] = Flow[Int].map(x => x * 2)

val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val runnableGraph = source.via(flow).toMat(sink)(Keep.both)

I am not sure what the use of Keep.both versus Keep.left is. I thought that if 
I used Keep.both, I would be able to get the values for the flow and the sink 
as a tuple.
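
A minimal sketch of how the combiners behave for this graph, assuming akka-stream 2.4.2 or later with ActorMaterializer (KeepDemo and sums are made-up names for illustration). Note that via keeps the materialized value of its left side, so the pair produced by Keep.both holds the source's value and the sink's Future[Int]; the flow's own materialized value is dropped unless viaMat is used.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object KeepDemo {
  def sums(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("keep-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val source = Source(1 to 5).filter(_ % 2 == 0) // emits 2, 4
      val flow = Flow[Int].map(_ * 2)                // emits 4, 8
      val sink = Sink.fold[Int, Int](0)(_ + _)       // materializes a Future[Int]

      // Keep.right: only the sink's materialized value is kept.
      val sumRight = source.via(flow).toMat(sink)(Keep.right).run()

      // Keep.both: a pair of (value kept by source.via(flow), sink's value).
      // The first component carries no information here, so it is ignored.
      val (_, sumBoth) = source.via(flow).toMat(sink)(Keep.both).run()

      (Await.result(sumRight, 3.seconds), Await.result(sumBoth, 3.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(sums())
}
```

Both runs fold the same elements (4 and 8), so both futures complete with 12. Keep.left on the same graph would return only the uninformative left value and discard the Future.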







[akka-user] Re: how to define materialized value

2016-03-05 Thread Arun Sethia
One more question:

In Source[+Out, +Mat], Flow[-In, +Out, +Mat] and Sink[-In, +Mat], what is the 
+Mat type in each case, and how can I define one? An example, if possible, 
would be great.





[akka-user] Re: how to define materialized value

2016-03-05 Thread Arun Sethia
Thanks Rafal.

Based on this, I tried to write some sample code that counts the number of 
elements processed and computes their sum:

val source = Source(1 to 5).filter(x => x % 2 == 0)

val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.both)

val result = runnableGraph.run()


def counter[T]: Flow[T, T, Counter] = {
  val internalCounter = new AtomicLong(0)
  Flow[T].map { elem ⇒
    internalCounter.incrementAndGet()
    elem
  }.mapMaterializedValue(_ ⇒ new Counter {
    override def get = internalCounter.get
  })
}



1. With Keep.both, the result should return both the count and the sum, but it 
does not. Why?

2. How are materialized values different from "out"? I am not able to visualize 
the difference between the two.

Thanks 
Arun
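
One possible way to obtain both values, sketched under the assumption of akka-stream 2.4.2 or later with ActorMaterializer (CounterDemo and countAndSum are made-up names; counter is the flow from this thread). The idea is that via discards the flow's materialized value, whereas viaMat with Keep.right carries the Counter forward so that Keep.both can pair it with the sink's Future.

```scala
import java.util.concurrent.atomic.AtomicLong

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object CounterDemo {
  trait Counter { def get: Long }

  // The counting flow from this thread: elements pass through unchanged,
  // the Counter is exposed as the materialized value.
  def counter[T]: Flow[T, T, Counter] = {
    val internalCounter = new AtomicLong(0)
    Flow[T].map { elem =>
      internalCounter.incrementAndGet()
      elem
    }.mapMaterializedValue(_ => new Counter {
      override def get: Long = internalCounter.get
    })
  }

  def countAndSum(): (Long, Int) = {
    implicit val system: ActorSystem = ActorSystem("counter-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val source = Source(1 to 5).filter(_ % 2 == 0) // emits 2, 4
      val sink = Sink.fold[Int, Int](0)(_ + _)

      // viaMat(...)(Keep.right) keeps the flow's Counter instead of the
      // source's value; toMat(...)(Keep.both) pairs it with the Future[Int].
      val (count, sumFuture) =
        source.viaMat(counter[Int])(Keep.right).toMat(sink)(Keep.both).run()

      val sum = Await.result(sumFuture, 3.seconds)
      (count.get, sum) // read the counter after the stream has completed
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(countAndSum())
}
```

The elements 2 and 4 are the "out" of the flow and travel to the sink, while the Counter and the Future[Int] are materialized values returned by run() to the code that started the stream.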






[akka-user] how to define materialized value

2016-03-05 Thread Arun Sethia
Hi,

Can someone explain what a materialized value is? I have seen the 
documentation at 
http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams

I am not sure how a Flow can define its materialized type. For example, the 
following code has input Tweet and output Int, but Mat is Unit. I would 
like to see how someone can define Mat as Int, or any example where a Flow or 
Source defines a Mat other than Unit.

val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)

I find it quite confusing to understand the difference between "out" and 
"Mat".


Thanks 

As
