Yep, I was trying to make something like that, but my code doesn't work. 
Messages get stuck somewhere, and I don't know how to fix it because I lack 
Scala knowledge. You can find the code in my initial message in this thread. 
I took UniformFanOut as an example and tried, unsuccessfully, to replace its 
single Inlet with a set of them.
If you could help me fix it, I would be more than happy :)
Thanks!

Regards,
Sergey

On Wednesday, 19 October 2016 at 11:32:05 UTC+3, Konrad Malawski wrote:
>
> Shapes don't need separate Java and Scala APIs; the API is shared.
>
> You can just subclass a shape and make a class that directly represents 
> your shape.
> If you want AmorphousShape then sure, but please note that its purpose is 
> to "forget about the types of those".
>
> If you want a well-typed one, simply extend Shape and fill in the abstract 
> methods - see FlowShape etc. for examples of how to do this.
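>
> A minimal sketch of that approach, assuming Akka 2.4, where Shape declares 
> deepCopy and copyFromPorts as abstract methods. RouterShape, ins and outs 
> are hypothetical names chosen for illustration, not part of Akka:
>
> ```scala
> import akka.stream.{Inlet, Outlet, Shape}
> import scala.collection.immutable
>
> // Hypothetical shape: any number of inlets of type I and outlets of type O.
> final case class RouterShape[I, O](
>     ins: immutable.Seq[Inlet[I]],
>     outs: immutable.Seq[Outlet[O]]) extends Shape {
>
>   // The two port lists every Shape has to expose.
>   override val inlets: immutable.Seq[Inlet[_]] = ins
>   override val outlets: immutable.Seq[Outlet[_]] = outs
>
>   // Same layout, but with fresh copies of every port.
>   override def deepCopy(): RouterShape[I, O] =
>     RouterShape(ins.map(_.carbonCopy()), outs.map(_.carbonCopy()))
>
>   // Same layout, rebuilt around ports supplied by the caller.
>   override def copyFromPorts(
>       inlets: immutable.Seq[Inlet[_]],
>       outlets: immutable.Seq[Outlet[_]]): RouterShape[I, O] = {
>     require(inlets.size == ins.size, "wrong number of inlets")
>     require(outlets.size == outs.size, "wrong number of outlets")
>     RouterShape(inlets.map(_.as[I]), outlets.map(_.as[O]))
>   }
> }
> ```
>
> It could be created with, for example, 
> RouterShape(List(Inlet[Int]("in0"), Inlet[Int]("in1")), 
> List(Outlet[String]("out0"))).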
>
> -- 
> Konrad `ktoso` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>
> On 19 October 2016 at 08:03:16, Sergey Sopin (sopi...@gmail.com) wrote:
>
> Hi, 
>
> Yes, but it seems that I need to create a Java API for it, because my app 
> is in Java. 
> I used Inkscape to draw the diagram.
>
> Cheers,
> Sergey
>
> On Wednesday, 19 October 2016 at 0:46:00 UTC+3, Rafał Krzewski wrote:
>>
>> A custom GraphStage [1] using AmorphousShape is probably the way to go in 
>> this case. 
>>
>> That's a really neat diagram, BTW! What software did you use to create it?
>>
>> Cheers,
>> Rafał
>>
>> [1] 
>> http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html#Custom_processing_with_GraphStage
>>
>> On Tuesday, 18 October 2016 at 22:12:07 UTC+2, Sergey Sopin wrote:
>>>
>>> Hi again,
>>>
>>> I have a very specific case. My flow looks like this one: 
>>>
>>>
>>> <https://lh3.googleusercontent.com/-qrP4yHkVYI8/WAaAO1-q67I/AAAAAAAAAbw/acUT-YaG48k0lo7MkePGv9QVVRkH5L_BACLcB/s1600/Flow.png>
>>>
>>> The idea of the multi-input/output shape was to route each message to the 
>>> right output based on the message data.
>>>
>>> I am just learning streams, so maybe you can suggest a better solution?
>>>
>>> Thanks!
>>>
>>>
>>> Cheers, 
>>>
>>> Sergey
>>>
>>>
>>>
>>> On Tuesday, 18 October 2016 at 18:34:22 UTC+3, Rafał Krzewski wrote:
>>>>
>>>> It's not clear to me what you are trying to accomplish. It looks like 
>>>> you are trying to implement AmorphousShape (i.e. an arbitrary number of 
>>>> open inlets and outlets) on your own, and then a specific variant of it 
>>>> in which all inlets share one type and all outlets share another. 
>>>> The "Fan" fragment in the names you used is a bit misleading: in Akka 
>>>> Streams' own usage, names like FanIn / FanOut shape mean that the graph 
>>>> has many inlets and a single outlet / a single inlet and many outlets. 
>>>> The analogy is to a Chinese-style hand-held fan, rather than a ceiling 
>>>> fan with many blades :) I am wondering what use case you have in mind 
>>>> for your AmorphousShape, because graphs that can be materialized and 
>>>> executed must ultimately have a ClosedShape. You could use such 
>>>> multi-outlet graphs for reusing pieces of functionality, but anything 
>>>> more complex than a BidiShape seems rather unwieldy to me. 
>>>>
>>>> My understanding is that a Graph's shape should not interfere with 
>>>> message flow, because it's just a canvas with contact points on the 
>>>> perimeter. What matters are the components that you plug into it. Akka 
>>>> just makes sure that you don't leave any of the contact points dangling. 
>>>> This makes me think that the problem with messages getting "stuck" is 
>>>> caused somewhere other than where the graph's shape is constructed.
>>>>
>>>> Have you tried inserting probes along the lines of 
>>>> Flow[T].alsoTo(Sink.foreach(_ => println("beep!"))) (shooting from the 
>>>> hip here, apologies if it does not compile straight away) into your 
>>>> graph? That could help you locate where the messages are stuck or 
>>>> discarded.
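>>>>
>>>> A minimal sketch of such a probe, assuming Akka Streams 2.4. ProbeDemo 
>>>> and probe are hypothetical names, and the Source below is only a 
>>>> stand-in for your own graph:
>>>>
>>>> ```scala
>>>> import akka.NotUsed
>>>> import akka.actor.ActorSystem
>>>> import akka.stream.ActorMaterializer
>>>> import akka.stream.scaladsl.{Flow, Sink, Source}
>>>> import scala.concurrent.Await
>>>> import scala.concurrent.duration._
>>>>
>>>> object ProbeDemo {
>>>>   // Hypothetical helper: passes elements through unchanged and also
>>>>   // prints each one, so you can see how far elements travel.
>>>>   def probe[T](label: String): Flow[T, T, NotUsed] =
>>>>     Flow[T].alsoTo(Sink.foreach[T](x => println(s"$label: $x")))
>>>>
>>>>   def main(args: Array[String]): Unit = {
>>>>     implicit val system: ActorSystem = ActorSystem("probe-demo")
>>>>     implicit val materializer: ActorMaterializer = ActorMaterializer()
>>>>
>>>>     // Put a probe before and after the step you suspect.
>>>>     val done = Source(1 to 3)
>>>>       .via(probe[Int]("after source"))
>>>>       .map(_ * 2)
>>>>       .via(probe[Int]("after map"))
>>>>       .runWith(Sink.ignore)
>>>>
>>>>     Await.result(done, 5.seconds)
>>>>     system.terminate()
>>>>   }
>>>> }
>>>> ```
>>>>
>>>> If a label stops showing up past a certain point, the elements are 
>>>> most likely held up or dropped just before that probe.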
>>>>
>>>> Cheers,
>>>> Rafał
>>>>
>>>> On Monday, 17 October 2016 at 20:22:43 UTC+2, Sergey Sopin wrote:
>>>>>
>>>>> Hi, 
>>>>>
>>>>> I am trying to create my own Akka Streams shape with several Inlets 
>>>>> and Outlets. I have written the following code: 
>>>>>
>>>>> package kernel.modeller.workers.streamFinder.generic
>>>>>
>>>>> import akka.stream.{Shape, Outlet, Inlet}
>>>>> import scala.annotation.unchecked.uncheckedVariance
>>>>> import scala.collection.immutable
>>>>>
>>>>> object FanShape {
>>>>>   sealed trait Init[_] {
>>>>>     def inlets: immutable.Seq[Inlet[_]]
>>>>>     def outlets: immutable.Seq[Outlet[_]]
>>>>>     def name: String
>>>>>   }
>>>>>   final case class Name[_](override val name: String) extends Init[Any] {
>>>>>     override def inlets: immutable.Seq[Inlet[_]] = Nil
>>>>>     override def outlets: immutable.Seq[Outlet[_]] = Nil
>>>>>   }
>>>>>   final case class Ports[_](
>>>>>       override val inlets: immutable.Seq[Inlet[_]],
>>>>>       override val outlets: immutable.Seq[Outlet[_]]) extends Init[Any] {
>>>>>     override def name: String = "FanShape"
>>>>>   }
>>>>> }
>>>>>
>>>>> abstract class FanShape[_] private (
>>>>>     _in: Iterator[Inlet[_]],
>>>>>     _out: Iterator[Outlet[_]],
>>>>>     _name: String) extends Shape {
>>>>>   
>>>>>   import FanShape._
>>>>>
>>>>>   def this(init: FanShape.Init[_]) =
>>>>>     this(init.inlets.iterator, init.outlets.iterator, init.name)
>>>>>
>>>>>   final override def outlets: immutable.Seq[Outlet[_]] = _outlets
>>>>>   final override def inlets: immutable.Seq[Inlet[_]] = _inlets
>>>>>
>>>>>   private var _outlets: Vector[Outlet[_]] = Vector.empty
>>>>>   private var _inlets: Vector[Inlet[_]] = Vector.empty
>>>>>
>>>>>   protected def newOutlet[T](name: String): Outlet[T] = {
>>>>>     val p =
>>>>>       if (_out.hasNext) _out.next().asInstanceOf[Outlet[T]]
>>>>>       else Outlet[T](s"${_name}.$name")
>>>>>     _outlets :+= p
>>>>>     p
>>>>>   }
>>>>>
>>>>>   protected def newInlet[T](name: String): Inlet[T] = {
>>>>>     val p =
>>>>>       if (_in.hasNext) _in.next().asInstanceOf[Inlet[T]]
>>>>>       else Inlet[T](s"${_name}.$name")
>>>>>     _inlets :+= p
>>>>>     p
>>>>>   }
>>>>>
>>>>>   protected def construct(init: Init[_]): FanShape[_]
>>>>>
>>>>>   def deepCopy(): FanShape[_] =
>>>>>     construct(Ports(inlets.map(_.carbonCopy()), outlets.map(_.carbonCopy())))
>>>>>
>>>>>   final def copyFromPorts(
>>>>>       inlets: immutable.Seq[Inlet[_]],
>>>>>       outlets: immutable.Seq[Outlet[_]]): FanShape[_] = {
>>>>>     require(outlets.size == _outlets.size,
>>>>>       s"proposed outlets [${outlets.mkString(", ")}] do not fit FanShape")
>>>>>     require(inlets.size == _inlets.size,
>>>>>       s"proposed inlets [${inlets.mkString(", ")}] do not fit FanShape")
>>>>>     construct(Ports(inlets, outlets))
>>>>>   }
>>>>> }
>>>>>
>>>>> object UniformFanShape {
>>>>>   def apply[I, O](
>>>>>       inlets: Array[Inlet[I]],
>>>>>       outlets: Outlet[O]*): UniformFanShape[I, O] =
>>>>>     new UniformFanShape(inlets.size, outlets.size,
>>>>>       FanShape.Ports(inlets.toList, outlets.toList))
>>>>> }
>>>>>
>>>>> class UniformFanShape[-I, +O](n: Int, m: Int, _init: FanShape.Init[_])
>>>>>     extends FanShape(_init) {
>>>>>   def this(n: Int, m: Int) = this(n, m, FanShape.Name("UniformFan"))
>>>>>   def this(n: Int, m: Int, name: String) = this(n, m, FanShape.Name(name))
>>>>>   def this(inlets: Array[Inlet[I]], outlets: Array[Outlet[O]]) =
>>>>>     this(inlets.size, outlets.size,
>>>>>       FanShape.Ports(inlets.toList, outlets.toList))
>>>>>   override protected def construct(init: FanShape.Init[_]): FanShape[_] =
>>>>>     new UniformFanShape(n, m, init)
>>>>>   override def deepCopy(): UniformFanShape[I, O] =
>>>>>     super.deepCopy().asInstanceOf[UniformFanShape[I, O]]
>>>>>
>>>>>   val inArray: Array[Inlet[I @uncheckedVariance]] =
>>>>>     Array.tabulate(n)(i ⇒ newInlet[I](s"in$i"))
>>>>>   def in(n: Int): Inlet[I @uncheckedVariance] = inArray(n)
>>>>>
>>>>>   val outArray: Array[Outlet[O @uncheckedVariance]] =
>>>>>     Array.tabulate(m)(j ⇒ newOutlet[O](s"out$j"))
>>>>>   def out(m: Int): Outlet[O @uncheckedVariance] = outArray(m)
>>>>> }
>>>>>
>>>>>
>>>>> This code allows me to create a graph; however, it is not possible to 
>>>>> process messages with it. The handlers are never called for messages; 
>>>>> they get stuck somewhere. 
>>>>> Could you please help me fix it? 
>>>>>
>>>>> PS: I am not an expert in Scala.
>>>>>
>>>>> Thank you in advance!
>>>>>
>>>>> Regards,
>>>>> Sergey
>>>>>
>>>> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to akka-user+...@googlegroups.com.
> To post to this group, send email to akka...@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>

