Hi Tom,

If I understood, your layout looks like, this (slightly simplified)

bytes --> broadcast --> map(x => Success(Done)) --> Sink.head
              |
              +----> image
              |
              +----> watermarked
              |
              +----> thumbnail

The first issue that I see with the above layout is that the map stage will
emit a Success(Done) as the very first byte have passed through it.
However, since the graph is concurrent, this means that that byte might
have not reached any of the Sinks yet. I don't know what are the exact
semantics of Play here, if you return an early completed future while you
have not consumed the full body, so I don't know how that affects you.

What you really want is the futures returned by the different Sinks, since
only they can report when a file has been fully written, flushed and closed.

You will need to do something like this (pseudocode and simplified
somewhat):

val multiImageSink:
  Source[ByteString, (Future[Done], Future[Done], Future[Done])] =
  Flow.fromGraph(GraphDSL.create(imgSink, wtrmrkSink, thumbSink) {
    implicit b => (img, watermark, thumb) =>
    val bcast = ...

    bcast ~> img
    bcast ~> watermark
    bcast ~> thumb

    SourceShape(bcast.in)
})

The above code creates a Sink, which ready ByteStrings and materializes to
a Tuple3 of Futures that signal once the given file has been written. You
can combine these Futures into one to be able to plug it in Play probably.

-Endre

On Tue, May 10, 2016 at 10:14 AM, Tom Peck <adifferentb...@gmail.com> wrote:

> I'm trying to integrate an akka streams based flow in to my Play 2.5 app.
> The idea is that you can stream in a photo, then have it written to disk as
> the raw file, a thumbnailed version and a watermarked version.
>
>
> I managed to get this working using a graph something like this:
>
>
>     val byteAccumulator = Flow[ByteString].fold(new
> ByteStringBuilder())((builder, b) => {builder ++= b.toArray})
>
>                                         .map(_.result().toArray)
>
>
>     def toByteArray = Flow[ByteString].map(b => b.toArray)
>
>
>     val graph = Flow.fromGraph(GraphDSL.create() {implicit builder =>
>
>       import GraphDSL.Implicits._
>
>       val streamFan = builder.add(Broadcast[ByteString](3))
>
>       val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))
>
>       val output = builder.add(Flow[ByteString].map(x => Success(Done)))
>
>
>       val rawFileSink = FileIO.toFile(file)
>
>       val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
>
>       val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))
>
>
>       streamFan.out(0) ~> rawFileSink
>
>       streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in
>
>       streamFan.out(2) ~> output.in
>
>
>       byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink
>
>       byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink
>
>
>       FlowShape(streamFan.in, output.out)
>
>     })
>
>
>     graph
>
>   }
>
>
> Then I wire it in to my play controller using an accumulator like this:
>
>
>     val sink = Sink.head[Try[Done]]
>
>
>     val photoStorageParser = BodyParser { req =>
>
>          Accumulator(sink).through(graph).map(Right.apply)
>
>     }
>
>
>
> The problem is that my two processed file sinks aren't completing and I'm
> getting zero sizes for both processed files, but not the raw one.  My
> theory is that the accumulator is only waiting on one of the outputs of my
> fan out, so when the input stream completes and my byteAccumulator spits
> out the complete file, by the time the processing is finished play has got
> the materialized value from the output.
>
>
> So, my questions are:
>
> Am I on the right track with this as far as my approach goes?
>
> What is the expected behaviour for running a graph like this?
>
> How can I bring all my sinks together to form one final sink?
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to