Ok, thanks, I see.

How should I feed my ActorProducer?
It is a step of a Flow.

Something like 

val myProducer = system.actorOf(...)
Flow(ActorProducer[T](myProducer).map...
Flow(...).foreach{ myProducer ! _ }  

?

Will the back pressure work between the two flows?

Benoît

Le mercredi 25 juin 2014 21:27:42 UTC+2, Patrik Nordwall a écrit :
>
>
>
>
> On Wed, Jun 25, 2014 at 7:11 PM, benq <benoit....@gmail.com <javascript:>> 
> wrote:
>
>> Is there an example somewhere using an ActorProducer?
>>
>
> Not an example, but a test: 
> https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/test/scala/akka/stream/actor/ActorProducerSpec.scala
>
> /Patrik
>  
>
>>
>>
>> Le mercredi 25 juin 2014 10:07:53 UTC+2, drewhk a écrit :
>>
>>> Hi Benoit
>>>
>>>
>>> On Wed, Jun 25, 2014 at 9:49 AM, benq <benoit....@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Is it possible to feed a Duct (/Flow) with the elements it produces, 
>>>> like a loop.
>>>>
>>>
>>> Looping this way is dangerous, in general it usually leads to element 
>>> count explosion, but since streams are always bounded, you will very likely 
>>> deadlock instead. While there is a very thin line where loops are safe, it 
>>> is just too easy to break it in unexpected ways.
>>>  
>>>
>>>>
>>>> The use case is a Flow of requests to a NoSQL db. Some requests return 
>>>> huge result sets that needs to be retrieved in more than one request, 
>>>> unsing a scroll, which is similar to a database cursor. The noSQL server 
>>>> returns pack of n results and a scrollId that need to be used to retrieve 
>>>> the next n restuls, until the number of results returned is 0.
>>>>
>>>> So, I would like to do something like (pseudo code, no compiler at 
>>>> hand):
>>>>
>>>> ---
>>>> case class Response(scrollId: String, hits: List[SearchHit])
>>>>
>>>> val (in, out) =
>>>> Duct[Response].
>>>> filter(_.hits.size > 0).
>>>>
>>>
>>> Due to this filter, the probability of deadlocking is huge when elements 
>>> are actually dropped. 
>>>  
>>>
>>>> mapFuture(client scrollSearch scrollId).
>>>>  map(r => Response(r.getScrollId(), r.getHits())).
>>>> tee(in).  // This fails at compilation: circular reference. Goal: 
>>>> request next scroll...
>>>>
>>>
>>> Even if this would compile, tee waits until there are two subscribers, 
>>> unfortunately looping creates a circular dependency of subscription timings 
>>> that will probably deadlock in construction time. Also, there is now no 
>>> clean termination condition, as nothing will complete this loop ever, so 
>>> even if it would subscribe everything successfully, it will never stop the 
>>> stream even if there are no more elements flowing.
>>>  
>>>
>>>> mapConcat(_.getHits map ....)  // ... while working on already 
>>>> available results
>>>>  build(materializer).
>>>> ---
>>>>
>>>> Is there a way to implement this?
>>>>
>>>
>>> No, unfortunately no. Even if certain loops just happen to work, it 
>>> would very likely break if we change things like subscription timings (who 
>>> subscribes when) although we do not violate the SPI contract.
>>>
>>> The proper way to implement what you need is to use the ActorProducer 
>>> helper class and create a producer on your own that keeps track of the 
>>> scrollId.
>>>
>>>
>>> -Endre
>>>  
>>>
>>>>
>>>> Thanks in advance.
>>>>
>>>> Benoit
>>>>
>>>> -- 
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
>>>> current/additional/faq.html
>>>> >>>>>>>>>> Search the archives: https://groups.google.com/
>>>> group/akka-user
>>>> --- 
>>>> You received this message because you are subscribed to the Google 
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>> an email to akka-user+...@googlegroups.com.
>>>> To post to this group, send email to akka...@googlegroups.com.
>>>>
>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>>  -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com <javascript:>.
>> To post to this group, send email to akka...@googlegroups.com 
>> <javascript:>.
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
>
> Patrik Nordwall
> Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
> Twitter: @patriknw
>
> 

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to