Adding as many worker instances as I need using builder.add works but one
caveat... this works:

object Balancer {
  def apply[A, B, C](nrOfWorkers: Int, workerGraph: Graph[FlowShape[A, B],
Unit]): Graph[FlowShape[A, B], Unit] =
    FlowGraph.partial() { implicit builder =>
      import FlowGraph.Implicits._

      val broadcast = builder.add(Broadcast[A](nrOfWorkers))
      val merge = builder.add(Merge[B](nrOfWorkers))
      val workers = (0 until nrOfWorkers).map(_ => builder.add(workerGraph))

      for (i <- 0 until nrOfWorkers) {
        broadcast ~> workers(i) ~> merge
      }

      FlowShape(broadcast.in, merge.out)
    }
}

But this doesn't work...

object Balancer {
  def apply[A, B, C](nrOfWorkers: Int, workerGraph: Graph[FlowShape[A, B],
Unit]): Graph[FlowShape[A, B], Unit] =
    FlowGraph.partial() { implicit builder =>
      import FlowGraph.Implicits._

      val broadcast = builder.add(Broadcast[A](nrOfWorkers))
      val merge = builder.add(Merge[B](nrOfWorkers))

      for (i <- 0 until nrOfWorkers) {
        broadcast ~> builder.add(workerGraph) ~> merge
      }

      FlowShape(broadcast.in, merge.out)
    }
}

And the error I get is:

java.lang.IllegalArgumentException: requirement failed: The inlets
[UniformFanOut.in] and outlets [UniformFanIn.out] must correspond to the
inlets [UniformFanOut.in, UniformFanOut.in] and outlets [UniformFanIn.out,
UniformFanIn.out])

2015-03-03 14:08 GMT+00:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:

> Thank you Endre! Yes, I have seen that example. In my use case, workers
> are also partial graphs and I will use builder.add to import the flow as
> many times as I need and let you know if that did the trick. If I found any
> other problem, I will simplify my use case so I can just use Flows.
>
> 2015-03-03 14:03 GMT+00:00 Endre Varga <endre.va...@typesafe.com>:
>
>> Btw, have you looked at the actual cookbook sample?
>>
>>
>> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M4/scala/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers
>>
>> On Tue, Mar 3, 2015 at 2:56 PM, Endre Varga <endre.va...@typesafe.com>
>> wrote:
>>
>>> Hi Luis,
>>>
>>> It should not stuck but throw, but this will not work:
>>>
>>>
>>>>           broadcast.out(i) ~> worker ~> merge.in(i)
>>>>
>>>
>>> You imported worker once, you cannot use it N times. You can either use
>>> builder.add to add as many times as you need (the parametric import you
>>> used only matters if you want to expose the materialized value of the
>>> imported element), but since your worker is a Flow, why don't just accept
>>> it as a Flow instead of Graph[FlowShape[A, B], Unit]?
>>>
>>> It is interesting why it hangs though instead of throwing an exception.
>>>
>>> -Endre
>>>
>>>
>>>
>>>>
>>>>
>>>
>>>> With nrOfWorkers > 1, the creation of this partial graph get stuck
>>>> after the first iteration. Am I missing something? I was expecting this to
>>>> create as many workers as desired.
>>>>
>>>> Regards,
>>>>
>>>> Luis
>>>>
>>>> --
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ:
>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>> >>>>>>>>>> Search the archives:
>>>> https://groups.google.com/group/akka-user
>>>> ---
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to akka-user+unsubscr...@googlegroups.com.
>>>> To post to this group, send email to akka-user@googlegroups.com.
>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>>
>>  --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to