Sounds good, will try that and report back.
Thanks for your advice.

On Friday, June 27, 2014 at 12:47:26 UTC+2, drewhk wrote:
>
>
>
>
> On Fri, Jun 27, 2014 at 12:23 PM, Endre Varga <endre...@typesafe.com> wrote:
>
>>
>>
>>
>> On Fri, Jun 27, 2014 at 11:59 AM, benq <benoit....@gmail.com> wrote:
>>
>>>
>>>
>>> On Friday, June 27, 2014 at 10:01:57 UTC+2, drewhk wrote:
>>>
>>>> Hi Benoit,
>>>>
>>>> I wasn't proposing that the ActorProducer loop back; instead you should 
>>>> encapsulate the scrollId state and the corresponding behavior in that 
>>>> ActorProducer, i.e. you implement your original snippet not in terms of 
>>>> a flow, but inside the ActorProducer. 
>>>>
>>>
>>> I understand that, it was clear. It seems my additional question was not 
>>> :(
>>> My initial snippet is a Duct. It takes an input (the query parameters) 
>>> and produces results. It was supposed to be plugged between two other Ducts.
>>>
>>
>> Ah, I thought you only used a Duct to be able to loop back. What about using 
>> a Transformer then?
>>
>
> Well, I have to correct myself: a Transformer cannot work because you have an 
> asynchronous DB API.
>
> But the solution is actually simple: create an ActorProducer that 
> encapsulates the result stream for a _given_ query parameter and handles the 
> scrollId. This stream then represents _one_ response stream to one 
> query. You can expose this, for example, as (dummy code)
>
> def query(q: Query): Producer[Entry]
>
> Now if you have a Flow of queries you can do
>
> queries.map(query(_)).flatten(Concat)
>
> to get a contiguous stream. If contiguity is not needed, though, I would 
> keep it as a stream-of-streams and avoid the concat.
>
> -Endre
>
>>
>> -Endre
>>  
>>
>>>
>>>  
>>>
>>>> Then you can compose that stream with others.
>>>>
>>>
>>> My question is how. How do I give an input (the query parameters) to the 
>>> ActorProducer? How do I compose it with other existing Ducts (put it 
>>> between two existing Ducts)?
>>> From an ActorProducer, I can get a Flow, which starts the computation, 
>>> but not a Duct. Am I missing something obvious?
>>> Is it clearer?
>>>
>>> Benoît
>>>  
>>>
>>>>  
>>>> -Endre
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 9:58 PM, benq <benoit....@gmail.com> wrote:
>>>>
>>>>> Ok, thanks, I see.
>>>>>
>>>>> How should I feed my ActorProducer?
>>>>> It is a step of a Flow.
>>>>>
>>>>> Something like 
>>>>>
>>>>> val myProducer = system.actorOf(...)
>>>>> Flow(ActorProducer[T](myProducer).map...
>>>>> Flow(...).foreach{ myProducer ! _ }  
>>>>>
>>>>> ?
>>>>>
>>>>> Will the back pressure work between the two flows?
>>>>>
>>>>> Benoît
>>>>>
>>>>> On Wednesday, June 25, 2014 at 21:27:42 UTC+2, Patrik Nordwall wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Jun 25, 2014 at 7:11 PM, benq <benoit....@gmail.com> wrote:
>>>>>>
>>>>>>> Is there an example somewhere using an ActorProducer?
>>>>>>>
>>>>>>
>>>>>> Not an example, but a test: 
>>>>>> https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/test/scala/akka/stream/actor/ActorProducerSpec.scala
>>>>>>
>>>>>> /Patrik
>>>>>>  
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wednesday, June 25, 2014 at 10:07:53 UTC+2, drewhk wrote:
>>>>>>>
>>>>>>>> Hi Benoit
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jun 25, 2014 at 9:49 AM, benq <benoit....@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> Is it possible to feed a Duct (/Flow) with the elements it 
>>>>>>>>> produces, like a loop?
>>>>>>>>>
>>>>>>>>
>>>>>>>> Looping this way is dangerous. In general it leads to an 
>>>>>>>> element count explosion, but since streams are always bounded, you 
>>>>>>>> will very likely deadlock instead. While there is a very thin line where 
>>>>>>>> loops are safe, it is just too easy to cross it in unexpected ways.
>>>>>>>>  
>>>>>>>>
>>>>>>>>>
>>>>>>>>> The use case is a Flow of requests to a NoSQL db. Some requests 
>>>>>>>>> return huge result sets that need to be retrieved in more than one 
>>>>>>>>> request, using a scroll, which is similar to a database cursor. The 
>>>>>>>>> NoSQL server returns a pack of n results and a scrollId that needs to be 
>>>>>>>>> used to retrieve the next n results, until the number of results 
>>>>>>>>> returned is 0.
>>>>>>>>>
>>>>>>>>> So, I would like to do something like (pseudo code, no compiler at 
>>>>>>>>> hand):
>>>>>>>>>
>>>>>>>>> ---
>>>>>>>>> case class Response(scrollId: String, hits: List[SearchHit])
>>>>>>>>>
>>>>>>>>> val (in, out) =
>>>>>>>>> Duct[Response].
>>>>>>>>> filter(_.hits.size > 0).
>>>>>>>>>
>>>>>>>>
>>>>>>>> Due to this filter, the probability of deadlocking is huge when 
>>>>>>>> elements are actually dropped. 
>>>>>>>>  
>>>>>>>>
>>>>>>>>> mapFuture(client scrollSearch scrollId).
>>>>>>>>> map(r => Response(r.getScrollId(), r.getHits())).
>>>>>>>>> tee(in).  // This fails at compilation: circular reference. Goal: request next scroll...
>>>>>>>>>
>>>>>>>>
>>>>>>>> Even if this compiled, tee waits until there are two 
>>>>>>>> subscribers; unfortunately, looping creates a circular dependency of 
>>>>>>>> subscription timings that will probably deadlock at construction time. 
>>>>>>>> Also, there is no clean termination condition, as nothing will ever 
>>>>>>>> complete this loop, so even if everything subscribed successfully, 
>>>>>>>> the stream would never stop, even when no more elements are flowing.
>>>>>>>>  
>>>>>>>>
>>>>>>>>> mapConcat(_.getHits map ....)  // ... while working on already available results
>>>>>>>>> build(materializer).
>>>>>>>>> ---
>>>>>>>>>
>>>>>>>>> Is there a way to implement this?
>>>>>>>>>
>>>>>>>>
>>>>>>>> No, unfortunately not. Even if certain loops just happen to work, they 
>>>>>>>> would very likely break if we changed things like subscription timings 
>>>>>>>> (who subscribes when), even though we would not violate the SPI contract.
>>>>>>>>
>>>>>>>> The proper way to implement what you need is to use the 
>>>>>>>> ActorProducer helper class and create a producer of your own that 
>>>>>>>> keeps track of the scrollId.
>>>>>>>>
>>>>>>>>
>>>>>>>> -Endre
>>>>>>>>  
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks in advance.
>>>>>>>>>
>>>>>>>>> Benoit
>>>>>>>>>
>>>>>>>>> -- 
>>>>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>>>>>>>> --- 
>>>>>>>>> You received this message because you are subscribed to the Google 
>>>>>>>>> Groups "Akka User List" group.
>>>>>>>>> To unsubscribe from this group and stop receiving emails from it, 
>>>>>>>>> send an email to akka-user+...@googlegroups.com.
>>>>>>>>> To post to this group, send email to akka...@googlegroups.com.
>>>>>>>>>
>>>>>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> -- 
>>>>>>
>>>>>> Patrik Nordwall
>>>>>> Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
>>>>>> Twitter: @patriknw
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>
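
For reference, the approach suggested above (one result stream per query, with the scrollId kept as private state of the producer instead of being looped back into the pipeline) can be sketched in plain Scala, without Akka. This is only a minimal sketch under stated assumptions: `Client`, `search`, `scroll`, `InMemoryClient` and `queryAll` are hypothetical names invented for illustration, and the client here is blocking, whereas the thread deals with an asynchronous DB API (which is why an ActorProducer was suggested there). It shows the control flow only, not the back-pressure handling.

```scala
object ScrollSketch {
  // Hypothetical types standing in for the real client API (assumptions).
  final case class Query(term: String)
  final case class Entry(value: String)
  final case class Response(scrollId: String, hits: List[Entry])

  // Hypothetical blocking client: `search` opens a scroll,
  // `scroll` fetches the next page for a given scrollId.
  trait Client {
    def search(q: Query): Response
    def scroll(scrollId: String): Response
  }

  // One result stream per query. The scrollId never leaves this function,
  // so nothing has to be fed back into the surrounding pipeline.
  // The stream ends at the first page that contains no hits.
  def query(client: Client)(q: Query): Iterator[Entry] =
    Iterator
      .iterate(client.search(q))(r => client.scroll(r.scrollId))
      .takeWhile(_.hits.nonEmpty)
      .flatMap(_.hits)

  // Concatenating the per-query streams yields one contiguous stream,
  // analogous in spirit to `queries.map(query(_)).flatten(Concat)`.
  def queryAll(client: Client)(queries: Iterator[Query]): Iterator[Entry] =
    queries.flatMap(q => query(client)(q))

  // In-memory stand-in used only to exercise the sketch:
  // a fixed list of result pages per query term.
  final class InMemoryClient(pages: Map[String, Vector[List[Entry]]]) extends Client {
    private def page(term: String, index: Int): Response = {
      val hits = pages.getOrElse(term, Vector.empty).lift(index).getOrElse(Nil)
      // The scrollId encodes the index of the next page, then the term.
      Response(s"${index + 1}:$term", hits)
    }
    def search(q: Query): Response = page(q.term, 0)
    def scroll(scrollId: String): Response = {
      val Array(index, term) = scrollId.split(":", 2)
      page(term, index.toInt)
    }
  }
}
```

Pages are only requested as the consumer pulls elements from the iterator, which loosely mirrors demand-driven production. With an asynchronous client the same state machine would have to live inside the producer and react to completed requests rather than block.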

