Re: [akka-user] Data from tcp to a dynamically balanced stream does not work

2016-09-09 Thread Viktor Klang
Hi Ismail,

Please create a minimized, runnable code example, preferably with a test
that reproduces the situation.


On Fri, Sep 9, 2016 at 11:02 AM, Ismail Kelebek 
wrote:

> I have a problem while forwarding data received from tcp to a stream . It
> works with hardcoded version but does not work with dynamic balance graph .
> If i change the rtcpgraph with rtcpgraphdynamic, data does not flow into
> stream as if its run method is missing . Data arrives at publisher sink's
> materialized value (ret ), but does not continue further into stream. This
> dynamic graph works if i read data from file .
>
> val clsParallelPartialGraph = GraphDSL.create(){
> implicit builder =>
>
>
> val balance : UniformFanOutShape[(Array[Int]),(Array[Int
> ])] = builder.add(Balance[(Array[Int])](ParallelCount ))
>
>
> (0 until clsParallelCount).foreach { x =>
> balance ~> flowClusterings(x) ~>Sink.ignore
> }
>
>
> SinkShape(balance.in)
>  }
>  val connectionRet = connections.to(Sink.foreach { connection =>
>
>
> printOrNot(s"New connection from:
> ${connection.remoteAddress}")
> val sink1 = Sink.asPublisher[ByteString](fanout = true)
> val sourceTemp = Source.maybe[ByteString]
> val flowsinksource = Flow.fromSinkAndSourceMat(sink1,
> sourceTemp)(Keep.left)
>
>
>
>
> val ret : Publisher[ByteString] = connection.handleWith(
> flowsinksource )
>
>
> val sourcetcp = Source.fromPublisher(ret)
> .via(Framing.delimiter(
> ByteString("\n"),
> maximumFrameLength = 256,
> allowTruncation = true))
> .via(Flow[ByteString].map{x =>
> val decoded = x.decodeString("UTF-8")
> println("decoded :" + decoded)
> decoded
> })
>
>
> val rgraphtcp = RunnableGraph.fromGraph(GraphDSL.create(
> /*sinkToJsTuple*/){ implicit builder =>
> val balance : UniformFanOutShape[(Array[Int]),(Array[
> Int] )] = builder.add(Balance[(Array[Int])](ParallelCount ))
>
>
>
>
> sourcetcp ~>flow1tcp~> flow2 ~> flowMain ~>
> balance ~> flowClustering0 ~>Sink.ignore
> balance ~> flowClustering1 ~>Sink.ignore
> balance ~> flowClustering2 ~>Sink.ignore
>
>
> ClosedShape
> })
>
>
> val mattcp = rgraphtcp.run()
> /*
> val rgraphtcpdynamic = 
> RunnableGraph.fromGraph(GraphDSL.create(){
> implicit builder =>
> val balance : UniformFanOutShape[(Array[Int]),(Array[Int]
> )] = builder.add(Balance[(Array[Int] )](ParallelCount ))
>
>
> sourcetcp ~>flow1tcp~> flow2 ~> flowMain ~>
> clsParallelPartialGraph
>
>
> ClosedShape
> })
>
>
> val mattcp = rgraphtcpdynamic.run()
> */
>
>
> }).run
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Data from tcp to a dynamically balanced stream does not work

2016-09-09 Thread Ismail Kelebek
I have a problem while forwarding data received from tcp to a stream . It 
works with hardcoded version but does not work with dynamic balance graph . 
If i change the rtcpgraph with rtcpgraphdynamic, data does not flow into 
stream as if its run method is missing . Data arrives at publisher sink's 
materialized value (ret ), but does not continue further into stream. This 
dynamic graph works if i read data from file .

val clsParallelPartialGraph = GraphDSL.create(){
implicit builder =>


val balance : UniformFanOutShape[(Array[Int]),(Array[Int])] 
= builder.add(Balance[(Array[Int])](ParallelCount ))


(0 until clsParallelCount).foreach { x =>
balance ~> flowClusterings(x) ~>Sink.ignore
}


SinkShape(balance.in)
 }
 val connectionRet = connections.to(Sink.foreach { connection =>


printOrNot(s"New connection from: 
${connection.remoteAddress}")
val sink1 = Sink.asPublisher[ByteString](fanout = true)
val sourceTemp = Source.maybe[ByteString]
val flowsinksource = Flow.fromSinkAndSourceMat(sink1, 
sourceTemp)(Keep.left)




val ret : Publisher[ByteString] = 
connection.handleWith(flowsinksource 
)


val sourcetcp = Source.fromPublisher(ret)
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.via(Flow[ByteString].map{x =>
val decoded = x.decodeString("UTF-8")
println("decoded :" + decoded) 
decoded
})


val rgraphtcp = RunnableGraph.fromGraph(GraphDSL.create(
/*sinkToJsTuple*/){ implicit builder =>
val balance : UniformFanOutShape[(Array[Int]),(Array[Int
] )] = builder.add(Balance[(Array[Int])](ParallelCount ))




sourcetcp ~>flow1tcp~> flow2 ~> flowMain ~>   
balance ~> flowClustering0 ~>Sink.ignore
balance ~> flowClustering1 ~>Sink.ignore
balance ~> flowClustering2 ~>Sink.ignore


ClosedShape
})


val mattcp = rgraphtcp.run()
/*
val rgraphtcpdynamic = 
RunnableGraph.fromGraph(GraphDSL.create(){ implicit builder =>
val balance : 
UniformFanOutShape[(Array[Int]),(Array[Int] )] = 
builder.add(Balance[(Array[Int] )](ParallelCount ))


sourcetcp ~>flow1tcp~> flow2 ~> flowMain ~> 
clsParallelPartialGraph


ClosedShape
})


val mattcp = rgraphtcpdynamic.run()
*/


}).run

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.