[akka-user] Re: Akka Stream, Framing with GraphStage instead of PushPullStage: messages get lost

2016-05-13 Thread Johan Andrén
Hi again,

Yes, the fix was for the new FrameParser and not the old one. 

I cannot see any changes in the working example in your last message, but I 
have opened a ticket and a PR that it would be great if you could have a 
look at and confirm that it is the fix you applied:
https://github.com/akka/akka/pull/20522

--
Johan Andrén
Akka Team, Lightbend Inc.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Akka Stream, Framing with GraphStage instead of PushPullStage: messages get lost

2016-05-12 Thread Qux
Thanks for your suggestion.

The second framing (which uses absorbTermination() ) is working as intended.
The Problem is in the first, not deprecated framing, but thanks to your 
reply I have now a working solution. So here is the working example, so you 
can fix it in the docs: 

  val framing = BidiFlow.fromGraph(create() { b =>
implicit val order = ByteOrder.LITTLE_ENDIAN

def addLengthHeader(bytes: ByteString) = {
  val len = bytes.length
  ByteString.newBuilder.putInt(len).append(bytes).result()
}

class FrameParser extends GraphStage[FlowShape[ByteString, ByteString]] {
  // this holds the received but not yet parsed bytes
  val in = Inlet[ByteString]("FrameParser.in")
  val out = Outlet[ByteString]("FrameParser.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): 
GraphStageLogic = new GraphStageLogic(shape) {

// this holds the received but not yet parsed bytes
var stash = ByteString.empty
// this holds the current message length or -1 if at a boundary
var needed = -1

setHandler(out, new OutHandler {
  override def onPull(): Unit = {
if (isClosed(in)) run()
else pull(in)
  }
})
setHandler(in, new InHandler {
  override def onPush(): Unit = {
val bytes = grab(in)
stash = stash ++ bytes
run()
  }

  override def onUpstreamFinish(): Unit = {
if (stash.isEmpty) completeStage()
// wait with completion and let run() complete when the
// rest of the stash has been sent downstream
   // we still have bytes to emit

  }
})

private def run(): Unit = {
  if (needed == -1) {
// are we at a boundary? then figure out next length
if (stash.length < 4) {
  if (isClosed(in)) completeStage()
  else pull(in)
} else {
  needed = stash.iterator.getInt
  stash = stash.drop(4)
  run() // cycle back to possibly already emit the next chunk
}
  } else if (stash.length < needed) {
// we are in the middle of a message, need more bytes,
// or have to stop if input closed
if (isClosed(in)) completeStage()
else pull(in)
  } else {
// we have enough to emit at least one message, so do it
val emit = stash.take(needed)
stash = stash.drop(needed)
needed = -1
push(out, emit)
  }
}
  }
}


val outbound = b.add(Flow[ByteString].map(addLengthHeader))
val inbound = b.add(Flow[ByteString].via(new FrameParser))
BidiShape.fromFlows(outbound, inbound)
  })







Am Donnerstag, 12. Mai 2016 17:48:21 UTC+2 schrieb Johan Andrén:
>
> Hi Qux,
>
> I think it may be a bug in the sample, I think absorbTermination() in 
> onUpstreamFinnish should be replaced with if (isAvailable(out)) run() but 
> now nothing is done, If you can confirm that this solves the problem I'll 
> fix the samples. Thanks.
>
> --
> Johan Andrén
> Akka Team, Lightbend Inc.
>
> On Thursday, May 12, 2016 at 2:25:54 PM UTC+2, Qux wrote:
>>
>>
>>1. Hi,
>>
>> I have a Problem with Framing. I tried to build the ping-pong-example 
>> from http://doc.akka.io/docs/akka/2.4.4/java/stream/stream-graphs.html. 
>> If I run it over TCP, Messages get lost:
>>
>> trait Message
>>> case class Ping(id: Int) extends Message
>>> case class Pong(id: Int) extends Message
>>> def toBytes(msg: Message): ByteString = {
>>>   implicit val order = ByteOrder.LITTLE_ENDIAN
>>>   msg match {
>>> case Ping(id) =>
>>>   ByteString.newBuilder.putByte(1).putInt(id).result()
>>> case Pong(id) =>
>>>   ByteString.newBuilder.putByte(2).putInt(id).result()
>>>   }
>>> }
>>> def fromBytes(bytes: ByteString): Message = {
>>>   implicit val order = ByteOrder.LITTLE_ENDIAN
>>>   val it = bytes.iterator
>>>   it.getByte match {
>>> case 1 => Ping(it.getInt)
>>> case 2 => Pong(it.getInt)
>>> case other => throw new RuntimeException(s"parse error: expected 1|2 
>>> got $other")
>>>   }
>>> }
>>> val codec = BidiFlow.fromFunctions(toBytes _, fromBytes _)
>>> val framing = BidiFlow.fromGraph(create() { b =>
>>>   implicit val order = ByteOrder.LITTLE_ENDIAN
>>>
>>>   def addLengthHeader(bytes: ByteString) = {
>>> val len = bytes.length
>>> ByteString.newBuilder.putInt(len).append(bytes).result()
>>>   }
>>>   class FrameParser extends GraphStage[FlowShape[ByteString, ByteString]] {
>>> // this holds the received but not yet parsed bytes
>>> val in = Inlet[ByteString]("FrameParser.in")
>>> val out = Outlet[ByteString]("FrameParser.out")
>>> override val shape = FlowShape.of(in, out)
>>>
>>> override def createLogic(inheritedAttributes: Attributes): 
>>> 

[akka-user] Re: Akka Stream, Framing with GraphStage instead of PushPullStage: messages get lost

2016-05-12 Thread Johan Andrén
Hi Qux,

I think it may be a bug in the sample, I think absorbTermination() in 
onUpstreamFinnish should be replaced with if (isAvailable(out)) run() but 
now nothing is done, If you can confirm that this solves the problem I'll 
fix the samples. Thanks.

--
Johan Andrén
Akka Team, Lightbend Inc.

On Thursday, May 12, 2016 at 2:25:54 PM UTC+2, Qux wrote:
>
>
>1. Hi,
>
> I have a Problem with Framing. I tried to build the ping-pong-example from 
> http://doc.akka.io/docs/akka/2.4.4/java/stream/stream-graphs.html. If I 
> run it over TCP, Messages get lost:
>
> trait Message
>> case class Ping(id: Int) extends Message
>> case class Pong(id: Int) extends Message
>> def toBytes(msg: Message): ByteString = {
>>   implicit val order = ByteOrder.LITTLE_ENDIAN
>>   msg match {
>> case Ping(id) =>
>>   ByteString.newBuilder.putByte(1).putInt(id).result()
>> case Pong(id) =>
>>   ByteString.newBuilder.putByte(2).putInt(id).result()
>>   }
>> }
>> def fromBytes(bytes: ByteString): Message = {
>>   implicit val order = ByteOrder.LITTLE_ENDIAN
>>   val it = bytes.iterator
>>   it.getByte match {
>> case 1 => Ping(it.getInt)
>> case 2 => Pong(it.getInt)
>> case other => throw new RuntimeException(s"parse error: expected 1|2 got 
>> $other")
>>   }
>> }
>> val codec = BidiFlow.fromFunctions(toBytes _, fromBytes _)
>> val framing = BidiFlow.fromGraph(create() { b =>
>>   implicit val order = ByteOrder.LITTLE_ENDIAN
>>
>>   def addLengthHeader(bytes: ByteString) = {
>> val len = bytes.length
>> ByteString.newBuilder.putInt(len).append(bytes).result()
>>   }
>>   class FrameParser extends GraphStage[FlowShape[ByteString, ByteString]] {
>> // this holds the received but not yet parsed bytes
>> val in = Inlet[ByteString]("FrameParser.in")
>> val out = Outlet[ByteString]("FrameParser.out")
>> override val shape = FlowShape.of(in, out)
>>
>> override def createLogic(inheritedAttributes: Attributes): 
>> GraphStageLogic = new GraphStageLogic(shape) {
>>
>>   // this holds the received but not yet parsed bytes
>>   var stash = ByteString.empty
>>   // this holds the current message length or -1 if at a boundary
>>   var needed = -1
>>
>>   setHandler(out, new OutHandler {
>> override def onPull(): Unit = {
>>   if (isClosed(in)) run()
>>   else pull(in)
>> }
>>   })
>>   setHandler(in, new InHandler {
>> override def onPush(): Unit = {
>>   val bytes = grab(in)
>>   stash = stash ++ bytes
>>   run()
>> }
>> override def onUpstreamFinish(): Unit = {
>>   if (stash.isEmpty) completeStage()
>>   // wait with completion and let run() complete when the
>>   // rest of the stash has been sent downstream
>> }
>>   })
>>   private def run(): Unit = {
>> if (needed == -1) {
>>   // are we at a boundary? then figure out next length
>>   if (stash.length < 4) {
>> if (isClosed(in)) completeStage()
>> else pull(in)
>>   } else {
>> needed = stash.iterator.getInt
>> stash = stash.drop(4)
>> run() // cycle back to possibly already emit the next chunk
>>   }
>> } else if (stash.length < needed) {
>>   // we are in the middle of a message, need more bytes,
>>   // or have to stop if input closed
>>   if (isClosed(in)) completeStage()
>>   else pull(in)
>> } else {
>>   // we have enough to emit at least one message, so do it
>>   val emit = stash.take(needed)
>>   stash = stash.drop(needed)
>>   needed = -1
>>   push(out, emit)
>> }
>>   }
>> }
>>   }
>>
>>   val outbound = b.add(Flow[ByteString].map(addLengthHeader))
>>   val inbound = b.add(Flow[ByteString].via(new FrameParser))
>>   BidiShape.fromFlows(outbound, inbound)
>> })
>>
>> val protocol = codec atop framing
>>
>> val server = Tcp().bind("127.0.0.1", 0).to(Sink.foreach {
>>   conn =>
>> conn.flow.join(protocol.reversed).join(Flow[Message].map {
>>   case Ping(id) => Pong(id)
>>   case Pong(id) => Ping(id)
>>   case other => println("error"); throw new RuntimeException(s"other: 
>> $other")
>> }).run()
>> }).run()
>>
>> val myaddr = Await.result(server, 1.second)
>> //
>> val client = Tcp().outgoingConnection(myaddr.localAddress)
>> //
>> val stack = protocol join client
>> Source((0 to 20)).map { x => Ping(x) }.via(stack).runForeach(println)
>
>
>
> If I use following (deprecated )framing instead, all messages get through:
>
>
>> val framing = BidiFlow.fromGraph(create() { b =>
>>   implicit val order = ByteOrder.LITTLE_ENDIAN
>>
>>   def addLengthHeader(bytes: ByteString) = {
>> val len = bytes.length
>> ByteString.newBuilder.putInt(len).append(bytes).result()
>>   }
>>   class FrameParser extends PushPullStage[ByteString, ByteString] {
>> //