I have a problem while forwarding data received from tcp to a stream . It 
works with hardcoded version but does not work with dynamic balance graph . 
If i change the rtcpgraph with rtcpgraphdynamic, data does not flow into 
stream as if its run method is missing . Data arrives at publisher sink's 
materialized value (ret ), but does not continue further into stream. This 
dynamic graph works if i read data from file .

val clsParallelPartialGraph = GraphDSL.create(){
            implicit builder =>


                val balance : UniformFanOutShape[(Array[Int]),(Array[Int])] 
= builder.add(Balance[(Array[Int])](ParallelCount ))


                (0 until clsParallelCount).foreach { x =>
                    balance ~> flowClusterings(x) ~>Sink.ignore
                }


                SinkShape(balance.in)
 }
 val connectionRet = connections.to(Sink.foreach { connection =>


                printOrNot(s"New connection from: 
${connection.remoteAddress}")
                val sink1 = Sink.asPublisher[ByteString](fanout = true)
                val sourceTemp = Source.maybe[ByteString]
                val flowsinksource = Flow.fromSinkAndSourceMat(sink1, 
sourceTemp)(Keep.left)




                val ret : Publisher[ByteString] = 
connection.handleWith(flowsinksource 
)


                val sourcetcp = Source.fromPublisher(ret)
                    .via(Framing.delimiter(
                    ByteString("\n"),
                    maximumFrameLength = 256,
                    allowTruncation = true))
                    .via(Flow[ByteString].map{x =>
                    val decoded = x.decodeString("UTF-8")
                    println("decoded :" + decoded) 
                    decoded
                })


                val rgraphtcp = RunnableGraph.fromGraph(GraphDSL.create(
/*sinkToJsTuple*/){ implicit builder =>
                    val balance : UniformFanOutShape[(Array[Int]),(Array[Int
] )] = builder.add(Balance[(Array[Int])](ParallelCount ))




                    sourcetcp ~>    flow1tcp    ~> flow2 ~> flowMain ~>   
balance ~> flowClustering0 ~>Sink.ignore
                    balance ~> flowClustering1 ~>Sink.ignore
                    balance ~> flowClustering2 ~>Sink.ignore


                    ClosedShape
                })


                val mattcp = rgraphtcp.run()
/*
                val rgraphtcpdynamic = 
RunnableGraph.fromGraph(GraphDSL.create(){ implicit builder =>
                    val balance : 
UniformFanOutShape[(Array[Int]),(Array[Int] )] = 
builder.add(Balance[(Array[Int] )](ParallelCount ))


                    sourcetcp ~>    flow1tcp    ~> flow2 ~> flowMain ~> 
clsParallelPartialGraph


                    ClosedShape
                })


                val mattcp = rgraphtcpdynamic.run()
*/


            }).run

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to