[akka-user] [akka-stream] Some problems with ThunkSource

2014-10-06 Thread Boris Lopukhov
Hello,

I wrote this code:

import akka.actor.ActorSystem
import akka.stream.scaladsl2.BlackholeSink
import akka.stream.scaladsl2.FlowFrom
import akka.stream.scaladsl2.FlowMaterializer
import akka.stream.scaladsl2.ThunkSource


object Main extends App {

  implicit val system = ActorSystem()
  implicit val flowMaterializer = FlowMaterializer()

  FlowFrom[Int]
    .withSource(ThunkSource(() => Some(1)))
    .map(x => x)
    .withSink(BlackholeSink)
    .run
}

I then got this error: java.lang.ClassCastException: scala.Some cannot be 
cast to java.lang.Integer

What am I doing wrong? (akka-stream version 0.7)

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-stream] Some problems with ThunkSource

2014-10-07 Thread Boris Lopukhov
Thanks!

On Monday, October 6, 2014 at 20:02:27 UTC+4, Patrik Nordwall wrote:
>
> That is a known issue <https://github.com/akka/akka/issues/15892> in 
> scaladsl2 in versions 0.7 and 0.8. It is fixed in the upcoming 0.9.
>
> You can probably work around it by using an Iterator instead.
>
> /Patrik
>
> -- 
>
> Patrik Nordwall
> Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
> Twitter: @patriknw
>
> 
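
The Iterator workaround mentioned above can be sketched with the standard library alone. This is only an illustration: `ThunkAsIterator` and `fromThunk` are hypothetical names, and how the resulting Iterator is handed to scaladsl2 in 0.7 is not shown in this thread, so that step is left out.

```scala
object ThunkAsIterator {
  // Turns a thunk that signals "no more elements" with None into an Iterator.
  // Elements are pulled lazily, one thunk call per element.
  def fromThunk[T](thunk: () => Option[T]): Iterator[T] =
    Iterator.continually(thunk()).takeWhile(_.isDefined).map(_.get)
}
```

With the thunk from the original post, `ThunkAsIterator.fromThunk(() => Some(1))` would be an endless iterator of 1s, because that thunk never returns None.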



[akka-user] [akka-stream] Control flow question

2014-10-15 Thread Boris Lopukhov
I need to write a simple application that reads rows from a Postgres table, 
indexes them in Elasticsearch and, if indexing succeeds, records in Postgres 
that the data has been indexed. Row sizes range from about 1 KB to 100 MB. 
I'm trying to solve this with akka-stream:

  FlowFrom(messagePublisher)
    .mapFuture(elasticService.indexMessage)
    .withSink(messagesSink)
    .run



Question: What is the best way to limit the rows retrieved from the 
database so that the total size of the in-flight rows does not exceed a 
certain value?
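
One possible way to express such a size limit, independent of the akka-stream API, is a semaphore whose permits are counted in bytes. The sketch below is an assumption-laden illustration, not an answer given in this thread: `ByteBudget` is a hypothetical helper, and the blocking `acquire` is only acceptable if the code that reads rows runs on its own thread or dispatcher.

```scala
import java.util.concurrent.Semaphore

// Hypothetical helper: caps the total number of bytes "in flight" at once.
final class ByteBudget(maxBytes: Int) {
  // Fair semaphore, so a large row is not starved by a series of small ones.
  private val permits = new Semaphore(maxBytes, true)

  // Blocks until `size` bytes fit into the remaining budget.
  def acquire(size: Int): Unit = {
    require(size <= maxBytes, s"a row of $size bytes can never fit into $maxBytes")
    permits.acquire(size)
  }

  // Returns the bytes to the budget once the row has been fully processed.
  def release(size: Int): Unit = permits.release(size)

  // Bytes that can still be acquired without blocking.
  def available: Int = permits.availablePermits()
}
```

The reader would call `acquire(row.size)` before emitting a row and the final stage would call `release(row.size)` after the row has been marked as indexed; `row.size` is a placeholder for however the row size is measured.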



Re: [akka-user] [akka-stream] Control flow question

2014-10-15 Thread Boris Lopukhov
> What do you do if you have a row in the database that exceeds that max 
> size? You never read it?

I can guarantee that no single row exceeds that maximum size.

> Can you write a SQL query that returns all rows up to a certain maxsize?

I think so, but I'm going to read the rows sequentially, one row per SQL 
request.

On Wednesday, October 15, 2014 at 13:48:15 UTC+4, √ wrote:
>
> What do you do if you have a row in the database that exceeds that max 
> size? You never read it? Can you write a SQL query that returns all rows up 
> to a certain maxsize?



[akka-user] [akka-streams] Merge different count of sources

2014-11-13 Thread Boris Lopukhov
Hi everyone!

I have one or more databases from which I want to download and process 
data.
I would like to merge all the sources and work with the resulting flow:

  val sources = config.getConfigList("database.nodes").map(createSource)
  val merge = Merge[Message]
  val mergedFlow = Flow[Message]
  sources.map(_ ~> merge)
  merge ~> mergedFlow



But it does not work when there is only one source, because Merge must have 
at least two incoming edges.
What is the reason for this constraint, and how can I work around it?



Re: [akka-user] [akka-streams] Merge different count of sources

2014-11-13 Thread Boris Lopukhov
Then I want to create the mergedSource:

  val mergedSource = sources match {
    case List(source) => source
    case manySources =>
      val merge = Merge[Message]
      sources.map(_ ~> merge)
      merge
  }



But the only common supertype of a Source and a Merge is Object, so I 
cannot create the mergedSource separately from the rest of the code. I 
would need to do something like:

  def createMergedSource(sources: List[Source[Message]])
                        (implicit builder: FlowGraphBuilder) = {
    val merge = Merge[Message]
    sources.map(_ ~> merge)
    merge
  }

  sources match {
    case List(source) =>
      source ~> doSomethink1 ~> doSomethink2 ~> .. ~> doSomethinkN ~> out
    case manySources =>
      createMergedSource(manySources) ~> doSomethink1 ~> doSomethink2 ~> .. ~> doSomethinkN ~> out
  }



The code becomes more complicated and contains copy-paste.

On Thursday, November 13, 2014 at 14:16:12 UTC+4, √ wrote:
>
> Skip the merge if you only have 1 source?
>
> -- 
> Cheers,
> √
>  



Re: [akka-user] [akka-streams] Merge different count of sources

2014-11-13 Thread Boris Lopukhov
Thanks for the response, it looks good to me!

On Thursday, November 13, 2014 at 18:36:05 UTC+4, √ wrote:
>
> Hi Boris,
>
> What about something like:
>
> implicit class ListSourceMerge[T](val sources: List[Source[T]]) extends AnyVal {
>   def merge(): Source[T] =
>     sources match {
>       case Nil => Source.empty[T]
>       case List(source) => source
>       case sources =>
>         Source[T]() { implicit b =>
>           val out = UndefinedSink[T]
>           val merge = Merge[T]
>           sources.foreach { _ ~> merge }
>           merge ~> out
>           out
>         }
>     }
> }
>
> And then you can import that one and write:
>
> val mergedSource = sources.merge()
>
> -- 
> Cheers,
> √
>  



[akka-user] [akka-streams] ActorPublisher-based Source

2014-11-17 Thread Boris Lopukhov
Hi everyone!

I am trying to create a stream with an ActorPublisher:

class SourceActor extends ActorPublisher[List[Int]] {
  import akka.stream.actor.ActorPublisherMessage.Request

  var list = List.range(1, 21).grouped(5).toList // for example

  def receive = {
    case Request(elems) =>
      while (totalDemand > 0 && isActive) {
        list match {
          case Nil =>
            onComplete()
            self ! PoisonPill
          case head :: tail =>
            list = tail
            onNext(head)
        }
      }
  }
}

val sourceActor = system.actorOf(Props[SourceActor])
val source = Source(ActorPublisher[List[Int]](sourceActor))
val out = ForeachSink(println)
source.mapConcat{x => x}.runWith(out)

I expect to see the numbers from 1 to 20, but the output stops at a random 
point, somewhere between 17 and 19.
As I understand it, this happens because the stream completes before all 
elements have been processed. How can I implement an ActorPublisher properly 
and avoid this problem?



[akka-user] Re: [akka-streams] ActorPublisher-based Source

2014-11-18 Thread Boris Lopukhov
 

I have isolated the problem, it seems that ActorPublisher is not the cause:

  val list = List.range(1, 21).grouped(5).toList
  Source(list).mapConcat(x => x).runWith(ForeachSink(println))

Expected: 1..20
Actual: 1..16
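
For reference, the same pipeline on plain Scala collections produces all twenty numbers. This assumes that mapConcat is meant to behave like flatMap on collections (apply the function, then concatenate the results), which matches the expectation stated above; `MapConcatExpectation` is just an illustrative name.

```scala
// Plain Scala collections only, no akka-stream involved.
object MapConcatExpectation {
  // Four groups of five: List(1..5), List(6..10), List(11..15), List(16..20)
  val groups: List[List[Int]] = List.range(1, 21).grouped(5).toList

  // flatMap with the identity function concatenates the groups in order.
  val flattened: List[Int] = groups.flatMap(x => x)
}
```

Note that the missing elements 17..20 are exactly the tail of the last group.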





Re: [akka-user] Re: [akka-streams] ActorPublisher-based Source

2014-11-18 Thread Boris Lopukhov
No, the actor system is alive. Additionally, I have repeated that code many 
times with the same result:

for (i <-  1 to 10) {
  Source(list).mapConcat(x => x).runWith(ForeachSink(println))
}



On Tuesday, November 18, 2014 at 16:29:08 UTC+4, Patrik Nordwall wrote:
>
> Do you shut down the actor system prematurely?
> /Patrik
>
> -- 
>
> Patrik Nordwall
> Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
> Twitter: @patriknw
>
> 



Re: [akka-user] Re: [akka-streams] ActorPublisher-based Source

2014-11-19 Thread Boris Lopukhov
Using transform instead of mapConcat worked well for me:

val list = List.range(1, 21).grouped(5).toList
Source(list).transform("myMapConcat", () => new Transformer[List[Int], Int] {
  override def onNext(list: List[Int]) = list
}).runWith(ForeachSink(println))

Result: 1..20

Could this be a bug in mapConcat? I am using version 0.11.




Re: [akka-user] Re: [akka-streams] ActorPublisher-based Source

2014-11-19 Thread Boris Lopukhov
I created a new issue: https://github.com/akka/akka/issues/16338

Thanks for the response!


On Wednesday, November 19, 2014 at 18:48:11 UTC+4, Patrik Nordwall wrote:
>
> Boris, that is a bug in mapConcat. Please create an issue 
> <https://github.com/akka/akka/issues>, and I will fix it immediately.
>
> Thanks for reporting.
>
> Cheers,
> Patrik
>

[akka-user] [akka-stream] Periodic source of sources

2015-01-22 Thread Boris Lopukhov
Hi all!

How can I create a Source[Source[String]] that periodically emits a 
Source[String] at a specified interval, but only if the previous 
Source[String] has completed?



Re: [akka-user] [akka-stream] Periodic source of sources

2015-01-27 Thread Boris Lopukhov

>
> Since you do not want to produce ticks at all if the previous source is 
> still not complete, I think you will have to fallback to ActorPublisher and 
> implement it.
>

Thanks, i did so

> Can you specify this further? Also I would be happy to know the actual 
> use-case because nesting streams is usually a road to much pain if not done 
> carefully so I would investigate first if there is a graph alternative for 
> the use-case.
>

There is an infinite source, and a second source that should be run 
periodically at a given interval. Both sources should be processed in one 
stream so that back pressure works properly. For this purpose, I represent the 
periodically run source as an infinite source. I implemented that 
<https://gist.github.com/brs-lphv/456a22a3d64ecb5002c6#file-periodicsourcerunner-scala>
with an ActorPublisher, as Martynas recommended, and use it like this:

  val source = PeriodicSourceRunner(
    initialDelay = 1.millis,
    interval = period,
    createSource = () => createFullscanSource
  ).flatten(FlattenStrategy.concat)
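
The gist itself is not reproduced in this thread. As a rough illustration of 
the gating rule only (emit a new inner source when there is downstream demand, 
the interval has elapsed, and the previous inner source has completed), here 
is a library-free sketch in plain Scala. GateState and all of its members are 
hypothetical names; this is not the gist's code and not part of Akka. It also 
assumes one reading of the requirement, namely that ticks arriving while the 
previous inner source is still running are simply dropped.

```scala
// Library-free model of the gating rule; every name here is hypothetical.
final case class GateState(
  demand: Long = 0L,            // elements requested by downstream, not yet served
  tickPending: Boolean = false, // an accepted tick is waiting to be acted on
  previousDone: Boolean = true  // the previously emitted inner source has completed
) {
  // A new inner source may be emitted only when all three conditions hold.
  def canEmit: Boolean = demand > 0 && tickPending && previousDone

  // Downstream asked for n more elements.
  def onRequest(n: Long): GateState = copy(demand = demand + n)

  // Assumption: a tick is ignored while the previous inner source is running.
  def onTick: GateState = if (previousDone) copy(tickPending = true) else this

  // The previously emitted inner source finished.
  def onInnerComplete: GateState = copy(previousDone = true)

  // One inner source was emitted: use up one unit of demand and the tick,
  // then wait for the new inner source to complete.
  def emitted: GateState =
    copy(demand = demand - 1, tickPending = false, previousDone = false)
}
```

An ActorPublisher could presumably keep such a state, update it on Request, on 
a scheduled tick message, and on a completion notification from the inner 
source, and call onNext only while canEmit is true.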



On Friday, 23 January 2015, 20:52:10 UTC+4, drewhk wrote:
>
>
>
> On Fri, Jan 23, 2015 at 5:45 PM, Martynas Mickevičius <
> martynas.m...@typesafe.com > wrote:
>
>> There was a discussion here <https://github.com/akka/akka/issues/16454> on 
>> a topic similar, but not identical, to the one you described.
>>
>
> Umm, no, that is slightly different, if I understood the proposal correctly.
>  
>
>>
>> Since you do not want to produce ticks at all if the previous source is 
>> still not complete, I think you will have to fall back to ActorPublisher and 
>> implement it.
>>
>> On Thu, Jan 22, 2015 at 9:11 PM, Boris Lopukhov <89b...@gmail.com 
>> > wrote:
>>
>>> Hi all!
>>>
>>> How can I create a Source[Source[String]] that periodically emits a 
>>> Source[String] at a specified interval, but only once the previous 
>>> Source[String] has completed?
>>>
>>
> Can you specify this further? Also I would be happy to know the actual 
> use-case because nesting streams is usually a road to much pain if not done 
> carefully so I would investigate first if there is a graph alternative for 
> the use-case.
>
> -Endre
>
>  
>
>>
>>>
>>
>>
>>
>> -- 
>> Martynas Mickevičius
>> Typesafe <http://typesafe.com/> – Reactive 
>> <http://www.reactivemanifesto.org/> Apps on the JVM
>>  
>>
>
>



[akka-user] The mapAsync and the mapAsyncUnordered an unexpected behavior

2015-02-27 Thread Boris Lopukhov
I have a simple actor publisher:

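import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}

class TestActor extends ActorPublisher[Int] {

  println("RUN")

  def receive = {
    // Emit elements for as long as downstream demand remains.
    case Request(elements) =>
      while (totalDemand > 0 && isActive) {
        onNext(1)
      }
    case Cancel => println("CANCEL")
  }
}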

and I found some strange behavior:

  Source[Int](Props[TestActor])
    .map { x => throw new Exception(); x }
    .runForeach { x => }

This printed RUN and CANCEL.

  Source[Int](Props[TestActor])
    .mapAsync(Future.successful)
    .map { x => throw new Exception(); x }
    .runForeach { x => }

This printed only RUN.

  Source[Int](Props[TestActor])
    .mapAsyncUnordered(Future.successful)
    .map { x => throw new Exception(); x }
    .runForeach { x => }

This threw java.lang.OutOfMemoryError.

I am using version "1.0-M3" of akka-streams.



Re: [akka-user] The mapAsync and the mapAsyncUnordered an unexpected behavior

2015-02-27 Thread Boris Lopukhov
Thanks for the response!

I created an issue: https://github.com/akka/akka/issues/16959
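
For the follow-up check suggested in the reply quoted below (trying a Future 
that is not already completed), a minimal sketch using only the Scala standard 
library might look like this. NotYetCompleted and delayed are hypothetical 
names, and the 10 ms sleep on the global execution context is just an 
assumption to keep the Future pending for a moment.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object NotYetCompleted {
  // Assumption: the global execution context is good enough for a quick check.
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical helper (not part of Akka). Unlike Future.successful, the body
  // runs on another thread, so the returned Future is normally still pending
  // when the caller receives it.
  def delayed[T](x: T): Future[T] = Future {
    Thread.sleep(10) // crude delay, only to keep the Future pending briefly
    x
  }
}
```

In the quoted pipelines, `.mapAsync(Future.successful)` would then become 
`.mapAsync(x => NotYetCompleted.delayed(x))`, and likewise for 
mapAsyncUnordered.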

On Friday, 27 February 2015, 13:54:35 UTC+4, drewhk wrote:
>
> Hi Boris,
>
> On Fri, Feb 27, 2015 at 10:41 AM, Boris Lopukhov <89b...@gmail.com 
> > wrote:
>
>> I have a simple actor publisher:
>>
>> class TestActor extends ActorPublisher[Int] {
>>
>>   println("RUN")
>>
>>   def receive = {
>>     case Request(elements) =>
>>       while (totalDemand > 0 && isActive) {
>>         onNext(1)
>>       }
>>     case Cancel => println("CANCEL")
>>   }
>> }
>>
>> and I found some strange behavior:
>>
>>   Source[Int](Props[TestActor])
>>     .map { x => throw new Exception(); x }
>>     .runForeach { x => }
>>
>> This printed RUN and CANCEL.
>>
>
> This in itself is fine; you get the error in the Future returned by 
> runForeach.
>  
>
>>
>>   Source[Int](Props[TestActor])
>>     .mapAsync(Future.successful)
>>     .map { x => throw new Exception(); x }
>>     .runForeach { x => }
>>
>> This printed only RUN.
>>
>
> This is not fine though.
>  
>
>>
>>   Source[Int](Props[TestActor])
>>     .mapAsyncUnordered(Future.successful)
>>     .map { x => throw new Exception(); x }
>>     .runForeach { x => }
>>
>> This threw java.lang.OutOfMemoryError.
>>
>
> This is also not fine. I suspect there is a problem with handling Futures 
> that are already completed. You should file a ticket; this seems like a bug. 
> It would also help if you could try a Future that is not already completed.
>
> -Endre
>  
>
>>
>> I am using version "1.0-M3" of akka-streams.
>>
>>
>
>
