That's awesome, thanks!

Here's the final service:

    val rawFileSink = FileIO.toFile(file)
    val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
    val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))

    val graph = Sink.fromGraph(
      GraphDSL.create(rawFileSink, thumbnailFileSink, watermarkedFileSink)((_, _, _)) {
        implicit builder => (rawSink, thumbSink, waterSink) => {
          // Brings the ~> operator into scope.
          import GraphDSL.Implicits._

          val streamFan = builder.add(Broadcast[ByteString](2))
          val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))

          // Write the raw upload straight to disk.
          streamFan.out(0) ~> rawSink
          // Accumulate the whole upload, then fan it out to the two processors.
          streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in

          byteArrayFan.out(0) ~> processorFlow(Thumbnail) ~> thumbSink
          byteArrayFan.out(1) ~> processorFlow(Watermarked) ~> waterSink

          SinkShape(streamFan.in)
        }
      })

    // Complete only once all three file sinks have finished.
    graph.mapMaterializedValue[Future[Try[Done]]](fs =>
      Future.sequence(Seq(fs._1, fs._2, fs._3)).map(_ => Success(Done)))

After that, you can simply do this in the Play controller:

    Accumulator(theSink).map(Right.apply)
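
One thing to note about the mapMaterializedValue step above: because it ends with .map(_ => Success(Done)), the Try is always a Success, and a failed sink future fails the outer Future instead. If you would rather see the failure inside the Try, something like the sketch below might do. It uses only the Scala standard library; `Done` here is a stand-in for akka.Done and `allDone` is a hypothetical helper name, neither comes from the service above.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object CombineSinkResults {
  // Stand-in for akka.Done so the sketch needs no Akka dependency (assumption).
  sealed trait Done
  case object Done extends Done

  // Waits for every future, then reports a single overall outcome.
  // A failed future is turned into Failure(...) rather than failing the outer Future.
  def allDone(results: Seq[Future[Any]])(implicit ec: ExecutionContext): Future[Try[Done]] =
    Future.sequence(results)
      .map[Try[Done]](_ => Success(Done))
      .recover { case e => Failure(e) }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global

    // Three successful "sink" results, standing in for the three file sinks.
    val ok = allDone(Seq(Future.successful(1L), Future.successful(2L), Future.successful(3L)))
    println(Await.result(ok, 2.seconds))

    // One failing result: the error shows up inside the Try.
    val bad = allDone(Seq(Future.successful(1L), Future.failed(new RuntimeException("disk full"))))
    println(Await.result(bad, 2.seconds))
  }
}
```

In the service itself the equivalent change would be adding a .recover after the .map inside mapMaterializedValue; whether you want that depends on how the controller should report a failed write.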

On Tuesday, 10 May 2016 09:17:24 UTC+1, Tom Peck wrote:
>
> I'm trying to integrate an Akka Streams-based flow into my Play 2.5 app.
> The idea is that you can stream in a photo, then have it written to disk
> as the raw file, a thumbnailed version and a watermarked version.
>
>
> I managed to get this working using a graph something like this:
>
>
>     val byteAccumulator = Flow[ByteString]
>       .fold(new ByteStringBuilder())((builder, b) => builder ++= b.toArray)
>       .map(_.result().toArray)
>
>     def toByteArray = Flow[ByteString].map(b => b.toArray)
>
>
>     val graph = Flow.fromGraph(GraphDSL.create() { implicit builder =>
>       import GraphDSL.Implicits._
>
>       val streamFan = builder.add(Broadcast[ByteString](3))
>       val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))
>       val output = builder.add(Flow[ByteString].map(x => Success(Done)))
>
>       val rawFileSink = FileIO.toFile(file)
>       val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
>       val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))
>
>       streamFan.out(0) ~> rawFileSink
>       streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in
>       streamFan.out(2) ~> output.in
>
>       byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink
>       byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink
>
>       FlowShape(streamFan.in, output.out)
>     })
>
>     graph
>   }
>
>
> Then I wire it into my Play controller using an accumulator like this:
>
>
>     val sink = Sink.head[Try[Done]]
>
>     val photoStorageParser = BodyParser { req =>
>       Accumulator(sink).through(graph).map(Right.apply)
>     }
>
>
>
> The problem is that my two processed file sinks aren't completing, and both
> processed files end up with zero size, though the raw one is fine. My
> theory is that the accumulator is only waiting on one of the outputs of my
> fan-out, so when the input stream completes and my byteAccumulator emits
> the complete file, Play already has the materialized value from the output
> before the processing has finished.
>
>
> So, my questions are:  
>
> Am I on the right track with this as far as my approach goes?
>
> What is the expected behaviour for running a graph like this?
>
> How can I bring all my sinks together to form one final sink?
>
