I think one of the keys here is that the materialized value (the 
Future[Long] in this case) is not an element, per se, so it won't get 
streamed inline.
In other words, don't expect to do a broadcast/zip to get the materialized 
values.
You'll need to capture the materialized value when the stream is 
materialized, not when the flow is constructed.

I don't know your exact logic, but here's a quick stab at your problem in 
the form of a ScalaTest. I'm creating the sink as a separate step and I 
added some verbose typing, etc. to illustrate and hopefully make things 
clear. Of course, imports and such omitted for brevity.

Note the use of the materialized value combiner function (m1, m2) => (m1, 
m2). This will get you both materialized values out of the sink.

  def outputSink(outStream1: OutputStream, outStream2: OutputStream): 
Sink[ByteString, (Future[Long], Future[Long])] = {
    import akka.stream.scaladsl.FlowGraph.Implicits._

    val outSink1 = OutputStreamSink(() => outStream1)
    val outSink2 = OutputStreamSink(() => outStream2)

    Sink(outSink1, outSink2)((m1, m2) => (m1, m2)) { implicit builder =>
      (out1, out2) => {
        val bcast = builder.add(Broadcast[ByteString](2))

        bcast ~> out1
        bcast ~> out2

        (bcast.in)
      }
    }
  }

  "Flow" should "produce materialized values" in {
    val data = "Some String"
    val source = InputStreamSource(() => new 
ByteArrayInputStream(data.getBytes))
    val outStream1 = new ByteArrayOutputStream()
    val outStream2 = new ByteArrayOutputStream()
    val sink = outputSink(outStream1, outStream2)

    val runnable: RunnableFlow[(Future[Long], Future[Long])] = 
source.toMat(sink)(Keep.right)

    val (size1, size2) = runnable.run()

    whenReady(size1) { size =>
      size shouldBe data.getBytes.length
    }
    whenReady(size2) { size =>
      size shouldBe data.getBytes.length
    }

Regards,
Lance

On Saturday, June 20, 2015 at 8:13:08 AM UTC-4, Michael Hamrah wrote:
>
> I'm working with an OutputStreamSink (
> http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC3/index.html#akka.stream.io.OutputStreamSink$)
>  
> which materializes a Future[Long]. I'd like to use an OutputStreamSink as 
> part of a flow, but I can't get the types to line up.
>
> Ideally my graph will look something like this; it splits an input stream, 
> zips the result, and checker checks to make sure the future[long]'s are 
> equal.
>
> ```
> in ~> broadcast ~> outputsink1 ~> zip.in0 ~> checker
>     ~> broadcast ~> outputsink2 ~> zip.in1
> ```
>
> any ideas? It seems like I need to rewrite OutputStreamSink from a Sink to 
> a Flow, or use a SubscriberActor with a PublisherActor, or possibly write 
> some sort of PushPullStage. Seems like there should be an easy way to 
> "lift" a sink with a materialized value to a flow stage.
>
> Thanks,
>
> Mike
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to