Hi,

I am trying to create my own Akka Streams shape with several inlets and 
outlets. I have written the following code: 

package kernel.modeller.workers.streamFinder.generic

import akka.stream.{Shape, Outlet, Inlet}
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable

/** Initialisation descriptors accepted by the `FanShape` constructor. */
object FanShape {

  /** What a shape is built from: the ports to reuse and the name used for new ports. */
  sealed trait Init[_] {
    def inlets: immutable.Seq[Inlet[_]]
    def outlets: immutable.Seq[Outlet[_]]
    def name: String
  }

  /** Descriptor that carries only a name and supplies no existing ports. */
  final case class Name[_](override val name: String) extends Init[Any] {
    override def inlets: immutable.Seq[Inlet[_]] = List.empty
    override def outlets: immutable.Seq[Outlet[_]] = List.empty
  }

  /** Descriptor that carries existing ports; its name is always "FanShape". */
  final case class Ports[_](
      override val inlets: immutable.Seq[Inlet[_]],
      override val outlets: immutable.Seq[Outlet[_]]
  ) extends Init[Any] {
    override def name: String = "FanShape"
  }
}

/**
 * Base class for shapes that own an arbitrary number of inlets and outlets.
 *
 * Subclasses register their ports by calling `newInlet` / `newOutlet`; each call
 * either reuses the next port handed in through the `Init` descriptor or, once
 * those are exhausted, creates a fresh port named `<shape name>.<port name>`.
 * Registered ports are exposed, in registration order, through `inlets` and
 * `outlets`.
 *
 * @param _in   inlets to reuse, consumed one per `newInlet` call
 * @param _out  outlets to reuse, consumed one per `newOutlet` call
 * @param _name prefix used when a fresh port has to be created
 */
abstract class FanShape[_] private (
    _in: Iterator[Inlet[_]],
    _out: Iterator[Outlet[_]],
    _name: String
) extends Shape {

  import FanShape._

  /** Builds the shape from an `Init` descriptor (existing ports and/or a name). */
  def this(init: FanShape.Init[_]) =
    this(init.inlets.iterator, init.outlets.iterator, init.name)

  final override def outlets: immutable.Seq[Outlet[_]] = _outlets
  final override def inlets: immutable.Seq[Inlet[_]] = _inlets

  // Grown by newOutlet / newInlet while the subclass registers its ports.
  private var _outlets: Vector[Outlet[_]] = Vector.empty
  private var _inlets: Vector[Inlet[_]] = Vector.empty

  /**
   * Registers an outlet and returns it.
   *
   * @param name suffix for the port name, used only when a fresh outlet is created
   * @return the next pre-supplied outlet (cast to `Outlet[T]`, unchecked) if one
   *         is left, otherwise a new outlet named `<shape name>.<name>`
   */
  protected def newOutlet[T](name: String): Outlet[T] = {
    val p =
      if (_out.hasNext) _out.next().asInstanceOf[Outlet[T]]
      else Outlet[T](s"${_name}.$name")
    _outlets :+= p
    p
  }

  /**
   * Registers an inlet and returns it.
   *
   * @param name suffix for the port name, used only when a fresh inlet is created
   * @return the next pre-supplied inlet (cast to `Inlet[T]`, unchecked) if one
   *         is left, otherwise a new inlet named `<shape name>.<name>`
   */
  protected def newInlet[T](name: String): Inlet[T] = {
    val p =
      if (_in.hasNext) _in.next().asInstanceOf[Inlet[T]]
      else Inlet[T](s"${_name}.$name")
    _inlets :+= p
    p
  }

  /** Creates a new instance of the concrete shape from the given descriptor. */
  protected def construct(init: Init[_]): FanShape[_]

  /** Returns a new shape built from `carbonCopy()` of every registered port. */
  def deepCopy(): FanShape[_] =
    construct(Ports(inlets.map(_.carbonCopy()), outlets.map(_.carbonCopy())))

  /**
   * Returns a new shape built around the given ports.
   *
   * @throws IllegalArgumentException if the number of inlets or outlets differs
   *                                  from the number registered on this shape
   */
  final def copyFromPorts(
      inlets: immutable.Seq[Inlet[_]],
      outlets: immutable.Seq[Outlet[_]]
  ): FanShape[_] = {
    // Each message is kept on one line: a single-quoted interpolated string
    // cannot span physical lines.
    require(
      outlets.size == _outlets.size,
      s"proposed outlets [${outlets.mkString(", ")}] do not fit FanShape")
    require(
      inlets.size == _inlets.size,
      s"proposed inlets [${inlets.mkString(", ")}] do not fit FanShape")
    construct(Ports(inlets, outlets))
  }
}

/** Factory for `UniformFanShape`. */
object UniformFanShape {

  /**
   * Wraps already-existing ports in a shape whose inlet and outlet counts equal
   * the number of ports passed in.
   */
  def apply[I, O](inlets: Array[Inlet[I]], outlets: Outlet[O]*): UniformFanShape[I, O] = {
    val ports: FanShape.Init[_] = FanShape.Ports(inlets.toList, outlets.toList)
    new UniformFanShape(inlets.length, outlets.length, ports)
  }
}

/**
 * A `FanShape` with `n` inlets of element type `I` and `m` outlets of element
 * type `O`. Inlets are registered first (`in0`, `in1`, ...), then outlets
 * (`out0`, `out1`, ...).
 *
 * @param n     number of inlets to register
 * @param m     number of outlets to register
 * @param _init descriptor forwarded to `FanShape` (existing ports and/or name)
 */
class UniformFanShape[-I, +O](n: Int, m: Int, _init: FanShape.Init[_])
    extends FanShape(_init) {

  /** Creates fresh ports under the default name "UniformFan". */
  def this(n: Int, m: Int) = this(n, m, FanShape.Name("UniformFan"))

  /** Creates fresh ports under the given name. */
  def this(n: Int, m: Int, name: String) = this(n, m, FanShape.Name(name))

  /** Reuses the given ports; the counts are taken from the array lengths. */
  def this(inlets: Array[Inlet[I]], outlets: Array[Outlet[O]]) =
    this(inlets.length, outlets.length, FanShape.Ports(inlets.toList, outlets.toList))

  override protected def construct(init: FanShape.Init[_]): FanShape[_] =
    new UniformFanShape(n, m, init)

  // The cast relies on construct() above always producing a UniformFanShape.
  override def deepCopy(): UniformFanShape[I, O] =
    super.deepCopy().asInstanceOf[UniformFanShape[I, O]]

  /** The registered inlets, indexed 0 until n. */
  val inArray: Array[Inlet[I @uncheckedVariance]] =
    Array.tabulate(n)(idx => newInlet[I](s"in$idx"))

  /** Inlet at position `n` of `inArray`. */
  def in(n: Int): Inlet[I @uncheckedVariance] = inArray(n)

  /** The registered outlets, indexed 0 until m. */
  val outArray: Array[Outlet[O @uncheckedVariance]] =
    Array.tabulate(m)(idx => newOutlet[O](s"out$idx"))

  /** Outlet at position `m` of `outArray`. */
  def out(m: Int): Outlet[O @uncheckedVariance] = outArray(m)
}


This code allows me to create a graph; however, it is not possible to process 
messages with it. The handlers are never called for the messages; they get 
stuck somewhere. 
Could you please help me fix it? 

PS: I am not an expert in Scala.

Thank you in advance!

Regards,
Sergey

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to