On Thursday, 14 July 2016 15:13:57 UTC+1, Daniel Stoner wrote:
>
> Recently I spotted a great example of how to use the Source.queue feature 
> in Streams to pre-materialise a flow and then pass events into it 
> independently.
> http://stackoverflow.com/a/33415214/5142410
>
> The examples utilising Actors were tempting but would over-complicate my 
> use case - which is to throttle writes to a database in a custom 
> persistence Journal implementation.
>
> Using Source.queue for the life of me I cannot work out how to get the 
> SourceQueue from which to then 'offer' out of this flow within the Java DSL.
>
> Scala example was:
>
> val queue = Source.queue(bufferSize, overflowStrategy)
>                   .filter(!_.raining)
>                   .runForeach(println)
>
> queue.offer(Weather("02139", 32.0, true))
>
>
> My Java line for line conversion is:
>
>> CompletionStage<Done> clearlyNotAQueue = 
>>                  Source.<Integer>queue(5, OverflowStrategy.dropHead())
>>                 .filter(msg -> msg.equals(5))
>>                 .runForeach(System.out::println, materializer)
>
>
> However this returns a CompletionStage<Done> representing the runForEach's 
> completion without result.
>
> A pointer in the right direction would be hugely appreciated.
>

Given a source of type *Source<Out, M1>* and a sink of type *Sink<In, M2>*, 
 source.runWith (sink, mat) returns an *M2*, but *sink.runWith *(source, 
mat) returns an M1.  Therefore, something like the following (untested) 
should do what you want:

SourceQueue<Integer> queue = Sink.foreach (msg -> System.out::println).runWith 
(
  Source.<Integer>queue (5, OverflowStrategy.dropHead ()).filter (msg -> 
msg.equals (5)),
  materializer 
);



 

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to